Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ ad297723

History | View | Annotate | Download (22.3 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import json
35

    
36
from logging import getLogger
37
from django.conf import settings
38
from django.db import transaction
39
from datetime import datetime
40

    
41
from synnefo.db.models import (Backend, VirtualMachine, Network,
42
                               BackendNetwork, BACKEND_STATUSES)
43
from synnefo.logic import utils, ippool
44
from synnefo.api.faults import OverLimit
45
from synnefo.util.rapi import GanetiRapiClient
46

    
47
log = getLogger('synnefo.logic')
48

    
49

    
50
_firewall_tags = {
51
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
52
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
53
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}
54

    
55
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
56

    
57

    
58
def create_client(hostname, port=5080, username=None, password=None):
59
    return GanetiRapiClient(hostname, port, username, password)
60

    
61

    
62
@transaction.commit_on_success
63
def process_op_status(vm, etime, jobid, opcode, status, logmsg):
64
    """Process a job progress notification from the backend
65

66
    Process an incoming message from the backend (currently Ganeti).
67
    Job notifications with a terminating status (sucess, error, or canceled),
68
    also update the operating state of the VM.
69

70
    """
71
    # See #1492, #1031, #1111 why this line has been removed
72
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
73
    if status not in [x[0] for x in BACKEND_STATUSES]:
74
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)
75

    
76
    vm.backendjobid = jobid
77
    vm.backendjobstatus = status
78
    vm.backendopcode = opcode
79
    vm.backendlogmsg = logmsg
80

    
81
    # Notifications of success change the operating state
82
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode, None)
83
    if status == 'success' and state_for_success is not None:
84
        utils.update_state(vm, state_for_success)
85
        # Set the deleted flag explicitly, cater for admin-initiated removals
86
        if opcode == 'OP_INSTANCE_REMOVE':
87
            release_instance_nics(vm)
88
            vm.deleted = True
89
            vm.nics.all().delete()
90

    
91
    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
92
    if status in ('canceled', 'error') and opcode == 'OP_INSTANCE_CREATE':
93
        utils.update_state(vm, 'ERROR')
94

    
95
    # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
96
    # when no instance exists at the Ganeti backend.
97
    # See ticket #799 for all the details.
98
    #
99
    if (status == 'error' and opcode == 'OP_INSTANCE_REMOVE'):
100
        vm.deleted = True
101
        vm.nics.all().delete()
102

    
103
    vm.backendtime = etime
104
    # Any other notification of failure leaves the operating state unchanged
105

    
106
    vm.save()
107

    
108

    
109
@transaction.commit_on_success
110
def process_net_status(vm, etime, nics):
111
    """Process a net status notification from the backend
112

113
    Process an incoming message from the Ganeti backend,
114
    detailing the NIC configuration of a VM instance.
115

116
    Update the state of the VM in the DB accordingly.
117
    """
118

    
119
    # Release the ips of the old nics. Get back the networks as multiple
120
    # changes in the same network, must happen in the same Network object,
121
    # because transaction will be commited only on exit of the function.
122
    networks = release_instance_nics(vm)
123

    
124
    new_nics = enumerate(nics)
125
    for i, new_nic in new_nics:
126
        network = new_nic.get('network', '')
127
        n = str(network)
128
        pk = utils.id_from_network_name(n)
129

    
130
        # Get the cached Network or get it from DB
131
        if pk in networks:
132
            net = networks[pk]
133
        else:
134
            net = Network.objects.select_for_update().get(pk=pk)
135

    
136
        # Get the new nic info
137
        mac = new_nic.get('mac', '')
138
        ipv4 = new_nic.get('ip', '')
139
        ipv6 = new_nic.get('ipv6', '')
140

    
141
        firewall = new_nic.get('firewall', '')
142
        firewall_profile = _reverse_tags.get(firewall, '')
143
        if not firewall_profile and net.public:
144
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE
145

    
146
        if ipv4:
147
            net.reserve_address(ipv4)
148

    
149
        vm.nics.create(
150
            network=net,
151
            index=i,
152
            mac=mac,
153
            ipv4=ipv4,
154
            ipv6=ipv6,
155
            firewall_profile=firewall_profile,
156
            dirty=False)
157

    
158
    vm.backendtime = etime
159
    vm.save()
160

    
161

    
162
def release_instance_nics(vm):
163
    networks = {}
164

    
165
    for nic in vm.nics.all():
166
        pk = nic.network.pk
167
        # Get the cached Network or get it from DB
168
        if pk in networks:
169
            net = networks[pk]
170
        else:
