Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ bd392934

History | View | Annotate | Download (22.1 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import json
35

    
36
from logging import getLogger
37
from django.conf import settings
38
from django.db import transaction
39
from datetime import datetime
40

    
41
from synnefo.db.models import (Backend, VirtualMachine, Network,
42
                               BackendNetwork, BACKEND_STATUSES)
43
from synnefo.logic import utils, ippool
44
from synnefo.api.faults import OverLimit
45
from synnefo.util.rapi import GanetiRapiClient
46

    
47
log = getLogger('synnefo.logic')
48

    
49

    
50
_firewall_tags = {
51
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
52
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
53
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}
54

    
55
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
56

    
57

    
58
def create_client(hostname, port=5080, username=None, password=None):
59
    return GanetiRapiClient(hostname, port, username, password)
60

    
61

    
62
@transaction.commit_on_success
63
def process_op_status(vm, etime, jobid, opcode, status, logmsg):
64
    """Process a job progress notification from the backend
65

66
    Process an incoming message from the backend (currently Ganeti).
67
    Job notifications with a terminating status (sucess, error, or canceled),
68
    also update the operating state of the VM.
69

70
    """
71
    # See #1492, #1031, #1111 why this line has been removed
72
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
73
    if status not in [x[0] for x in BACKEND_STATUSES]:
74
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)
75

    
76
    vm.backendjobid = jobid
77
    vm.backendjobstatus = status
78
    vm.backendopcode = opcode
79
    vm.backendlogmsg = logmsg
80

    
81
    # Notifications of success change the operating state
82
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode, None)
83
    if status == 'success' and state_for_success is not None:
84
        utils.update_state(vm, state_for_success)
85
        # Set the deleted flag explicitly, cater for admin-initiated removals
86
        if opcode == 'OP_INSTANCE_REMOVE':
87
            release_instance_nics(vm)
88
            vm.deleted = True
89
            vm.nics.all().delete()
90

    
91
    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
92
    if status in ('canceled', 'error') and opcode == 'OP_INSTANCE_CREATE':
93
        utils.update_state(vm, 'ERROR')
94

    
95
    # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
96
    # when no instance exists at the Ganeti backend.
97
    # See ticket #799 for all the details.
98
    #
99
    if (status == 'error' and opcode == 'OP_INSTANCE_REMOVE'):
100
        vm.deleted = True
101
        vm.nics.all().delete()
102

    
103
    vm.backendtime = etime
104
    # Any other notification of failure leaves the operating state unchanged
105

    
106
    vm.save()
107

    
108

    
109
@transaction.commit_on_success
110
def process_net_status(vm, etime, nics):
111
    """Process a net status notification from the backend
112

113
    Process an incoming message from the Ganeti backend,
114
    detailing the NIC configuration of a VM instance.
115

116
    Update the state of the VM in the DB accordingly.
117
    """
118

    
119
    # Release the ips of the old nics. Get back the networks as multiple
120
    # changes in the same network, must happen in the same Network object,
121
    # because transaction will be commited only on exit of the function.
122
    networks = release_instance_nics(vm)
123

    
124
    new_nics = enumerate(nics)
125
    for i, new_nic in new_nics:
126
        network = new_nic.get('network', '')
127
        n = str(network)
128
        pk = utils.id_from_network_name(n)
129

    
130
        # Get the cached Network or get it from DB
131
        if pk in networks:
132
            net = networks[pk]
133
        else:
134
            net = Network.objects.select_for_update().get(pk=pk)
135

    
136
        # Get the new nic info
137
        mac = new_nic.get('mac', '')
138
        ipv4 = new_nic.get('ip', '')
139
        ipv6 = new_nic.get('ipv6', '')
140

    
141
        firewall = new_nic.get('firewall', '')
142
        firewall_profile = _reverse_tags.get(firewall, '')
143
        if not firewall_profile and net.public:
144
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE
145

    
146
        if ipv4:
147
            net.reserve_address(ipv4)
148

    
149
        vm.nics.create(
150
            network=net,
151
            index=i,
152
            mac=mac,
153
            ipv4=ipv4,
154
            ipv6=ipv6,
155
            firewall_profile=firewall_profile,
156
            dirty=False)
157

    
158
    vm.backendtime = etime
159
    vm.save()
160

    
161

    
162
def release_instance_nics(vm):
163
    networks = {}
164

    
165
    for nic in vm.nics.all():
166
        pk = nic.network.pk
167
        # Get the cached Network or get it from DB
168
        if pk in networks:
169
            net = networks[pk]
