Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ 122c4019

History | View | Annotate | Download (22.5 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import json
35

    
36
from django.conf import settings
37
from django.db import transaction
38
from datetime import datetime
39

    
40
from synnefo.db.models import (Backend, VirtualMachine, Network,
41
                               BackendNetwork, BACKEND_STATUSES,
42
                               pooled_rapi_client)
43
from synnefo.logic import utils
44

    
45
from logging import getLogger
46
log = getLogger(__name__)
47

    
48

    
49
_firewall_tags = {
50
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
51
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
52
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}
53

    
54
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
55

    
56

    
57
@transaction.commit_on_success
58
def process_op_status(vm, etime, jobid, opcode, status, logmsg):
59
    """Process a job progress notification from the backend
60

61
    Process an incoming message from the backend (currently Ganeti).
62
    Job notifications with a terminating status (sucess, error, or canceled),
63
    also update the operating state of the VM.
64

65
    """
66
    # See #1492, #1031, #1111 why this line has been removed
67
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
68
    if status not in [x[0] for x in BACKEND_STATUSES]:
69
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)
70

    
71
    vm.backendjobid = jobid
72
    vm.backendjobstatus = status
73
    vm.backendopcode = opcode
74
    vm.backendlogmsg = logmsg
75

    
76
    # Notifications of success change the operating state
77
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode, None)
78
    if status == 'success' and state_for_success is not None:
79
        vm.operstate = state_for_success
80

    
81
    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
82
    if status in ('canceled', 'error') and opcode == 'OP_INSTANCE_CREATE':
83
        vm.operstate = 'ERROR'
84
        vm.backendtime = etime
85

    
86
    if opcode == 'OP_INSTANCE_REMOVE':
87
        # Set the deleted flag explicitly, cater for admin-initiated removals
88
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
89
        # when no instance exists at the Ganeti backend.
90
        # See ticket #799 for all the details.
91
        #
92
        if status == 'success' or (status == 'error' and
93
                                   vm.operstate == 'ERROR'):
94
            release_instance_nics(vm)
95
            vm.nics.all().delete()
96
            vm.deleted = True
97
            vm.operstate = state_for_success
98
            vm.backendtime = etime
99

    
100
    # Update backendtime only for jobs that have been successfully completed,
101
    # since only these jobs update the state of the VM. Else a "race condition"
102
    # may occur when a successful job (e.g. OP_INSTANCE_REMOVE) completes
103
    # before an error job and messages arrive in reversed order.
104
    if status == 'success':
105
        vm.backendtime = etime
106

    
107
    vm.save()
108

    
109

    
110
@transaction.commit_on_success
111
def process_net_status(vm, etime, nics):
112
    """Process a net status notification from the backend
113

114
    Process an incoming message from the Ganeti backend,
115
    detailing the NIC configuration of a VM instance.
116

117
    Update the state of the VM in the DB accordingly.
118
    """
119

    
120
    release_instance_nics(vm)
121

    
122
    new_nics = enumerate(nics)
123
    for i, new_nic in new_nics:
124
        network = new_nic.get('network', '')
125
        n = str(network)
126
        pk = utils.id_from_network_name(n)
127

    
128
        net = Network.objects.get(pk=pk)
129

    
130
        # Get the new nic info
131
        mac = new_nic.get('mac', '')
132
        ipv4 = new_nic.get('ip', '')
133
        ipv6 = new_nic.get('ipv6', '')
134

    
135
        firewall = new_nic.get('firewall', '')
136
        firewall_profile = _reverse_tags.get(firewall, '')
137
        if not firewall_profile and net.public:
138
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE
139

    
140
        if ipv4:
141
            net.reserve_address(ipv4)
142

    
143
        vm.nics.create(
144
            network=net,
145
            index=i,
146
            mac=mac,
147
            ipv4=ipv4,
148
            ipv6=ipv6,
149
            firewall_profile=firewall_profile,
150
            dirty=False)
151

    
152
    vm.backendtime = etime
153
    vm.save()
154

    
155

    
156
def release_instance_nics(vm):
157
    for nic in vm.nics.all():
158
        if nic.ipv4:
159
            nic.network.release_address(nic.ipv4)
160
        nic.delete()
161

    
162

    
163
@transaction.commit_on_success
164
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
165
    if status not in [x[0] for x in BACKEND_STATUSES]:
166
        return
167
        #raise Network.InvalidBackendMsgError(opcode, status)
168

    
169
    back_network.backendjobid = jobid
170
    back_network.backendjobstatus = status
