Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ 939d71dd

History | View | Annotate | Download (25.5 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import json
35

    
36
from django.conf import settings
37
from django.db import transaction
38
from datetime import datetime
39

    
40
from synnefo.db.models import (Backend, VirtualMachine, Network,
41
                               BackendNetwork, BACKEND_STATUSES,
42
                               pooled_rapi_client, BridgePoolTable,
43
                               MacPrefixPoolTable, VirtualMachineDiagnostic)
44
from synnefo.logic import utils
45
from synnefo import quotas
46
from synnefo.api.util import release_resource
47

    
48
from logging import getLogger
49
log = getLogger(__name__)
50

    
51

    
52
_firewall_tags = {
53
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
54
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
55
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}
56

    
57
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
58

    
59

    
60
@quotas.uses_commission
61
@transaction.commit_on_success
62
def process_op_status(serials, vm, etime, jobid, opcode, status, logmsg):
63
    """Process a job progress notification from the backend
64

65
    Process an incoming message from the backend (currently Ganeti).
66
    Job notifications with a terminating status (sucess, error, or canceled),
67
    also update the operating state of the VM.
68

69
    """
70
    # See #1492, #1031, #1111 why this line has been removed
71
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
72
    if status not in [x[0] for x in BACKEND_STATUSES]:
73
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)
74

    
75
    vm.backendjobid = jobid
76
    vm.backendjobstatus = status
77
    vm.backendopcode = opcode
78
    vm.backendlogmsg = logmsg
79

    
80
    # Notifications of success change the operating state
81
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode, None)
82
    if status == 'success' and state_for_success is not None:
83
        vm.operstate = state_for_success
84

    
85
    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
86
    if opcode == 'OP_INSTANCE_CREATE' and status in ('canceled', 'error'):
87
        vm.operstate = 'ERROR'
88
        vm.backendtime = etime
89
    elif opcode == 'OP_INSTANCE_REMOVE':
90
        # Set the deleted flag explicitly, cater for admin-initiated removals
91
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
92
        # when no instance exists at the Ganeti backend.
93
        # See ticket #799 for all the details.
94
        #
95
        if status == 'success' or (status == 'error' and
96
                                   vm.operstate == 'ERROR'):
97
            # Issue commission
98
            serial = quotas.issue_vm_commission(vm.userid, vm.flavor,
99
                                                delete=True)
100
            serials.append(serial)
101
            vm.serial = serial
102
            serial.accepted = True
103
            serial.save()
104
            release_instance_nics(vm)
105
            vm.nics.all().delete()
106
            vm.deleted = True
107
            vm.operstate = state_for_success
108
            vm.backendtime = etime
109

    
110
    # Update backendtime only for jobs that have been successfully completed,
111
    # since only these jobs update the state of the VM. Else a "race condition"
112
    # may occur when a successful job (e.g. OP_INSTANCE_REMOVE) completes
113
    # before an error job and messages arrive in reversed order.
114
    if status == 'success':
115
        vm.backendtime = etime
116

    
117
    vm.save()
118

    
119

    
120
@transaction.commit_on_success
121
def process_net_status(vm, etime, nics):
122
    """Process a net status notification from the backend
123

124
    Process an incoming message from the Ganeti backend,
125
    detailing the NIC configuration of a VM instance.
126

127
    Update the state of the VM in the DB accordingly.
128
    """
129

    
130
    ganeti_nics = process_ganeti_nics(nics)
131
    if not nics_changed(vm.nics.order_by('index'), ganeti_nics):
132
        log.debug("NICs for VM %s have not changed", vm)
133

    
134
    release_instance_nics(vm)
135

    
136
    for nic in ganeti_nics:
137
        ipv4 = nic.get('ipv4', '')
138
        net = nic['network']
139
        if ipv4:
140
            net.reserve_address(ipv4)
141

    
142
        nic['dirty'] = False
143
        vm.nics.create(**nic)
144
        # Dummy save the network, because UI uses changed-since for VMs
145
        # and Networks in order to show the VM NICs
146
        net.save()
147

    
148
    vm.backendtime = etime
149
    vm.save()