170
        else:
171
            # Get the network object in exclusive mode in order
172
            # to guarantee consistency of the address pool
173
            net = Network.objects.select_for_update().get(pk=pk)
174
        net.release_address(nic.ipv4)
175
        nic.delete()
176

    
177
    return networks
178

    
179

    
180
@transaction.commit_on_success
181
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
182
    if status not in [x[0] for x in BACKEND_STATUSES]:
183
        return
184
        #raise Network.InvalidBackendMsgError(opcode, status)
185

    
186
    back_network.backendjobid = jobid
187
    back_network.backendjobstatus = status
188
    back_network.backendopcode = opcode
189
    back_network.backendlogmsg = logmsg
190

    
191
    # Notifications of success change the operating state
192
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
193
    if status == 'success' and state_for_success is not None:
194
        back_network.operstate = state_for_success
195
        if opcode == 'OP_NETWORK_REMOVE':
196
            back_network.deleted = True
197

    
198
    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_CREATE':
199
        utils.update_state(back_network, 'ERROR')
200

    
201
    if (status == 'error' and opcode == 'OP_NETWORK_REMOVE'):
202
        back_network.deleted = True
203
        back_network.operstate = 'DELETED'
204

    
205
    back_network.save()
206

    
207

    
208
@transaction.commit_on_success
209
def process_create_progress(vm, etime, rprogress, wprogress):
210

    
211
    # XXX: This only uses the read progress for now.
212
    #      Explore whether it would make sense to use the value of wprogress
213
    #      somewhere.
214
    percentage = int(rprogress)
215

    
216
    # The percentage may exceed 100%, due to the way
217
    # snf-progress-monitor tracks bytes read by image handling processes
218
    percentage = 100 if percentage > 100 else percentage
219
    if percentage < 0:
220
        raise ValueError("Percentage cannot be negative")
221

    
222
    # FIXME: log a warning here, see #1033
223
#   if last_update > percentage:
224
#       raise ValueError("Build percentage should increase monotonically " \
225
#                        "(old = %d, new = %d)" % (last_update, percentage))
226

    
227
    # This assumes that no message of type 'ganeti-create-progress' is going to
228
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
229
    # the instance is STARTED.  What if the two messages are processed by two
230
    # separate dispatcher threads, and the 'ganeti-op-status' message for
231
    # successful creation gets processed before the 'ganeti-create-progress'
232
    # message? [vkoukis]
233
    #
234
    #if not vm.operstate == 'BUILD':
235
    #    raise VirtualMachine.IllegalState("VM is not in building state")
236

    
237
    vm.buildpercentage = percentage
238
    vm.backendtime = etime
239
    vm.save()
240

    
241

    
242
def start_action(vm, action):
243
    """Update the state of a VM when a new action is initiated."""
244
    if not action in [x[0] for x in VirtualMachine.ACTIONS]:
245
        raise VirtualMachine.InvalidActionError(action)
246

    
247
    # No actions to deleted and no actions beside destroy to suspended VMs
248
    if vm.deleted:
249
        raise VirtualMachine.DeletedError
250

    
251
    # No actions to machines being built. They may be destroyed, however.
252
    if vm.operstate == 'BUILD' and action != 'DESTROY':
253
        raise VirtualMachine.BuildingError
254

    
255
    vm.action = action
256
    vm.backendjobid = None
257
    vm.backendopcode = None
258
    vm.backendjobstatus = None
259
    vm.backendlogmsg = None
260

    
261
    # Update the relevant flags if the VM is being suspended or destroyed.
262
    # Do not set the deleted flag here, see ticket #721.
263
    #
264
    # The deleted flag is set asynchronously, when an OP_INSTANCE_REMOVE
265
    # completes successfully. Hence, a server may be visible for some time
266
    # after a DELETE /servers/id returns HTTP 204.
267
    #
268
    if action == "DESTROY":
269
        # vm.deleted = True
270
        pass
271
    elif action == "SUSPEND":
272
        vm.suspended = True
273
    elif action == "START":
274
        vm.suspended = False
275
    vm.save()
276

    
277

    
278
def create_instance(vm, flavor, image, password, personality):
279
    """`image` is a dictionary which should contain the keys:
280
            'backend_id', 'format' and 'metadata'
281

282
        metadata value should be a dictionary.
283
    """
284

    
285
    # Get the Network object in exclusive mode in order to
286
    # safely (isolated) reserve an IP address
287
    network = Network.objects.select_for_update().get(id=1)
288
    pool = ippool.IPPool(network)
