Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ f45a7ac4

History | View | Annotate | Download (25.2 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import json
35

    
36
from django.conf import settings
37
from django.db import transaction
38
from datetime import datetime
39

    
40
from synnefo.db.models import (Backend, VirtualMachine, Network,
41
                               BackendNetwork, BACKEND_STATUSES,
42
                               pooled_rapi_client, BridgePoolTable,
43
                               MacPrefixPoolTable, VirtualMachineDiagnostic)
44
from synnefo.logic import utils
45
from synnefo import quotas
46
from synnefo.api.util import release_resource
47

    
48
from logging import getLogger
49
log = getLogger(__name__)
50

    
51

    
52
_firewall_tags = {
53
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
54
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
55
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}
56

    
57
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
58

    
59

    
60
@quotas.uses_commission
61
@transaction.commit_on_success
62
def process_op_status(serials, vm, etime, jobid, opcode, status, logmsg):
63
    """Process a job progress notification from the backend
64

65
    Process an incoming message from the backend (currently Ganeti).
66
    Job notifications with a terminating status (sucess, error, or canceled),
67
    also update the operating state of the VM.
68

69
    """
70
    # See #1492, #1031, #1111 why this line has been removed
71
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
72
    if status not in [x[0] for x in BACKEND_STATUSES]:
73
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)
74

    
75
    vm.backendjobid = jobid
76
    vm.backendjobstatus = status
77
    vm.backendopcode = opcode
78
    vm.backendlogmsg = logmsg
79

    
80
    # Notifications of success change the operating state
81
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode, None)
82
    if status == 'success' and state_for_success is not None:
83
        vm.operstate = state_for_success
84

    
85
    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
86
    if opcode == 'OP_INSTANCE_CREATE' and status in ('canceled', 'error'):
87
        vm.operstate = 'ERROR'
88
        vm.backendtime = etime
89
    elif opcode == 'OP_INSTANCE_REMOVE':
90
        # Set the deleted flag explicitly, cater for admin-initiated removals
91
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
92
        # when no instance exists at the Ganeti backend.
93
        # See ticket #799 for all the details.
94
        #
95
        if status == 'success' or (status == 'error' and
96
                                   vm.operstate == 'ERROR'):
97
            # Issue commission
98
            serial = quotas.issue_vm_commission(vm.userid, vm.flavor,
99
                                                delete=True)
100
            serials.append(serial)
101
            vm.serial = serial
102
            serial.accepted = True
103
            serial.save()
104
            release_instance_nics(vm)
105
            vm.nics.all().delete()
106
            vm.deleted = True
107
            vm.operstate = state_for_success
108
            vm.backendtime = etime
109

    
110
    # Update backendtime only for jobs that have been successfully completed,
111
    # since only these jobs update the state of the VM. Else a "race condition"
112
    # may occur when a successful job (e.g. OP_INSTANCE_REMOVE) completes
113
    # before an error job and messages arrive in reversed order.
114
    if status == 'success':
115
        vm.backendtime = etime
116

    
117
    vm.save()
118

    
119

    
120
@transaction.commit_on_success
121
def process_net_status(vm, etime, nics):
122
    """Process a net status notification from the backend
123

124
    Process an incoming message from the Ganeti backend,
125
    detailing the NIC configuration of a VM instance.
126

127
    Update the state of the VM in the DB accordingly.
128
    """
129

    
130
    ganeti_nics = process_ganeti_nics(nics)
131
    if not nics_changed(vm.nics.order_by('index'), ganeti_nics):
132
        log.debug("NICs for VM %s have not changed", vm)
133

    
134
    release_instance_nics(vm)
135

    
136
    for nic in ganeti_nics:
137
        ipv4 = nic.get('ipv4', '')
138
        net = nic['network']
139
        if ipv4:
140
            net.reserve_address(ipv4)
141

    
142
        nic['dirty'] = False
143
        vm.nics.create(**nic)
144
        # Dummy save the network, because UI uses changed-since for VMs
145
        # and Networks in order to show the VM NICs
146
        net.save()
147

    
148
    vm.backendtime = etime
149
    vm.save()