171
            # Get the network object in exclusive mode in order
172
            # to guarantee consistency of the address pool
173
            net = Network.objects.select_for_update().get(pk=pk)
174
        net.release_address(nic.ipv4)
175
        nic.delete()
176

    
177
    return networks
178

    
179

    
180
@transaction.commit_on_success
181
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
182
    if status not in [x[0] for x in BACKEND_STATUSES]:
183
        return
184
        #raise Network.InvalidBackendMsgError(opcode, status)
185

    
186
    back_network.backendjobid = jobid
187
    back_network.backendjobstatus = status
188
    back_network.backendopcode = opcode
189
    back_network.backendlogmsg = logmsg
190

    
191
    # Notifications of success change the operating state
192
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
193
    if status == 'success' and state_for_success is not None:
194
        back_network.operstate = state_for_success
195
        if opcode == 'OP_NETWORK_REMOVE':
196
            back_network.deleted = True
197

    
198
    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_CREATE':
199
        utils.update_state(back_network, 'ERROR')
200

    
201
    if (status == 'error' and opcode == 'OP_NETWORK_REMOVE'):
202
        back_network.deleted = True
203
        back_network.operstate = 'DELETED'
204

    
205
    back_network.save()
206

    
207

    
208
@transaction.commit_on_success
209
def process_create_progress(vm, etime, rprogress, wprogress):
210

    
211
    # XXX: This only uses the read progress for now.
212
    #      Explore whether it would make sense to use the value of wprogress
213
    #      somewhere.
214
    percentage = int(rprogress)
215

    
216
    # The percentage may exceed 100%, due to the way
217
    # snf-progress-monitor tracks bytes read by image handling processes
218
    percentage = 100 if percentage > 100 else percentage
219
    if percentage < 0:
220
        raise ValueError("Percentage cannot be negative")
221

    
222
    # FIXME: log a warning here, see #1033
223
#   if last_update > percentage:
224
#       raise ValueError("Build percentage should increase monotonically " \
225
#                        "(old = %d, new = %d)" % (last_update, percentage))
226

    
227
    # This assumes that no message of type 'ganeti-create-progress' is going to
228
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
229
    # the instance is STARTED.  What if the two messages are processed by two
230
    # separate dispatcher threads, and the 'ganeti-op-status' message for
231
    # successful creation gets processed before the 'ganeti-create-progress'
232
    # message? [vkoukis]
233
    #
234
    #if not vm.operstate == 'BUILD':
235
    #    raise VirtualMachine.IllegalState("VM is not in building state")
236

    
237
    vm.buildpercentage = percentage
238
    vm.backendtime = etime
239
    vm.save()
240

    
241

    
242
def start_action(vm, action):
243
    """Update the state of a VM when a new action is initiated."""
244
    if not action in [x[0] for x in VirtualMachine.ACTIONS]:
245
        raise VirtualMachine.InvalidActionError(action)
246

    
247
    # No actions to deleted and no actions beside destroy to suspended VMs
248
    if vm.deleted:
249
        raise VirtualMachine.DeletedError
250

    
251
    # No actions to machines being built. They may be destroyed, however.
252
    if vm.operstate == 'BUILD' and action != 'DESTROY':
253
        raise VirtualMachine.BuildingError
254

    
255
    vm.action = action
256
    vm.backendjobid = None
257
    vm.backendopcode = None
258
    vm.backendjobstatus = None
259
    vm.backendlogmsg = None
260

    
261
    # Update the relevant flags if the VM is being suspended or destroyed.
262
    # Do not set the deleted flag here, see ticket #721.
263
    #
264
    # The deleted flag is set asynchronously, when an OP_INSTANCE_REMOVE
265
    # completes successfully. Hence, a server may be visible for some time
266
    # after a DELETE /servers/id returns HTTP 204.
267
    #
268
    if action == "DESTROY":
269
        # vm.deleted = True
270
        pass
271
    elif action == "SUSPEND":
272
        vm.suspended = True
273
    elif action == "START":
274
        vm.suspended = False
275
    vm.save()
276

    
277

    
278
@transaction.commit_on_success
279
def create_instance(vm, flavor, image, password, personality):
280
    """`image` is a dictionary which should contain the keys:
281
            'backend_id', 'format' and 'metadata'
282

283
        metadata value should be a dictionary.
284
    """
285

    
286
    if settings.PUBLIC_ROUTED_USE_POOL:
287
        # Get the Network object in exclusive mode in order to
288
        # safely (isolated) reserve an IP address
289
        network = Network.objects.select_for_update().get(id=1)
290
        pool = ippool.IPPool(network)