289
    try:
290
        address = pool.get_free_address()
291
    except ippool.IPPool.IPPoolExhausted:
292
        raise OverLimit("Can not allocate IP for new machine."
293
                        " Public network is full.")
294
    pool.save()
295

    
296
    nic = {'ip': address, 'network': settings.GANETI_PUBLIC_NETWORK}
297

    
298
    if settings.IGNORE_FLAVOR_DISK_SIZES:
299
        if image['backend_id'].find("windows") >= 0:
300
            sz = 14000
301
        else:
302
            sz = 4000
303
    else:
304
        sz = flavor.disk * 1024
305

    
306
    # Handle arguments to CreateInstance() as a dictionary,
307
    # initialize it based on a deployment-specific value.
308
    # This enables the administrator to override deployment-specific
309
    # arguments, such as the disk template to use, name of os provider
310
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
311
    #
312
    kw = settings.GANETI_CREATEINSTANCE_KWARGS
313
    kw['mode'] = 'create'
314
    kw['name'] = vm.backend_vm_id
315
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
316
    kw['disk_template'] = flavor.disk_template
317
    kw['disks'] = [{"size": sz}]
318
    kw['nics'] = [nic]
319
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
320
    # kw['os'] = settings.GANETI_OS_PROVIDER
321
    kw['ip_check'] = False
322
    kw['name_check'] = False
323
    # Do not specific a node explicitly, have
324
    # Ganeti use an iallocator instead
325
    #
326
    # kw['pnode']=rapi.GetNodes()[0]
327
    kw['dry_run'] = settings.TEST
328

    
329
    kw['beparams'] = {
330
        'auto_balance': True,
331
        'vcpus': flavor.cpu,
332
        'memory': flavor.ram}
333

    
334
    kw['osparams'] = {
335
        'img_id': image['backend_id'],
336
        'img_passwd': password,
337
        'img_format': image['format']}
338
    if personality:
339
        kw['osparams']['img_personality'] = json.dumps(personality)
340

    
341
    kw['osparams']['img_properties'] = json.dumps(image['metadata'])
342

    
343
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
344
    # kw['hvparams'] = dict(serial_console=False)
345

    
346
    return vm.client.CreateInstance(**kw)
347

    
348

    
349
def delete_instance(vm):
350
    start_action(vm, 'DESTROY')
351
    vm.client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)
352

    
353

    
354
def reboot_instance(vm, reboot_type):
355
    assert reboot_type in ('soft', 'hard')
356
    vm.client.RebootInstance(vm.backend_vm_id, reboot_type, dry_run=settings.TEST)
357
    log.info('Rebooting instance %s', vm.backend_vm_id)
358

    
359

    
360
def startup_instance(vm):
361
    start_action(vm, 'START')
362
    vm.client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
363

    
364

    
365
def shutdown_instance(vm):
366
    start_action(vm, 'STOP')
367
    vm.client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)
368

    
369

    
370
def get_instance_console(vm):
371
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
372
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
373
    # useless (see #783).
374
    #
375
    # Until this is fixed on the Ganeti side, construct a console info reply
376
    # directly.
377
    #
378
    # WARNING: This assumes that VNC runs on port network_port on
379
    #          the instance's primary node, and is probably
380
    #          hypervisor-specific.
381
    #
382
    console = {}
383
    console['kind'] = 'vnc'
384
    i = vm.client.GetInstance(vm.backend_vm_id)
385
    if i['hvparams']['serial_console']:
386
        raise Exception("hv parameter serial_console cannot be true")
387
    console['host'] = i['pnode']
388
    console['port'] = i['network_port']
389

    
390
    return console
391
    # return rapi.GetInstanceConsole(vm.backend_vm_id)
392

    
393

    
394
def request_status_update(vm):
395
    return vm.client.GetInstanceInfo(vm.backend_vm_id)
396

    
397

    
398
def update_status(vm, status):
399
    utils.update_state(vm, status)
400

    
401

    
402
def create_network(network, backends=None):
403
    """ Add and connect a network to backends.
404

405
    @param network: Network object
406
    @param backends: List of Backend objects. None defaults to all.
407

408
    """
409
    backend_jobs = _create_network(network, backends)
410
    connect_network(network, backend_jobs)
411
    return network
412

    
413

    
414
def _create_network(network, backends=None):
415
    """Add a network to backends.
416
    @param network: Network object
417
    @param backends: List of Backend objects. None defaults to all.
418

419
    """
420

    
421
    network_type = network.public and 'public' or 'private'
422

    
423
    if not backends:
424
        backends = Backend.objects.exclude(offline=True)
425

    
426
    tags = network.backend_tag
427
    if network.dhcp:
428
        tags.append('nfdhcpd')
429
    tags = ','.join(tags)
430

    
431
    backend_jobs = []
432
    for backend in backends:
433
        job = backend.client.CreateNetwork(
434
                network_name=network.backend_id,
435
                network=network.subnet,
436
                gateway=network.gateway,
437
                network_type=network_type,
438
                mac_prefix=network.mac_prefix,
439
                tags=tags)
440
        backend_jobs.append((backend, job))
441

    
442
    return backend_jobs
443

    
444

    
445
def connect_network(network, backend_jobs=None):
446
    """Connect a network to all nodegroups.
447

448
    @param network: Network object
449
    @param backend_jobs: List of tuples of the form (Backend, jobs) which are
450
                         the backends to connect the network and the jobs on
451
                         which the connect job depends.
452

453
    """
454

    
455
    if network.type in ('PUBLIC_ROUTED', 'CUSTOM_ROUTED'):
456
        mode = 'routed'
457
    else:
458
        mode = 'bridged'
459

    
460
    if not backend_jobs:
461
        backend_jobs = [(backend, []) for backend in
462
                        Backend.objects.exclude(offline=True)]
463

    
464
    for backend, job in backend_jobs:
465
        client = backend.client
466
        for group in client.GetGroups():
467
            client.ConnectNetwork(network.backend_id, group, mode,
468
                                  network.link, [job])
469

    
470

    
471
def connect_network_group(backend, network, group):
472
    """Connect a network to a specific nodegroup of a backend.
473

474
    """
475
    if network.type in ('PUBLIC_ROUTED', 'CUSTOM_ROUTED'):
476
        mode = 'routed'
477
    else:
478
        mode = 'bridged'
479

    
480
    return backend.client.ConnectNetwork(network.backend_id, group, mode,
481
                                         network.link)
482

    
483

    
484
def delete_network(network, backends=None):
485
    """ Disconnect and a remove a network from backends.
486

487
    @param network: Network object
488
    @param backends: List of Backend objects. None defaults to all.
489

490
    """
491
    backend_jobs = disconnect_network(network, backends)
492
    _delete_network(network, backend_jobs)
493

    
494

    
495
def disconnect_network(network, backends=None):
496
    """Disconnect a network from all nodegroups.
497

498
    @param network: Network object
499
    @param backends: List of Backend objects. None defaults to all.
500

501
    """
502

    
503
    if not backends:
504
        backends = Backend.objects.exclude(offline=True)
505

    
506
    backend_jobs = []
507
    for backend in backends:
508
        client = backend.client
509
        jobs = []
510
        for group in client.GetGroups():
511
            job = client.DisconnectNetwork(network.backend_id, group)
512
            jobs.append(job)
513
        backend_jobs.append((backend, jobs))
514

    
515
    return backend_jobs
516

    
517

    
518
def disconnect_from_network(vm, nic):
519
    """Disconnect a virtual machine from a network by removing it's nic.
520

521
    @param vm: VirtualMachine object
522
    @param network: Network object
523

524
    """
525

    
526
    op = [('remove', nic.index, {})]
527
    return vm.client.ModifyInstance(vm.backend_vm_id, nics=op,
528
                                   hotplug=True, dry_run=settings.TEST)
529

    
530

    
531
def _delete_network(network, backend_jobs=None):
532
    if not backend_jobs:
533
        backend_jobs = [(backend, []) for backend in
534
                Backend.objects.exclude(offline=True)]
535
    for backend, jobs in backend_jobs:
536
        backend.client.DeleteNetwork(network.backend_id, jobs)
537

    
538

    
539
def connect_to_network(vm, network, address):
540
    """Connect a virtual machine to a network.
541

542
    @param vm: VirtualMachine object
543
    @param network: Network object
544

545
    """
546

    
547
    # ip = network.dhcp and 'pool' or None
548

    
549
    nic = {'ip': address, 'network': network.backend_id}
550
    vm.client.ModifyInstance(vm.backend_vm_id, nics=[('add',  nic)],
551
                             hotplug=True, dry_run=settings.TEST)
552

    
553

    
554
def set_firewall_profile(vm, profile):
555
    try:
556
        tag = _firewall_tags[profile]
557
    except KeyError:
558
        raise ValueError("Unsopported Firewall Profile: %s" % profile)
559

    
560
    client = vm.client
561
    # Delete all firewall tags