150

    
151

    
152
def process_ganeti_nics(ganeti_nics):
153
    """Process NIC dict from ganeti hooks."""
154
    new_nics = []
155
    for i, new_nic in enumerate(ganeti_nics):
156
        network = new_nic.get('network', '')
157
        n = str(network)
158
        pk = utils.id_from_network_name(n)
159

    
160
        net = Network.objects.get(pk=pk)
161

    
162
        # Get the new nic info
163
        mac = new_nic.get('mac', '')
164
        ipv4 = new_nic.get('ip', '')
165
        ipv6 = new_nic.get('ipv6', '')
166

    
167
        firewall = new_nic.get('firewall', '')
168
        firewall_profile = _reverse_tags.get(firewall, '')
169
        if not firewall_profile and net.public:
170
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE
171

    
172
        nic = {
173
               'index': i,
174
               'network': net,
175
               'mac': mac,
176
               'ipv4': ipv4,
177
               'ipv6': ipv6,
178
               'firewall_profile': firewall_profile}
179

    
180
        new_nics.append(nic)
181
    return new_nics
182

    
183

    
184
def nics_changed(old_nics, new_nics):
185
    """Return True if NICs have changed in any way."""
186
    if len(old_nics) != len(new_nics):
187
        return True
188
    for old_nic, new_nic in zip(old_nics, new_nics):
189
        if not (old_nic.ipv4 == new_nic['ipv4'] and\
190
                old_nic.ipv6 == new_nic['ipv6'] and\
191
                old_nic.mac == new_nic['mac'] and\
192
                old_nic.firewall_profile == new_nic['firewall_profile'] and\
193
                old_nic.index == new_nic['index'] and\
194
                old_nic.network == new_nic['network']):
195
            return True
196
    return False
197

    
198

    
199
def release_instance_nics(vm):
200
    for nic in vm.nics.all():
201
        net = nic.network
202
        if nic.ipv4:
203
            net.release_address(nic.ipv4)
204
        nic.delete()
205
        net.save()
206

    
207

    
208
@transaction.commit_on_success
209
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
210
    if status not in [x[0] for x in BACKEND_STATUSES]:
211
        raise Network.InvalidBackendMsgError(opcode, status)
212

    
213
    back_network.backendjobid = jobid
214
    back_network.backendjobstatus = status
215
    back_network.backendopcode = opcode
216
    back_network.backendlogmsg = logmsg
217

    
218
    # Notifications of success change the operating state
219
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
220
    if status == 'success' and state_for_success is not None:
221
        back_network.operstate = state_for_success
222

    
223
    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_CREATE':
224
        utils.update_state(back_network, 'ERROR')
225
        back_network.backendtime = etime
226

    
227
    if opcode == 'OP_NETWORK_REMOVE':
228
        if status == 'success' or (status == 'error' and
229
                                   back_network.operstate == 'ERROR'):
230
            back_network.operstate = state_for_success
231
            back_network.deleted = True
232
            back_network.backendtime = etime
233

    
234
    if status == 'success':
235
        back_network.backendtime = etime
236
    back_network.save()
237
    # Also you must update the state of the Network!!
238
    update_network_state(back_network.network)
239

    
240

    
241
@quotas.uses_commission
242
def update_network_state(serials, network):
243
    old_state = network.state
244

    
245
    backend_states = [s.operstate for s in network.backend_networks.all()]
246
    if not backend_states:
247
        network.state = 'PENDING'
248
        network.save()
249
        return
250

    
251
    all_equal = len(set(backend_states)) <= 1
252
    network.state = all_equal and backend_states[0] or 'PENDING'
253

    
254
    # Release the resources on the deletion of the Network
255
    if old_state != 'DELETED' and network.state == 'DELETED':
256
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
257
                 network.id, network.mac_prefix, network.link)
258
        network.deleted = True
259
        if network.mac_prefix:
260
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
261
                release_resource(res_type="mac_prefix",
262
                                 value=network.mac_prefix)
263
        if network.link:
264
            if network.FLAVORS[network.flavor]["link"] == "pool":
265
                release_resource(res_type="bridge", value=network.link)
266

    
267
        # Issue commission
268
        serial = quotas.issue_network_commission(network.userid, delete=True)