291
        try:
292
            address = pool.get_free_address()
293
        except ippool.IPPool.IPPoolExhausted:
294
            raise OverLimit("Can not allocate IP for new machine."
295
                            " Public network is full.")
296
        pool.save()
297
        nic = {'ip': address, 'network': settings.GANETI_PUBLIC_NETWORK}
298
    else:
299
        nic = {'ip': 'pool', 'network': settings.GANETI_PUBLIC_NETWORK}
300

    
301
    if settings.IGNORE_FLAVOR_DISK_SIZES:
302
        if image['backend_id'].find("windows") >= 0:
303
            sz = 14000
304
        else:
305
            sz = 4000
306
    else:
307
        sz = flavor.disk * 1024
308

    
309
    # Handle arguments to CreateInstance() as a dictionary,
310
    # initialize it based on a deployment-specific value.
311
    # This enables the administrator to override deployment-specific
312
    # arguments, such as the disk template to use, name of os provider
313
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
314
    #
315
    kw = settings.GANETI_CREATEINSTANCE_KWARGS
316
    kw['mode'] = 'create'
317
    kw['name'] = vm.backend_vm_id
318
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
319
    kw['disk_template'] = flavor.disk_template
320
    kw['disks'] = [{"size": sz}]
321
    kw['nics'] = [nic]
322
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
323
    # kw['os'] = settings.GANETI_OS_PROVIDER
324
    kw['ip_check'] = False
325
    kw['name_check'] = False
326
    # Do not specific a node explicitly, have
327
    # Ganeti use an iallocator instead
328
    #
329
    # kw['pnode']=rapi.GetNodes()[0]
330
    kw['dry_run'] = settings.TEST
331

    
332
    kw['beparams'] = {
333
        'auto_balance': True,
334
        'vcpus': flavor.cpu,
335
        'memory': flavor.ram}
336

    
337
    kw['osparams'] = {
338
        'img_id': image['backend_id'],
339
        'img_passwd': password,
340
        'img_format': image['format']}
341
    if personality:
342
        kw['osparams']['img_personality'] = json.dumps(personality)
343

    
344
    kw['osparams']['img_properties'] = json.dumps(image['metadata'])
345

    
346
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
347
    # kw['hvparams'] = dict(serial_console=False)
348

    
349
    return vm.client.CreateInstance(**kw)
350

    
351

    
352
def delete_instance(vm):
353
    start_action(vm, 'DESTROY')
354
    vm.client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)
355

    
356

    
357
def reboot_instance(vm, reboot_type):
358
    assert reboot_type in ('soft', 'hard')
359
    vm.client.RebootInstance(vm.backend_vm_id, reboot_type, dry_run=settings.TEST)
360
    log.info('Rebooting instance %s', vm.backend_vm_id)
361

    
362

    
363
def startup_instance(vm):
364
    start_action(vm, 'START')
365
    vm.client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
366

    
367

    
368
def shutdown_instance(vm):
369
    start_action(vm, 'STOP')
370
    vm.client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)
371

    
372

    
373
def get_instance_console(vm):
374
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
375
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
376
    # useless (see #783).
377
    #
378
    # Until this is fixed on the Ganeti side, construct a console info reply
379
    # directly.
380
    #
381
    # WARNING: This assumes that VNC runs on port network_port on
382
    #          the instance's primary node, and is probably
383
    #          hypervisor-specific.
384
    #
385
    console = {}
386
    console['kind'] = 'vnc'
387
    i = vm.client.GetInstance(vm.backend_vm_id)
388
    if i['hvparams']['serial_console']:
389
        raise Exception("hv parameter serial_console cannot be true")
390
    console['host'] = i['pnode']
391
    console['port'] = i['network_port']
392

    
393
    return console
394
    # return rapi.GetInstanceConsole(vm.backend_vm_id)
395

    
396

    
397
def request_status_update(vm):
398
    return vm.client.GetInstanceInfo(vm.backend_vm_id)
399

    
400

    
401
def update_status(vm, status):
402
    utils.update_state(vm, status)
403

    
404

    
405
def create_network(network, backends=None):
406
    """ Add and connect a network to backends.
407

408
    @param network: Network object
409
    @param backends: List of Backend objects. None defaults to all.
410

411
    """
412
    backend_jobs = _create_network(network, backends)
413
    connect_network(network, backend_jobs)
414
    return network
415

    
416

    
417
def _create_network(network, backends=None):
418
    """Add a network to backends.
419
    @param network: Network object
420
    @param backends: List of Backend objects. None defaults to all.
421

422
    """