562
    for t in _firewall_tags.values():
563
        client.DeleteInstanceTags(vm.backend_vm_id, [t], dry_run=settings.TEST)
564

    
565
    client.AddInstanceTags(vm.backend_vm_id, [tag], dry_run=settings.TEST)
566

    
567
    # XXX NOP ModifyInstance call to force process_net_status to run
568
    # on the dispatcher
569
    vm.client.ModifyInstance(vm.backend_vm_id,
570
                        os_name=settings.GANETI_CREATEINSTANCE_KWARGS['os'])
571

    
572

    
573
def get_ganeti_instances(backend=None, bulk=False):
574
    Instances = [c.client.GetInstances(bulk=bulk)\
575
                 for c in get_backends(backend)]
576
    return reduce(list.__add__, Instances, [])
577

    
578

    
579
def get_ganeti_nodes(backend=None, bulk=False):
580
    Nodes = [c.client.GetNodes(bulk=bulk) for c in get_backends(backend)]
581
    return reduce(list.__add__, Nodes, [])
582

    
583

    
584
def get_ganeti_jobs(backend=None, bulk=False):
585
    Jobs = [c.client.GetJobs(bulk=bulk) for c in get_backends(backend)]
586
    return reduce(list.__add__, Jobs, [])
587

    
588
##
589
##
590
##
591

    
592

    
593
def get_backends(backend=None):
594
    if backend:
595
        return [backend]
596
    return Backend.objects.filter(offline=False)
597

    
598

    
599
def get_physical_resources(backend):
600
    """ Get the physical resources of a backend.
601

602
    Get the resources of a backend as reported by the backend (not the db).
603

604
    """
605
    nodes = get_ganeti_nodes(backend, bulk=True)
606
    attr = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
607
    res = {}
608
    for a in attr:
609
        res[a] = 0
610
    for n in nodes:
611
        # Filter out drained, offline and not vm_capable nodes since they will
612
        # not take part in the vm allocation process
613
        if n['vm_capable'] and not n['drained'] and not n['offline']\
614
           and n['cnodes']:
615
            for a in attr:
616
                res[a] += int(n[a])
617
    return res
618

    
619

    
620
def update_resources(backend, resources=None):
621
    """ Update the state of the backend resources in db.
622

623
    """
624

    
625
    if not resources:
626
        resources = get_physical_resources(backend)
627

    
628
    backend.mfree = resources['mfree']
629
    backend.mtotal = resources['mtotal']
630
    backend.dfree = resources['dfree']
631
    backend.dtotal = resources['dtotal']
632
    backend.pinst_cnt = resources['pinst_cnt']
633
    backend.ctotal = resources['ctotal']
634
    backend.updated = datetime.now()
635
    backend.save()
636

    
637

    
638
def get_memory_from_instances(backend):
639
    """ Get the memory that is used from instances.
640

641
    Get the used memory of a backend. Note: This is different for
642
    the real memory used, due to kvm's memory de-duplication.
643

644
    """
645
    instances = backend.client.GetInstances(bulk=True)
646
    mem = 0
647
    for i in instances:
648
        mem += i['oper_ram']
649
    return mem
650

    
651
##
652
## Synchronized operations for reconciliation
653
##
654

    
655

    
656
def create_network_synced(network, backend):
657
    result = _create_network_synced(network, backend)
658
    if result[0] != 'success':
659
        return result
660
    result = connect_network_synced(network, backend)
661
    return result
662

    
663

    
664
def _create_network_synced(network, backend):
665
    client = backend.client
666
    job = client.CreateNetwork(network.backend_id, network.subnet)
667
    return wait_for_job(client, job)
668

    
669

    
670
def connect_network_synced(network, backend):
671
    if network.type in ('PUBLIC_ROUTED', 'CUSTOM_ROUTED'):
672
        mode = 'routed'
673
    else:
674
        mode = 'bridged'
675
    client = backend.client
676

    
677
    for group in client.GetGroups():
678
        job = client.ConnectNetwork(network.backend_id, group, mode,
679
                                    network.link)
680
        result = wait_for_job(client, job)
681
        if result[0] != 'success':
682
            return result
683

    
684
    return result
685

    
686

    
687
def wait_for_job(client, jobid):
688
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
689
    status = result['job_info'][0]
690
    while status not in ['success', 'error', 'cancel']:
691
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
692
                                        [result], None)
693
        status = result['job_info'][0]
694

    
695
    if status == 'success':
696
        return (status, None)
697
    else:
698
        error = result['job_info'][1]
699
        return (status, error)