171
    back_network.backendopcode = opcode
172
    back_network.backendlogmsg = logmsg
173

    
174
    # Notifications of success change the operating state
175
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
176
    if status == 'success' and state_for_success is not None:
177
        back_network.operstate = state_for_success
178

    
179
    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_CREATE':
180
        utils.update_state(back_network, 'ERROR')
181
        back_network.backendtime = etime
182

    
183
    if opcode == 'OP_NETWORK_REMOVE':
184
        if status == 'success' or (status == 'error' and
185
                                   back_network.operstate == 'ERROR'):
186
            back_network.operstate = state_for_success
187
            back_network.deleted = True
188
            back_network.backendtime = etime
189

    
190
    if status == 'success':
191
        back_network.backendtime = etime
192
    back_network.save()
193

    
194

    
195
@transaction.commit_on_success
196
def process_create_progress(vm, etime, rprogress, wprogress):
197

    
198
    # XXX: This only uses the read progress for now.
199
    #      Explore whether it would make sense to use the value of wprogress
200
    #      somewhere.
201
    percentage = int(rprogress)
202

    
203
    # The percentage may exceed 100%, due to the way
204
    # snf-progress-monitor tracks bytes read by image handling processes
205
    percentage = 100 if percentage > 100 else percentage
206
    if percentage < 0:
207
        raise ValueError("Percentage cannot be negative")
208

    
209
    # FIXME: log a warning here, see #1033
210
#   if last_update > percentage:
211
#       raise ValueError("Build percentage should increase monotonically " \
212
#                        "(old = %d, new = %d)" % (last_update, percentage))
213

    
214
    # This assumes that no message of type 'ganeti-create-progress' is going to
215
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
216
    # the instance is STARTED.  What if the two messages are processed by two
217
    # separate dispatcher threads, and the 'ganeti-op-status' message for
218
    # successful creation gets processed before the 'ganeti-create-progress'
219
    # message? [vkoukis]
220
    #
221
    #if not vm.operstate == 'BUILD':
222
    #    raise VirtualMachine.IllegalState("VM is not in building state")
223

    
224
    vm.buildpercentage = percentage
225
    vm.backendtime = etime
226
    vm.save()
227

    
228

    
229
def start_action(vm, action):
230
    """Update the state of a VM when a new action is initiated."""
231
    log.debug("Applying action %s to VM %s", action, vm)
232

    
233
    if not action in [x[0] for x in VirtualMachine.ACTIONS]:
234
        raise VirtualMachine.InvalidActionError(action)
235

    
236
    # No actions to deleted and no actions beside destroy to suspended VMs
237
    if vm.deleted:
238
        raise VirtualMachine.DeletedError
239

    
240
    # No actions to machines being built. They may be destroyed, however.
241
    if vm.operstate == 'BUILD' and action != 'DESTROY':
242
        raise VirtualMachine.BuildingError
243

    
244
    vm.action = action
245
    vm.backendjobid = None
246
    vm.backendopcode = None
247
    vm.backendjobstatus = None
248
    vm.backendlogmsg = None
249

    
250
    # Update the relevant flags if the VM is being suspended or destroyed.
251
    # Do not set the deleted flag here, see ticket #721.
252
    #
253
    # The deleted flag is set asynchronously, when an OP_INSTANCE_REMOVE
254
    # completes successfully. Hence, a server may be visible for some time
255
    # after a DELETE /servers/id returns HTTP 204.
256
    #
257
    if action == "DESTROY":
258
        # vm.deleted = True
259
        pass
260
    elif action == "SUSPEND":
261
        vm.suspended = True
262
    elif action == "START":
263
        vm.suspended = False
264
    vm.save()
265

    
266

    
267
def create_instance(vm, public_nic, flavor, image, password, personality):
268
    """`image` is a dictionary which should contain the keys:
269
            'backend_id', 'format' and 'metadata'
270

271
        metadata value should be a dictionary.
272
    """
273

    
274
    if settings.IGNORE_FLAVOR_DISK_SIZES:
275
        if image['backend_id'].find("windows") >= 0:
276
            sz = 14000
277
        else:
278
            sz = 4000
279
    else:
280
        sz = flavor.disk * 1024
281

    
282
    # Handle arguments to CreateInstance() as a dictionary,
283
    # initialize it based on a deployment-specific value.
284
    # This enables the administrator to override deployment-specific
285
    # arguments, such as the disk template to use, name of os provider
286
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
287
    #
288
    kw = settings.GANETI_CREATEINSTANCE_KWARGS
289
    kw['mode'] = 'create'