423

    
424
    network_type = network.public and 'public' or 'private'
425

    
426
    if not backends:
427
        backends = Backend.objects.exclude(offline=True)
428

    
429
    tags = network.backend_tag
430
    if network.dhcp:
431
        tags.append('nfdhcpd')
432
    tags = ','.join(tags)
433

    
434
    backend_jobs = []
435
    for backend in backends:
436
        job = backend.client.CreateNetwork(
437
                network_name=network.backend_id,
438
                network=network.subnet,
439
                gateway=network.gateway,
440
                network_type=network_type,
441
                mac_prefix=network.mac_prefix,
442
                tags=tags)
443
        backend_jobs.append((backend, job))
444

    
445
    return backend_jobs
446

    
447

    
448
def connect_network(network, backend_jobs=None):
449
    """Connect a network to all nodegroups.
450

451
    @param network: Network object
452
    @param backend_jobs: List of tuples of the form (Backend, jobs) which are
453
                         the backends to connect the network and the jobs on
454
                         which the connect job depends.
455

456
    """
457

    
458
    if network.type in ('PUBLIC_ROUTED', 'CUSTOM_ROUTED'):
459
        mode = 'routed'
460
    else:
461
        mode = 'bridged'
462

    
463
    if not backend_jobs:
464
        backend_jobs = [(backend, []) for backend in
465
                        Backend.objects.exclude(offline=True)]
466

    
467
    for backend, job in backend_jobs:
468
        client = backend.client
469
        for group in client.GetGroups():
470
            client.ConnectNetwork(network.backend_id, group, mode,
471
                                  network.link, [job])
472

    
473

    
474
def connect_network_group(backend, network, group):
475
    """Connect a network to a specific nodegroup of a backend.
476

477
    """
478
    if network.type in ('PUBLIC_ROUTED', 'CUSTOM_ROUTED'):
479
        mode = 'routed'
480
    else:
481
        mode = 'bridged'
482

    
483
    return backend.client.ConnectNetwork(network.backend_id, group, mode,
484
                                         network.link)
485

    
486

    
487
def delete_network(network, backends=None):
488
    """ Disconnect and a remove a network from backends.
489

490
    @param network: Network object
491
    @param backends: List of Backend objects. None defaults to all.
492

493
    """
494
    backend_jobs = disconnect_network(network, backends)
495
    _delete_network(network, backend_jobs)
496

    
497

    
498
def disconnect_network(network, backends=None):
499
    """Disconnect a network from all nodegroups.
500

501
    @param network: Network object
502
    @param backends: List of Backend objects. None defaults to all.
503

504
    """
505

    
506
    if not backends:
507
        backends = Backend.objects.exclude(offline=True)
508

    
509
    backend_jobs = []
510
    for backend in backends:
511
        client = backend.client
512
        jobs = []
513
        for group in client.GetGroups():
514
            job = client.DisconnectNetwork(network.backend_id, group)
515
            jobs.append(job)
516
        backend_jobs.append((backend, jobs))
517

    
518
    return backend_jobs
519

    
520

    
521
def disconnect_from_network(vm, nic):
522
    """Disconnect a virtual machine from a network by removing it's nic.
523

524
    @param vm: VirtualMachine object
525
    @param network: Network object
526

527
    """
528

    
529
    op = [('remove', nic.index, {})]
530
    return vm.client.ModifyInstance(vm.backend_vm_id, nics=op,
531
                                   hotplug=True, dry_run=settings.TEST)
532

    
533

    
534
def _delete_network(network, backend_jobs=None):
535
    if not backend_jobs:
536
        backend_jobs = [(backend, []) for backend in
537
                Backend.objects.exclude(offline=True)]
538
    for backend, jobs in backend_jobs:
539
        backend.client.DeleteNetwork(network.backend_id, jobs)
540

    
541

    
542
def connect_to_network(vm, network, address):
543
    """Connect a virtual machine to a network.
544

545
    @param vm: VirtualMachine object
546
    @param network: Network object
547

548
    """
549

    
550
    # ip = network.dhcp and 'pool' or None
551

    
552
    nic = {'ip': address, 'network': network.backend_id}
553
    vm.client.ModifyInstance(vm.backend_vm_id, nics=[('add',  nic)],
554
                             hotplug=True, dry_run=settings.TEST)
555

    
556

    
557
def set_firewall_profile(vm, profile):
558
    try:
559
        tag = _firewall_tags[profile]
560
    except KeyError:
561
        raise ValueError("Unsopported Firewall Profile: %s" % profile)
562

    
563
    client = vm.client