269
        serials.append(serial)
270
        network.serial = serial
271
        serial.accepted = True
272
        serial.save()
273

    
274
    network.save()
275

    
276

    
277
@transaction.commit_on_success
278
def process_network_modify(back_network, etime, jobid, opcode, status,
279
                           add_reserved_ips, remove_reserved_ips):
280
    assert (opcode == "OP_NETWORK_SET_PARAMS")
281
    if status not in [x[0] for x in BACKEND_STATUSES]:
282
        raise Network.InvalidBackendMsgError(opcode, status)
283

    
284
    back_network.backendjobid = jobid
285
    back_network.backendjobstatus = status
286
    back_network.opcode = opcode
287

    
288
    if add_reserved_ips or remove_reserved_ips:
289
        net = back_network.network
290
        pool = net.get_pool()
291
        if add_reserved_ips:
292
            for ip in add_reserved_ips:
293
                pool.reserve(ip, external=True)
294
        if remove_reserved_ips:
295
            for ip in remove_reserved_ips:
296
                pool.put(ip, external=True)
297
        pool.save()
298

    
299
    if status == 'success':
300
        back_network.backendtime = etime
301
    back_network.save()
302

    
303

    
304
@transaction.commit_on_success
305
def process_create_progress(vm, etime, progress):
306

    
307
    percentage = int(progress)
308

    
309
    # The percentage may exceed 100%, due to the way
310
    # snf-image:copy-progress tracks bytes read by image handling processes
311
    percentage = 100 if percentage > 100 else percentage
312
    if percentage < 0:
313
        raise ValueError("Percentage cannot be negative")
314

    
315
    # FIXME: log a warning here, see #1033
316
#   if last_update > percentage:
317
#       raise ValueError("Build percentage should increase monotonically " \
318
#                        "(old = %d, new = %d)" % (last_update, percentage))
319

    
320
    # This assumes that no message of type 'ganeti-create-progress' is going to
321
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
322
    # the instance is STARTED.  What if the two messages are processed by two
323
    # separate dispatcher threads, and the 'ganeti-op-status' message for
324
    # successful creation gets processed before the 'ganeti-create-progress'
325
    # message? [vkoukis]
326
    #
327
    #if not vm.operstate == 'BUILD':
328
    #    raise VirtualMachine.IllegalState("VM is not in building state")
329

    
330
    vm.buildpercentage = percentage
331
    vm.backendtime = etime
332
    vm.save()
333

    
334

    
335
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
336
    details=None):
337
    """
338
    Create virtual machine instance diagnostic entry.
339

340
    :param vm: VirtualMachine instance to create diagnostic for.
341
    :param message: Diagnostic message.
342
    :param source: Diagnostic source identifier (e.g. image-helper).
343
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
344
    :param etime: The time the message occured (if available).
345
    :param details: Additional details or debug information.
346
    """
347
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level, source=source,
348
            source_date=etime, message=message, details=details)
349

    
350

    
351
def create_instance(vm, public_nic, flavor, image, password=None):
352
    """`image` is a dictionary which should contain the keys:
353
            'backend_id', 'format' and 'metadata'
354

355
        metadata value should be a dictionary.
356
    """
357

    
358
    # Handle arguments to CreateInstance() as a dictionary,
359
    # initialize it based on a deployment-specific value.
360
    # This enables the administrator to override deployment-specific
361
    # arguments, such as the disk template to use, name of os provider
362
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
363
    #
364
    kw = settings.GANETI_CREATEINSTANCE_KWARGS
365
    kw['mode'] = 'create'
366
    kw['name'] = vm.backend_vm_id
367
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
368

    
369
    kw['disk_template'] = flavor.disk_template
370
    kw['disks'] = [{"size": flavor.disk * 1024}]
371
    provider = flavor.disk_provider
372
    if provider:
373
        kw['disks'][0]['provider'] = provider
374

    
375
        if provider == 'vlmc':
376
            kw['disks'][0]['origin'] = flavor.disk_origin
377

    
378
    kw['nics'] = [public_nic]
379
    if settings.GANETI_USE_HOTPLUG:
380
        kw['hotplug'] = True