150

    
151

    
152
def process_ganeti_nics(ganeti_nics):
153
    """Process NIC dict from ganeti hooks."""
154
    new_nics = []
155
    for i, new_nic in enumerate(ganeti_nics):
156
        network = new_nic.get('network', '')
157
        n = str(network)
158
        pk = utils.id_from_network_name(n)
159

    
160
        net = Network.objects.get(pk=pk)
161

    
162
        # Get the new nic info
163
        mac = new_nic.get('mac', '')
164
        ipv4 = new_nic.get('ip', '')
165
        ipv6 = new_nic.get('ipv6', '')
166

    
167
        firewall = new_nic.get('firewall', '')
168
        firewall_profile = _reverse_tags.get(firewall, '')
169
        if not firewall_profile and net.public:
170
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE
171

    
172
        nic = {
173
            'index': i,
174
            'network': net,
175
            'mac': mac,
176
            'ipv4': ipv4,
177
            'ipv6': ipv6,
178
            'firewall_profile': firewall_profile,
179
            'state': 'ACTIVE'}
180

    
181
        new_nics.append(nic)
182
    return new_nics
183

    
184

    
185
def nics_changed(old_nics, new_nics):
186
    """Return True if NICs have changed in any way."""
187
    if len(old_nics) != len(new_nics):
188
        return True
189
    for old_nic, new_nic in zip(old_nics, new_nics):
190
        if not (old_nic.ipv4 == new_nic['ipv4'] and
191
                old_nic.ipv6 == new_nic['ipv6'] and
192
                old_nic.mac == new_nic['mac'] and
193
                old_nic.firewall_profile == new_nic['firewall_profile'] and
194
                old_nic.index == new_nic['index'] and
195
                old_nic.network == new_nic['network']):
196
            return True
197
    return False
198

    
199

    
200
def release_instance_nics(vm):
201
    for nic in vm.nics.all():
202
        net = nic.network
203
        if nic.ipv4:
204
            net.release_address(nic.ipv4)
205
        nic.delete()
206
        net.save()
207

    
208

    
209
@transaction.commit_on_success
210
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
211
    if status not in [x[0] for x in BACKEND_STATUSES]:
212
        raise Network.InvalidBackendMsgError(opcode, status)
213

    
214
    back_network.backendjobid = jobid
215
    back_network.backendjobstatus = status
216
    back_network.backendopcode = opcode
217
    back_network.backendlogmsg = logmsg
218

    
219
    # Notifications of success change the operating state
220
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
221
    if status == 'success' and state_for_success is not None:
222
        back_network.operstate = state_for_success
223

    
224
    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_ADD':
225
        back_network.operstate = 'ERROR'
226
        back_network.backendtime = etime
227

    
228
    if opcode == 'OP_NETWORK_REMOVE':
229
        if status == 'success' or (status == 'error' and
230
                                   back_network.operstate == 'ERROR'):
231
            back_network.operstate = state_for_success
232
            back_network.deleted = True
233
            back_network.backendtime = etime
234

    
235
    if status == 'success':
236
        back_network.backendtime = etime
237
    back_network.save()
238
    # Also you must update the state of the Network!!
239
    update_network_state(back_network.network)
240

    
241

    
242
@quotas.uses_commission
243
def update_network_state(serials, network):
244
    old_state = network.state
245

    
246
    backend_states = [s.operstate for s in
247
                      network.backend_networks.filter(backend__offline=False)]
248
    if not backend_states:
249
        network.state = 'PENDING'
250
        network.save()
251
        return
252

    
253
    all_equal = len(set(backend_states)) <= 1
254
    network.state = all_equal and backend_states[0] or 'PENDING'
255
    # Release the resources on the deletion of the Network
256
    if old_state != 'DELETED' and network.state == 'DELETED':
257
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
258
                 network.id, network.mac_prefix, network.link)
259
        network.deleted = True
260
        if network.mac_prefix:
261
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
262
                release_resource(res_type="mac_prefix",
263
                                 value=network.mac_prefix)
264
        if network.link:
265
            if network.FLAVORS[network.flavor]["link"] == "pool":
266
                release_resource(res_type="bridge", value=network.link)
267

    
268
        # Issue commission
269
        if network.userid:
270
            serial = quotas.issue_network_commission(network.userid,
271
                                                     delete=True)
272
            serials.append(serial)
273
            network.serial = serial
274
            serial.accepted = True
275
            serial.save()
276
        elif not network.public:
277
            log.warning("Network %s does not have an owner!", network.id)
278
    network.save()
279

    
280

    
281
@transaction.commit_on_success
282
def process_network_modify(back_network, etime, jobid, opcode, status,
283
                           add_reserved_ips, remove_reserved_ips):
284
    assert (opcode == "OP_NETWORK_SET_PARAMS")
285
    if status not in [x[0] for x in BACKEND_STATUSES]:
286
        raise Network.InvalidBackendMsgError(opcode, status)
287

    
288
    back_network.backendjobid = jobid
289
    back_network.backendjobstatus = status
290
    back_network.opcode = opcode
291

    
292
    if add_reserved_ips or remove_reserved_ips:
293
        net = back_network.network
294
        pool = net.get_pool()
295
        if add_reserved_ips:
296
            for ip in add_reserved_ips:
297
                pool.reserve(ip, external=True)
298
        if remove_reserved_ips:
299
            for ip in remove_reserved_ips:
300
                pool.put(ip, external=True)
301
        pool.save()
302

    
303
    if status == 'success':
304
        back_network.backendtime = etime
305
    back_network.save()
306

    
307

    
308
@transaction.commit_on_success
309
def process_create_progress(vm, etime, progress):
310

    
311
    percentage = int(progress)
312

    
313
    # The percentage may exceed 100%, due to the way
314
    # snf-image:copy-progress tracks bytes read by image handling processes
315
    percentage = 100 if percentage > 100 else percentage
316
    if percentage < 0:
317
        raise ValueError("Percentage cannot be negative")
318

    
319
    # FIXME: log a warning here, see #1033
320
#   if last_update > percentage:
321
#       raise ValueError("Build percentage should increase monotonically " \
322
#                        "(old = %d, new = %d)" % (last_update, percentage))
323

    
324
    # This assumes that no message of type 'ganeti-create-progress' is going to
325
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
326
    # the instance is STARTED.  What if the two messages are processed by two
327
    # separate dispatcher threads, and the 'ganeti-op-status' message for
328
    # successful creation gets processed before the 'ganeti-create-progress'
329
    # message? [vkoukis]
330
    #
331
    #if not vm.operstate == 'BUILD':
332
    #    raise VirtualMachine.IllegalState("VM is not in building state")
333

    
334
    vm.buildpercentage = percentage
335
    vm.backendtime = etime
336
    vm.save()
337

    
338

    
339
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
340
                               details=None):
341
    """
342
    Create virtual machine instance diagnostic entry.
343

344
    :param vm: VirtualMachine instance to create diagnostic for.
345
    :param message: Diagnostic message.
346
    :param source: Diagnostic source identifier (e.g. image-helper).
347
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
348
    :param etime: The time the message occured (if available).
349
    :param details: Additional details or debug information.
350
    """
351
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level, source=source,
352
                                                   source_date=etime,
353
                                                   message=message,
354
                                                   details=details)
355

    
356

    
357
def create_instance(vm, public_nic, flavor, image):
358
    """`image` is a dictionary which should contain the keys:
359
            'backend_id', 'format' and 'metadata'
360

361
        metadata value should be a dictionary.
362
    """
363

    
364
    # Handle arguments to CreateInstance() as a dictionary,
365
    # initialize it based on a deployment-specific value.
366
    # This enables the administrator to override deployment-specific
367
    # arguments, such as the disk template to use, name of os provider
368
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
369
    #
370
    kw = settings.GANETI_CREATEINSTANCE_KWARGS
371
    kw['mode'] = 'create'
372
    kw['name'] = vm.backend_vm_id
373
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
374

    
375
    kw['disk_template'] = flavor.disk_template
376
    kw['disks'] = [{"size": flavor.disk * 1024}]
377
    provider = flavor.disk_provider
378
    if provider:
379
        kw['disks'][0]['provider'] = provider
380

    
381
        if provider == 'vlmc':
382
            kw['disks'][0]['origin'] = flavor.disk_origin