290
    kw['name'] = vm.backend_vm_id
291
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
292

    
293
    # Identify if provider parameter should be set in disk options.
294
    # Current implementation support providers only fo ext template.
295
    # To select specific provider for an ext template, template name
296
    # should be formated as `ext_<provider_name>`.
297
    provider = None
298
    disk_template = flavor.disk_template
299
    if flavor.disk_template.startswith("ext"):
300
        disk_template, provider = flavor.disk_template.split("_", 1)
301

    
302
    kw['disk_template'] = disk_template
303
    kw['disks'] = [{"size": sz}]
304
    if provider:
305
        kw['disks'][0]['provider'] = provider
306

    
307
        if provider == 'vlmc':
308
            kw['disks'][0]['origin'] = image['backend_id']
309

    
310
    kw['nics'] = [public_nic]
311
    if settings.GANETI_USE_HOTPLUG:
312
        kw['hotplug'] = True
313
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
314
    # kw['os'] = settings.GANETI_OS_PROVIDER
315
    kw['ip_check'] = False
316
    kw['name_check'] = False
317
    # Do not specific a node explicitly, have
318
    # Ganeti use an iallocator instead
319
    #
320
    # kw['pnode']=rapi.GetNodes()[0]
321
    kw['dry_run'] = settings.TEST
322

    
323
    kw['beparams'] = {
324
        'auto_balance': True,
325
        'vcpus': flavor.cpu,
326
        'memory': flavor.ram}
327

    
328
    kw['osparams'] = {
329
        'img_id': image['backend_id'],
330
        'img_passwd': password,
331
        'img_format': image['format']}
332
    if personality:
333
        kw['osparams']['img_personality'] = json.dumps(personality)
334

    
335
    if provider != None and provider == 'vlmc':
336
        kw['osparams']['img_id'] = 'null'
337

    
338
    kw['osparams']['img_properties'] = json.dumps(image['metadata'])
339

    
340
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
341
    # kw['hvparams'] = dict(serial_console=False)
342
    log.debug("Creating instance %s", utils.hide_pass(kw))
343
    with pooled_rapi_client(vm) as client:
344
        return client.CreateInstance(**kw)
345

    
346

    
347
def delete_instance(vm):
348
    start_action(vm, 'DESTROY')
349
    with pooled_rapi_client(vm) as client:
350
        return client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)
351

    
352

    
353
def reboot_instance(vm, reboot_type):
354
    assert reboot_type in ('soft', 'hard')
355
    with pooled_rapi_client(vm) as client:
356
        return client.RebootInstance(vm.backend_vm_id, reboot_type,
357
                                     dry_run=settings.TEST)
358

    
359

    
360
def startup_instance(vm):
361
    start_action(vm, 'START')
362
    with pooled_rapi_client(vm) as client:
363
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
364

    
365

    
366
def shutdown_instance(vm):
367
    start_action(vm, 'STOP')
368
    with pooled_rapi_client(vm) as client:
369
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)
370

    
371

    
372
def get_instance_console(vm):
373
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
374
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
375
    # useless (see #783).
376
    #
377
    # Until this is fixed on the Ganeti side, construct a console info reply
378
    # directly.
379
    #
380
    # WARNING: This assumes that VNC runs on port network_port on
381
    #          the instance's primary node, and is probably
382
    #          hypervisor-specific.
383
    #
384
    log.debug("Getting console for vm %s", vm)
385

    
386
    console = {}
387
    console['kind'] = 'vnc'
388

    
389
    with pooled_rapi_client(vm) as client:
390
        i = client.GetInstance(vm.backend_vm_id)
391

    
392
    if i['hvparams']['serial_console']:
393
        raise Exception("hv parameter serial_console cannot be true")
394
    console['host'] = i['pnode']
395
    console['port'] = i['network_port']
396

    
397
    return console
398

    
399

    
400
def get_instance_info(vm):
401
    with pooled_rapi_client(vm) as client:
402
        return client.GetInstanceInfo(vm.backend_vm_id)
403

    
404

    
405
def create_network(network, backends=None, connect=True):
406
    """Create and connect a network."""
407
    if not backends:
408
        backends = Backend.objects.exclude(offline=True)
409

    
410
    log.debug("Creating network %s in backends %s", network, backends)
411

    
412
    for backend in backends:
413
        create_jobID = _create_network(network, backend)
414
        if connect:
415
            connect_network(network, backend, create_jobID)
416

    
417

    
418
def _create_network(network, backend):
419
    """Create a network."""