381
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
382
    # kw['os'] = settings.GANETI_OS_PROVIDER
383
    kw['ip_check'] = False
384
    kw['name_check'] = False
385

    
386
    # Do not specific a node explicitly, have
387
    # Ganeti use an iallocator instead
388
    #kw['pnode'] = rapi.GetNodes()[0]
389

    
390
    kw['dry_run'] = settings.TEST
391

    
392
    kw['beparams'] = {
393
       'auto_balance': True,
394
       'vcpus': flavor.cpu,
395
       'memory': flavor.ram}
396

    
397
    kw['osparams'] = {
398
        'config_url': vm.config_url,
399
        # Store image id and format to Ganeti
400
        'img_id': image['backend_id'],
401
        'img_format': image['format']}
402

    
403
    if password:
404
        # Only for admin created VMs !!
405
        kw['osparams']['img_passwd'] = password
406

    
407
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
408
    # kw['hvparams'] = dict(serial_console=False)
409

    
410
    log.debug("Creating instance %s", utils.hide_pass(kw))
411
    with pooled_rapi_client(vm) as client:
412
        return client.CreateInstance(**kw)
413

    
414

    
415
def delete_instance(vm):
416
    with pooled_rapi_client(vm) as client:
417
        return client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)
418

    
419

    
420
def reboot_instance(vm, reboot_type):
421
    assert reboot_type in ('soft', 'hard')
422
    with pooled_rapi_client(vm) as client:
423
        return client.RebootInstance(vm.backend_vm_id, reboot_type,
424
                                     dry_run=settings.TEST)
425

    
426

    
427
def startup_instance(vm):
428
    with pooled_rapi_client(vm) as client:
429
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
430

    
431

    
432
def shutdown_instance(vm):
433
    with pooled_rapi_client(vm) as client:
434
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)
435

    
436

    
437
def get_instance_console(vm):
438
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
439
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
440
    # useless (see #783).
441
    #
442
    # Until this is fixed on the Ganeti side, construct a console info reply
443
    # directly.
444
    #
445
    # WARNING: This assumes that VNC runs on port network_port on
446
    #          the instance's primary node, and is probably
447
    #          hypervisor-specific.
448
    #
449
    log.debug("Getting console for vm %s", vm)
450

    
451
    console = {}
452
    console['kind'] = 'vnc'
453

    
454
    with pooled_rapi_client(vm) as client:
455
        i = client.GetInstance(vm.backend_vm_id)
456

    
457
    if i['hvparams']['serial_console']:
458
        raise Exception("hv parameter serial_console cannot be true")
459
    console['host'] = i['pnode']
460
    console['port'] = i['network_port']
461

    
462
    return console
463

    
464

    
465
def get_instance_info(vm):
466
    with pooled_rapi_client(vm) as client:
467
        return client.GetInstanceInfo(vm.backend_vm_id)
468

    
469

    
470
def create_network(network, backends=None, connect=True):
471
    """Create and connect a network."""
472
    if not backends:
473
        backends = Backend.objects.exclude(offline=True)
474

    
475
    log.debug("Creating network %s in backends %s", network, backends)
476

    
477
    for backend in backends:
478
        create_jobID = _create_network(network, backend)
479
        if connect:
480
            connect_network(network, backend, create_jobID)
481

    
482

    
483
def _create_network(network, backend):
484
    """Create a network."""
485

    
486
    network_type = network.public and 'public' or 'private'
487

    
488
    tags = network.backend_tag
489
    if network.dhcp:
490
        tags.append('nfdhcpd')
491

    
492
    if network.public:
493
        conflicts_check = True
494
    else:
495
        conflicts_check = False
496

    
497
    try:
498
        bn = BackendNetwork.objects.get(network=network, backend=backend)
499
        mac_prefix = bn.mac_prefix
500
    except BackendNetwork.DoesNotExist:
501
        raise Exception("BackendNetwork for network '%s' in backend '%s'"\
502
                        " does not exist" % (network.id, backend.id))
503

    
504
    with pooled_rapi_client(backend) as client:
505
        return client.CreateNetwork(network_name=network.backend_id,
506
                                    network=network.subnet,
507
                                    network6=network.subnet6,
508
                                    gateway=network.gateway,
509
                                    gateway6=network.gateway6,
510
                                    network_type=network_type,
511
                                    mac_prefix=mac_prefix,
512
                                    conflicts_check=conflicts_check,
513
                                    tags=tags)
514

    
515

    
516
def connect_network(network, backend, depend_job=None, group=None):
517
    """Connect a network to nodegroups."""
518
    log.debug("Connecting network %s to backend %s", network, backend)
519

    
520
    if network.public:
521
        conflicts_check = True
522
    else:
523
        conflicts_check = False
524

    
525
    depend_jobs = [depend_job] if depend_job else []
526
    with pooled_rapi_client(backend) as client:
527
        if group:
528
            client.ConnectNetwork(network.backend_id, group, network.mode,
529
                                  network.link, conflicts_check, depend_jobs)
530
        else:
531
            for group in client.GetGroups():
532
                client.ConnectNetwork(network.backend_id, group, network.mode,
533
                                      network.link, conflicts_check,
534
                                      depend_jobs)
535

    
536

    
537
def delete_network(network, backends=None, disconnect=True):
538
    if not backends:
539
        backends = Backend.objects.exclude(offline=True)
540

    
541
    log.debug("Deleting network %s from backends %s", network, backends)
542

    
543
    for backend in backends:
544
        disconnect_jobIDs = []
545
        if disconnect:
546
            disconnect_jobIDs = disconnect_network(network, backend)
547
        _delete_network(network, backend, disconnect_jobIDs)
548

    
549

    
550
def _delete_network(network, backend, depend_jobs=[]):
551
    with pooled_rapi_client(backend) as client:
552
        return client.DeleteNetwork(network.backend_id, depend_jobs)
553

    
554

    
555
def disconnect_network(network, backend, group=None):
556
    log.debug("Disconnecting network %s to backend %s", network, backend)
557

    
558
    with pooled_rapi_client(backend) as client:
559
        if group:
560
            return [client.DisconnectNetwork(network.backend_id, group)]
561
        else:
562
            jobs = []
563
            for group in client.GetGroups():
564
                job = client.DisconnectNetwork(network.backend_id, group)
565
                jobs.append(job)
566
            return jobs
567

    
568

    
569
def connect_to_network(vm, network, address=None):
570
    nic = {'ip': address, 'network': network.backend_id}
571

    
572
    log.debug("Connecting vm %s to network %s(%s)", vm, network, address)
573

    
574
    with pooled_rapi_client(vm) as client:
575
        return client.ModifyInstance(vm.backend_vm_id, nics=[('add',  nic)],
576
                                     hotplug=settings.GANETI_USE_HOTPLUG,
577
                                     dry_run=settings.TEST)
578

    
579

    
580
def disconnect_from_network(vm, nic):
581
    op = [('remove', nic.index, {})]
582

    
583
    log.debug("Removing nic of VM %s, with index %s", vm, str(nic.index))
584

    
585
    with pooled_rapi_client(vm) as client:
586
        return client.ModifyInstance(vm.backend_vm_id, nics=op,
587
                                     hotplug=settings.GANETI_USE_HOTPLUG,
588
                                     dry_run=settings.TEST)
589

    
590

    
591
def set_firewall_profile(vm, profile):
592
    try:
593
        tag = _firewall_tags[profile]
594
    except KeyError:
595
        raise ValueError("Unsopported Firewall Profile: %s" % profile)
596

    
597
    log.debug("Setting tag of VM %s to %s", vm, profile)
598

    
599
    with pooled_rapi_client(vm) as client:
600
        # Delete all firewall tags
601
        for t in _firewall_tags.values():
602
            client.DeleteInstanceTags(vm.backend_vm_id, [t],
603
                                      dry_run=settings.TEST)
604

    
605
        client.AddInstanceTags(vm.backend_vm_id, [tag], dry_run=settings.TEST)
606

    
607
        # XXX NOP ModifyInstance call to force process_net_status to run
608
        # on the dispatcher
609
        client.ModifyInstance(vm.backend_vm_id,
610
                         os_name=settings.GANETI_CREATEINSTANCE_KWARGS['os'])