564
    # Delete all firewall tags
565
    for t in _firewall_tags.values():
566
        client.DeleteInstanceTags(vm.backend_vm_id, [t], dry_run=settings.TEST)
567

    
568
    client.AddInstanceTags(vm.backend_vm_id, [tag], dry_run=settings.TEST)
569

    
570
    # XXX NOP ModifyInstance call to force process_net_status to run
571
    # on the dispatcher
572
    vm.client.ModifyInstance(vm.backend_vm_id,
573
                        os_name=settings.GANETI_CREATEINSTANCE_KWARGS['os'])
574

    
575

    
576
def get_ganeti_instances(backend=None, bulk=False):
577
    Instances = [c.client.GetInstances(bulk=bulk)\
578
                 for c in get_backends(backend)]
579
    return reduce(list.__add__, Instances, [])
580

    
581

    
582
def get_ganeti_nodes(backend=None, bulk=False):
583
    Nodes = [c.client.GetNodes(bulk=bulk) for c in get_backends(backend)]
584
    return reduce(list.__add__, Nodes, [])
585

    
586

    
587
def get_ganeti_jobs(backend=None, bulk=False):
588
    Jobs = [c.client.GetJobs(bulk=bulk) for c in get_backends(backend)]
589
    return reduce(list.__add__, Jobs, [])
590

    
591
##
592
##
593
##
594

    
595

    
596
def get_backends(backend=None):
597
    if backend:
598
        return [backend]
599
    return Backend.objects.filter(offline=False)
600

    
601

    
602
def get_physical_resources(backend):
603
    """ Get the physical resources of a backend.
604

605
    Get the resources of a backend as reported by the backend (not the db).
606

607
    """
608
    nodes = get_ganeti_nodes(backend, bulk=True)
609
    attr = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
610
    res = {}
611
    for a in attr:
612
        res[a] = 0
613
    for n in nodes:
614
        # Filter out drained, offline and not vm_capable nodes since they will
615
        # not take part in the vm allocation process
616
        if n['vm_capable'] and not n['drained'] and not n['offline']\
617
           and n['cnodes']:
618
            for a in attr:
619
                res[a] += int(n[a])
620
    return res
621

    
622

    
623
def update_resources(backend, resources=None):
624
    """ Update the state of the backend resources in db.
625

626
    """
627

    
628
    if not resources:
629
        resources = get_physical_resources(backend)
630

    
631
    backend.mfree = resources['mfree']
632
    backend.mtotal = resources['mtotal']
633
    backend.dfree = resources['dfree']
634
    backend.dtotal = resources['dtotal']
635
    backend.pinst_cnt = resources['pinst_cnt']
636
    backend.ctotal = resources['ctotal']
637
    backend.updated = datetime.now()
638
    backend.save()
639

    
640

    
641
def get_memory_from_instances(backend):
642
    """ Get the memory that is used from instances.
643

644
    Get the used memory of a backend. Note: This is different for
645
    the real memory used, due to kvm's memory de-duplication.
646

647
    """
648
    instances = backend.client.GetInstances(bulk=True)
649
    mem = 0
650
    for i in instances:
651
        mem += i['oper_ram']
652
    return mem
653

    
654
##
655
## Synchronized operations for reconciliation
656
##
657

    
658

    
659
def create_network_synced(network, backend):
660
    result = _create_network_synced(network, backend)
661
    if result[0] != 'success':
662
        return result
663
    result = connect_network_synced(network, backend)
664
    return result
665

    
666

    
667
def _create_network_synced(network, backend):
668
    client = backend.client
669
    job = client.CreateNetwork(network.backend_id, network.subnet)
670
    return wait_for_job(client, job)
671

    
672

    
673
def connect_network_synced(network, backend):
674
    if network.type in ('PUBLIC_ROUTED', 'CUSTOM_ROUTED'):
675
        mode = 'routed'
676
    else:
677
        mode = 'bridged'
678
    client = backend.client
679

    
680
    for group in client.GetGroups():
681
        job = client.ConnectNetwork(network.backend_id, group, mode,
682
                                    network.link)
683
        result = wait_for_job(client, job)
684
        if result[0] != 'success':
685
            return result
686

    
687
    return result
688

    
689

    
690
def wait_for_job(client, jobid):
691
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
692
    status = result['job_info'][0]
693
    while status not in ['success', 'error', 'cancel']:
694
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
695
                                        [result], None)
696
        status = result['job_info'][0]
697

    
698
    if status == 'success':
699
        return (status, None)
700
    else:
701
        error = result['job_info'][1]
702
        return (status, error)