420

    
421
    network_type = network.public and 'public' or 'private'
422

    
423
    tags = network.backend_tag
424
    if network.dhcp:
425
        tags.append('nfdhcpd')
426
    tags = ','.join(tags)
427

    
428
    try:
429
        bn = BackendNetwork.objects.get(network=network, backend=backend)
430
        mac_prefix = bn.mac_prefix
431
    except BackendNetwork.DoesNotExist:
432
        raise Exception("BackendNetwork for network '%s' in backend '%s'"\
433
                        " does not exist" % (network.id, backend.id))
434

    
435
    with pooled_rapi_client(backend) as client:
436
        return client.CreateNetwork(network_name=network.backend_id,
437
                                    network=network.subnet,
438
                                    network6=network.subnet6,
439
                                    gateway=network.gateway,
440
                                    gateway6=network.gateway6,
441
                                    network_type=network_type,
442
                                    mac_prefix=mac_prefix,
443
                                    tags=tags)
444

    
445

    
446
def connect_network(network, backend, depend_job=None, group=None):
447
    """Connect a network to nodegroups."""
448
    log.debug("Connecting network %s to backend %s", network, backend)
449

    
450
    mode = "routed" if "ROUTED" in network.type else "bridged"
451

    
452
    depend_jobs = [depend_job] if depend_job else []
453
    with pooled_rapi_client(backend) as client:
454
        if group:
455
            client.ConnectNetwork(network.backend_id, group, mode,
456
                                  network.link, depend_jobs)
457
        else:
458
            for group in client.GetGroups():
459
                client.ConnectNetwork(network.backend_id, group, mode,
460
                                      network.link, depend_jobs)
461

    
462

    
463
def delete_network(network, backends=None, disconnect=True):
464
    if not backends:
465
        backends = Backend.objects.exclude(offline=True)
466

    
467
    log.debug("Deleting network %s from backends %s", network, backends)
468

    
469
    for backend in backends:
470
        disconnect_jobIDs = []
471
        if disconnect:
472
            disconnect_jobIDs = disconnect_network(network, backend)
473
        _delete_network(network, backend, disconnect_jobIDs)
474

    
475

    
476
def _delete_network(network, backend, depend_jobs=[]):
477
    with pooled_rapi_client(backend) as client:
478
        return client.DeleteNetwork(network.backend_id, depend_jobs)
479

    
480

    
481
def disconnect_network(network, backend, group=None):
482
    log.debug("Disconnecting network %s to backend %s", network, backend)
483

    
484
    with pooled_rapi_client(backend) as client:
485
        if group:
486
            return [client.DisconnectNetwork(network.backend_id, group)]
487
        else:
488
            jobs = []
489
            for group in client.GetGroups():
490
                job = client.DisconnectNetwork(network.backend_id, group)
491
                jobs.append(job)
492
            return jobs
493

    
494

    
495
def connect_to_network(vm, network, address):
496
    nic = {'ip': address, 'network': network.backend_id}
497

    
498
    log.debug("Connecting vm %s to network %s(%s)", vm, network, address)
499

    
500
    with pooled_rapi_client(vm) as client:
501
        return client.ModifyInstance(vm.backend_vm_id, nics=[('add',  nic)],
502
                                     hotplug=settings.GANETI_USE_HOTPLUG,
503
                                     dry_run=settings.TEST)
504

    
505

    
506
def disconnect_from_network(vm, nic):
507
    op = [('remove', nic.index, {})]
508

    
509
    log.debug("Removing nic of VM %s, with index %s", vm, str(nic.index))
510

    
511
    with pooled_rapi_client(vm) as client:
512
        return client.ModifyInstance(vm.backend_vm_id, nics=op,
513
                                     hotplug=settings.GANETI_USE_HOTPLUG,
514
                                     dry_run=settings.TEST)
515

    
516

    
517
def set_firewall_profile(vm, profile):
518
    try:
519
        tag = _firewall_tags[profile]
520
    except KeyError:
521
        raise ValueError("Unsopported Firewall Profile: %s" % profile)
522

    
523
    log.debug("Setting tag of VM %s to %s", vm, profile)
524

    
525
    with pooled_rapi_client(vm) as client:
526
        # Delete all firewall tags
527
        for t in _firewall_tags.values():
528
            client.DeleteInstanceTags(vm.backend_vm_id, [t],
529
                                      dry_run=settings.TEST)
530

    
531
        client.AddInstanceTags(vm.backend_vm_id, [tag], dry_run=settings.TEST)
532

    
533
        # XXX NOP ModifyInstance call to force process_net_status to run