383

    
384
    kw['nics'] = [public_nic]
385
    if settings.GANETI_USE_HOTPLUG:
386
        kw['hotplug'] = True
387
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
388
    # kw['os'] = settings.GANETI_OS_PROVIDER
389
    kw['ip_check'] = False
390
    kw['name_check'] = False
391

    
392
    # Do not specific a node explicitly, have
393
    # Ganeti use an iallocator instead
394
    #kw['pnode'] = rapi.GetNodes()[0]
395

    
396
    kw['dry_run'] = settings.TEST
397

    
398
    kw['beparams'] = {
399
        'auto_balance': True,
400
        'vcpus': flavor.cpu,
401
        'memory': flavor.ram}
402

    
403
    kw['osparams'] = {
404
        'config_url': vm.config_url,
405
        # Store image id and format to Ganeti
406
        'img_id': image['backend_id'],
407
        'img_format': image['format']}
408

    
409
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
410
    # kw['hvparams'] = dict(serial_console=False)
411

    
412
    log.debug("Creating instance %s", utils.hide_pass(kw))
413
    with pooled_rapi_client(vm) as client:
414
        return client.CreateInstance(**kw)
415

    
416

    
417
def delete_instance(vm):
418
    with pooled_rapi_client(vm) as client:
419
        return client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)
420

    
421

    
422
def reboot_instance(vm, reboot_type):
423
    assert reboot_type in ('soft', 'hard')
424
    with pooled_rapi_client(vm) as client:
425
        return client.RebootInstance(vm.backend_vm_id, reboot_type,
426
                                     dry_run=settings.TEST)
427

    
428

    
429
def startup_instance(vm):
430
    with pooled_rapi_client(vm) as client:
431
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
432

    
433

    
434
def shutdown_instance(vm):
435
    with pooled_rapi_client(vm) as client:
436
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)
437

    
438

    
439
def get_instance_console(vm):
440
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
441
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
442
    # useless (see #783).
443
    #
444
    # Until this is fixed on the Ganeti side, construct a console info reply
445
    # directly.
446
    #
447
    # WARNING: This assumes that VNC runs on port network_port on
448
    #          the instance's primary node, and is probably
449
    #          hypervisor-specific.
450
    #
451
    log.debug("Getting console for vm %s", vm)
452

    
453
    console = {}
454
    console['kind'] = 'vnc'
455

    
456
    with pooled_rapi_client(vm) as client:
457
        i = client.GetInstance(vm.backend_vm_id)
458

    
459
    if i['hvparams']['serial_console']:
460
        raise Exception("hv parameter serial_console cannot be true")
461
    console['host'] = i['pnode']
462
    console['port'] = i['network_port']
463

    
464
    return console
465

    
466

    
467
def get_instance_info(vm):
468
    with pooled_rapi_client(vm) as client:
469
        return client.GetInstanceInfo(vm.backend_vm_id)
470

    
471

    
472
def create_network(network, backends=None, connect=True):
473
    """Create and connect a network."""
474
    if not backends:
475
        backends = Backend.objects.exclude(offline=True)
476

    
477
    log.debug("Creating network %s in backends %s", network, backends)
478

    
479
    for backend in backends:
480
        create_jobID = _create_network(network, backend)
481
        if connect:
482
            connect_network(network, backend, create_jobID)
483

    
484

    
485
def _create_network(network, backend):
486
    """Create a network."""
487

    
488
    network_type = network.public and 'public' or 'private'
489

    
490
    tags = network.backend_tag
491
    if network.dhcp:
492
        tags.append('nfdhcpd')
493

    
494
    if network.public:
495
        conflicts_check = True
496
    else:
497
        conflicts_check = False
498

    
499
    try:
500
        bn = BackendNetwork.objects.get(network=network, backend=backend)
501
        mac_prefix = bn.mac_prefix
502
    except BackendNetwork.DoesNotExist:
503
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
504
                        " does not exist" % (network.id, backend.id))
505

    
506
    with pooled_rapi_client(backend) as client:
507
        return client.CreateNetwork(network_name=network.backend_id,
508
                                    network=network.subnet,
509
                                    network6=network.subnet6,
510
                                    gateway=network.gateway,
511
                                    gateway6=network.gateway6,
512
                                    network_type=network_type,
513
                                    mac_prefix=mac_prefix,
514
                                    conflicts_check=conflicts_check,
515
                                    tags=tags)
516

    
517

    
518
def connect_network(network, backend, depend_job=None, group=None):
519
    """Connect a network to nodegroups."""
520
    log.debug("Connecting network %s to backend %s", network, backend)
521

    
522
    if network.public:
523
        conflicts_check = True
524
    else:
525
        conflicts_check = False
526

    
527
    depend_jobs = [depend_job] if depend_job else []
528
    with pooled_rapi_client(backend) as client:
529
        if group:
530
            client.ConnectNetwork(network.backend_id, group, network.mode,
531
                                  network.link, conflicts_check, depend_jobs)
532
        else:
533
            for group in client.GetGroups():
534
                client.ConnectNetwork(network.backend_id, group, network.mode,
535
                                      network.link, conflicts_check,
536
                                      depend_jobs)
537

    
538

    
539
def delete_network(network, backends=None, disconnect=True):
540
    if not backends:
541
        backends = Backend.objects.exclude(offline=True)
542

    
543
    log.debug("Deleting network %s from backends %s", network, backends)
544

    
545
    for backend in backends:
546
        disconnect_jobIDs = []
547
        if disconnect:
548
            disconnect_jobIDs = disconnect_network(network, backend)
549
        _delete_network(network, backend, disconnect_jobIDs)
550

    
551

    
552
def _delete_network(network, backend, depend_jobs=[]):
553
    with pooled_rapi_client(backend) as client:
554
        return client.DeleteNetwork(network.backend_id, depend_jobs)
555

    
556

    
557
def disconnect_network(network, backend, group=None):
558
    log.debug("Disconnecting network %s to backend %s", network, backend)
559

    
560
    with pooled_rapi_client(backend) as client:
561
        if group:
562
            return [client.DisconnectNetwork(network.backend_id, group)]
563
        else:
564
            jobs = []
565
            for group in client.GetGroups():
566
                job = client.DisconnectNetwork(network.backend_id, group)
567
                jobs.append(job)
568
            return jobs
569

    
570

    
571
def connect_to_network(vm, network, address=None):
572
    nic = {'ip': address, 'network': network.backend_id}
573

    
574
    log.debug("Connecting vm %s to network %s(%s)", vm, network, address)
575

    
576
    with pooled_rapi_client(vm) as client:
577
        return client.ModifyInstance(vm.backend_vm_id, nics=[('add',  nic)],
578
                                     hotplug=settings.GANETI_USE_HOTPLUG,
579
                                     dry_run=settings.TEST)
580

    
581

    
582
def disconnect_from_network(vm, nic):
583
    op = [('remove', nic.index, {})]
584

    
585
    log.debug("Removing nic of VM %s, with index %s", vm, str(nic.index))
586

    
587
    with pooled_rapi_client(vm) as client:
588
        return client.ModifyInstance(vm.backend_vm_id, nics=op,
589
                                     hotplug=settings.GANETI_USE_HOTPLUG,
590
                                     dry_run=settings.TEST)
591

    
592

    
593
def set_firewall_profile(vm, profile):
594
    try:
595
        tag = _firewall_tags[profile]
596
    except KeyError:
597
        raise ValueError("Unsopported Firewall Profile: %s" % profile)
598

    
599
    log.debug("Setting tag of VM %s to %s", vm, profile)
600

    
601
    with pooled_rapi_client(vm) as client:
602
        # Delete all firewall tags
603
        for t in _firewall_tags.values():
604
            client.DeleteInstanceTags(vm.backend_vm_id, [t],
605
                                      dry_run=settings.TEST)
606

    
607
        client.AddInstanceTags(vm.backend_vm_id, [tag], dry_run=settings.TEST)
608

    
609
        # XXX NOP ModifyInstance call to force process_net_status to run
610
        # on the dispatcher
611
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
612
        client.ModifyInstance(vm.backend_vm_id,
613
                              os_name=os_name)
614

    
615

    
616
def get_ganeti_instances(backend=None, bulk=False):
617
    instances = []