611

    
612

    
613
def get_ganeti_instances(backend=None, bulk=False):
614
    instances = []
615
    for backend in get_backends(backend):
616
        with pooled_rapi_client(backend) as client:
617
            instances.append(client.GetInstances(bulk=bulk))
618

    
619
    return reduce(list.__add__, instances, [])
620

    
621

    
622
def get_ganeti_nodes(backend=None, bulk=False):
623
    nodes = []
624
    for backend in get_backends(backend):
625
        with pooled_rapi_client(backend) as client:
626
            nodes.append(client.GetNodes(bulk=bulk))
627

    
628
    return reduce(list.__add__, nodes, [])
629

    
630

    
631
def get_ganeti_jobs(backend=None, bulk=False):
632
    jobs = []
633
    for backend in get_backends(backend):
634
        with pooled_rapi_client(backend) as client:
635
            jobs.append(client.GetJobs(bulk=bulk))
636
    return reduce(list.__add__, jobs, [])
637

    
638
##
639
##
640
##
641

    
642

    
643
def get_backends(backend=None):
644
    if backend:
645
        if backend.offline:
646
            return []
647
        return [backend]
648
    return Backend.objects.filter(offline=False)
649

    
650

    
651
def get_physical_resources(backend):
652
    """ Get the physical resources of a backend.
653

654
    Get the resources of a backend as reported by the backend (not the db).
655

656
    """
657
    nodes = get_ganeti_nodes(backend, bulk=True)
658
    attr = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
659
    res = {}
660
    for a in attr:
661
        res[a] = 0
662
    for n in nodes:
663
        # Filter out drained, offline and not vm_capable nodes since they will
664
        # not take part in the vm allocation process
665
        if n['vm_capable'] and not n['drained'] and not n['offline']\
666
           and n['cnodes']:
667
            for a in attr:
668
                res[a] += int(n[a])
669
    return res
670

    
671

    
672
def update_resources(backend, resources=None):
673
    """ Update the state of the backend resources in db.
674

675
    """
676

    
677
    if not resources:
678
        resources = get_physical_resources(backend)
679

    
680
    backend.mfree = resources['mfree']
681
    backend.mtotal = resources['mtotal']
682
    backend.dfree = resources['dfree']
683
    backend.dtotal = resources['dtotal']
684
    backend.pinst_cnt = resources['pinst_cnt']
685
    backend.ctotal = resources['ctotal']
686
    backend.updated = datetime.now()
687
    backend.save()
688

    
689

    
690
def get_memory_from_instances(backend):
691
    """ Get the memory that is used from instances.
692

693
    Get the used memory of a backend. Note: This is different for
694
    the real memory used, due to kvm's memory de-duplication.
695

696
    """
697
    with pooled_rapi_client(backend) as client:
698
        instances = client.GetInstances(bulk=True)
699
    mem = 0
700
    for i in instances:
701
        mem += i['oper_ram']
702
    return mem
703

    
704
##
705
## Synchronized operations for reconciliation
706
##
707

    
708

    
709
def create_network_synced(network, backend):
710
    result = _create_network_synced(network, backend)
711
    if result[0] != 'success':
712
        return result
713
    result = connect_network_synced(network, backend)
714
    return result
715

    
716

    
717
def _create_network_synced(network, backend):
718
    with pooled_rapi_client(backend) as client:
719
        job = _create_network(network, backend)
720
        result = wait_for_job(client, job)
721
    return result
722

    
723

    
724
def connect_network_synced(network, backend):
725
    with pooled_rapi_client(backend) as client:
726
        for group in client.GetGroups():
727
            job = client.ConnectNetwork(network.backend_id, group,
728
                                        network.mode, network.link)
729
            result = wait_for_job(client, job)
730
            if result[0] != 'success':
731
                return result
732

    
733
    return result
734

    
735

    
736
def wait_for_job(client, jobid):
737
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
738
    status = result['job_info'][0]
739
    while status not in ['success', 'error', 'cancel']:
740
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
741
                                        [result], None)
742
        status = result['job_info'][0]
743

    
744
    if status == 'success':
745
        return (status, None)
746
    else:
747
        error = result['job_info'][1]
748
        return (status, error)