534
        # on the dispatcher
535
        client.ModifyInstance(vm.backend_vm_id,
536
                         os_name=settings.GANETI_CREATEINSTANCE_KWARGS['os'])
537

    
538

    
539
def get_ganeti_instances(backend=None, bulk=False):
540
    instances = []
541
    for backend in get_backends(backend):
542
        with pooled_rapi_client(backend) as client:
543
            instances.append(client.GetInstances(bulk=bulk))
544

    
545
    return reduce(list.__add__, instances, [])
546

    
547

    
548
def get_ganeti_nodes(backend=None, bulk=False):
549
    nodes = []
550
    for backend in get_backends(backend):
551
        with pooled_rapi_client(backend) as client:
552
            nodes.append(client.GetNodes(bulk=bulk))
553

    
554
    return reduce(list.__add__, nodes, [])
555

    
556

    
557
def get_ganeti_jobs(backend=None, bulk=False):
558
    jobs = []
559
    for backend in get_backends(backend):
560
        with pooled_rapi_client(backend) as client:
561
            jobs.append(client.GetJobs(bulk=bulk))
562
    return reduce(list.__add__, jobs, [])
563

    
564
##
565
##
566
##
567

    
568

    
569
def get_backends(backend=None):
570
    if backend:
571
        return [backend]
572
    return Backend.objects.filter(offline=False)
573

    
574

    
575
def get_physical_resources(backend):
576
    """ Get the physical resources of a backend.
577

578
    Get the resources of a backend as reported by the backend (not the db).
579

580
    """
581
    nodes = get_ganeti_nodes(backend, bulk=True)
582
    attr = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
583
    res = {}
584
    for a in attr:
585
        res[a] = 0
586
    for n in nodes:
587
        # Filter out drained, offline and not vm_capable nodes since they will
588
        # not take part in the vm allocation process
589
        if n['vm_capable'] and not n['drained'] and not n['offline']\
590
           and n['cnodes']:
591
            for a in attr:
592
                res[a] += int(n[a])
593
    return res
594

    
595

    
596
def update_resources(backend, resources=None):
597
    """ Update the state of the backend resources in db.
598

599
    """
600

    
601
    if not resources:
602
        resources = get_physical_resources(backend)
603

    
604
    backend.mfree = resources['mfree']
605
    backend.mtotal = resources['mtotal']
606
    backend.dfree = resources['dfree']
607
    backend.dtotal = resources['dtotal']
608
    backend.pinst_cnt = resources['pinst_cnt']
609
    backend.ctotal = resources['ctotal']
610
    backend.updated = datetime.now()
611
    backend.save()
612

    
613

    
614
def get_memory_from_instances(backend):
615
    """ Get the memory that is used from instances.
616

617
    Get the used memory of a backend. Note: This is different for
618
    the real memory used, due to kvm's memory de-duplication.
619

620
    """
621
    with pooled_rapi_client(backend) as client:
622
        instances = client.GetInstances(bulk=True)
623
    mem = 0
624
    for i in instances:
625
        mem += i['oper_ram']
626
    return mem
627

    
628
##
629
## Synchronized operations for reconciliation
630
##
631

    
632

    
633
def create_network_synced(network, backend):
634
    result = _create_network_synced(network, backend)
635
    if result[0] != 'success':
636
        return result
637
    result = connect_network_synced(network, backend)
638
    return result
639

    
640

    
641
def _create_network_synced(network, backend):
642
    with pooled_rapi_client(backend) as client:
643
        job = _create_network(network, backend)
644
        result = wait_for_job(client, job)
645
    return result
646

    
647

    
648
def connect_network_synced(network, backend):
649
    if network.type in ('PUBLIC_ROUTED', 'CUSTOM_ROUTED'):
650
        mode = 'routed'
651
    else:
652
        mode = 'bridged'
653
    with pooled_rapi_client(backend) as client:
654
        for group in client.GetGroups():
655
            job = client.ConnectNetwork(network.backend_id, group, mode,
656
                                        network.link)
657
            result = wait_for_job(client, job)
658
            if result[0] != 'success':
659
                return result
660

    
661
    return result
662

    
663

    
664
def wait_for_job(client, jobid):
665
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
666
    status = result['job_info'][0]
667
    while status not in ['success', 'error', 'cancel']:
668
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
669
                                        [result], None)
670
        status = result['job_info'][0]
671

    
672
    if status == 'success':
673
        return (status, None)
674
    else:
675
        error = result['job_info'][1]
676
        return (status, error)