618
    for backend in get_backends(backend):
619
        with pooled_rapi_client(backend) as client:
620
            instances.append(client.GetInstances(bulk=bulk))
621

    
622
    return reduce(list.__add__, instances, [])
623

    
624

    
625
def get_ganeti_nodes(backend=None, bulk=False):
626
    nodes = []
627
    for backend in get_backends(backend):
628
        with pooled_rapi_client(backend) as client:
629
            nodes.append(client.GetNodes(bulk=bulk))
630

    
631
    return reduce(list.__add__, nodes, [])
632

    
633

    
634
def get_ganeti_jobs(backend=None, bulk=False):
635
    jobs = []
636
    for backend in get_backends(backend):
637
        with pooled_rapi_client(backend) as client:
638
            jobs.append(client.GetJobs(bulk=bulk))
639
    return reduce(list.__add__, jobs, [])
640

    
641
##
642
##
643
##
644

    
645

    
646
def get_backends(backend=None):
647
    if backend:
648
        if backend.offline:
649
            return []
650
        return [backend]
651
    return Backend.objects.filter(offline=False)
652

    
653

    
654
def get_physical_resources(backend):
655
    """ Get the physical resources of a backend.
656

657
    Get the resources of a backend as reported by the backend (not the db).
658

659
    """
660
    nodes = get_ganeti_nodes(backend, bulk=True)
661
    attr = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
662
    res = {}
663
    for a in attr:
664
        res[a] = 0
665
    for n in nodes:
666
        # Filter out drained, offline and not vm_capable nodes since they will
667
        # not take part in the vm allocation process
668
        can_host_vms = n['vm_capable'] and not (n['drained'] or n['offline'])
669
        if can_host_vms and n['cnodes']:
670
            for a in attr:
671
                res[a] += int(n[a])
672
    return res
673

    
674

    
675
def update_resources(backend, resources=None):
676
    """ Update the state of the backend resources in db.
677

678
    """
679

    
680
    if not resources:
681
        resources = get_physical_resources(backend)
682

    
683
    backend.mfree = resources['mfree']
684
    backend.mtotal = resources['mtotal']
685
    backend.dfree = resources['dfree']
686
    backend.dtotal = resources['dtotal']
687
    backend.pinst_cnt = resources['pinst_cnt']
688
    backend.ctotal = resources['ctotal']
689
    backend.updated = datetime.now()
690
    backend.save()
691

    
692

    
693
def get_memory_from_instances(backend):
694
    """ Get the memory that is used from instances.
695

696
    Get the used memory of a backend. Note: This is different for
697
    the real memory used, due to kvm's memory de-duplication.
698

699
    """
700
    with pooled_rapi_client(backend) as client:
701
        instances = client.GetInstances(bulk=True)
702
    mem = 0
703
    for i in instances:
704
        mem += i['oper_ram']
705
    return mem
706

    
707
##
708
## Synchronized operations for reconciliation
709
##
710

    
711

    
712
def create_network_synced(network, backend):
713
    result = _create_network_synced(network, backend)
714
    if result[0] != 'success':
715
        return result
716
    result = connect_network_synced(network, backend)
717
    return result
718

    
719

    
720
def _create_network_synced(network, backend):
721
    with pooled_rapi_client(backend) as client:
722
        job = _create_network(network, backend)
723
        result = wait_for_job(client, job)
724
    return result
725

    
726

    
727
def connect_network_synced(network, backend):
728
    with pooled_rapi_client(backend) as client:
729
        for group in client.GetGroups():
730
            job = client.ConnectNetwork(network.backend_id, group,
731
                                        network.mode, network.link)
732
            result = wait_for_job(client, job)
733
            if result[0] != 'success':
734
                return result
735

    
736
    return result
737

    
738

    
739
def wait_for_job(client, jobid):
740
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
741
    status = result['job_info'][0]
742
    while status not in ['success', 'error', 'cancel']:
743
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
744
                                         [result], None)
745
        status = result['job_info'][0]
746

    
747
    if status == 'success':
748
        return (status, None)
749
    else:
750
        error = result['job_info'][1]
751
        return (status, error)