Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ 039e3e61

History | View | Annotate | Download (25.2 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import json
35

    
36
from django.conf import settings
37
from django.db import transaction
38
from datetime import datetime
39

    
40
from synnefo.db.models import (Backend, VirtualMachine, Network,
41
                               BackendNetwork, BACKEND_STATUSES,
42
                               pooled_rapi_client, BridgePoolTable,
43
                               MacPrefixPoolTable, VirtualMachineDiagnostic)
44
from synnefo.logic import utils
45
from synnefo import quotas
46
from synnefo.api.util import release_resource
47

    
48
from logging import getLogger
49
log = getLogger(__name__)
50

    
51

    
52
# Map synnefo firewall profile names to the Ganeti instance tags that
# implement them, as configured in the deployment settings.
_firewall_tags = {
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}

# Reverse map: the fourth ':'-separated field of each tag back to its
# profile name.  NOTE(review): assumes every configured tag has at least
# four ':'-separated fields — confirm against the settings values.
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
58

    
59

    
60
@quotas.uses_commission
@transaction.commit_on_success
def process_op_status(serials, vm, etime, jobid, opcode, status, logmsg):
    """Process a job progress notification from the backend.

    Process an incoming message from the backend (currently Ganeti).
    Job notifications with a terminating status (success, error, or canceled)
    also update the operating state of the VM.

    :param serials: list injected by @quotas.uses_commission; commission
        serials appended here are handled by the decorator.
    :param vm: the VirtualMachine the job refers to.
    :param etime: event time as reported by the backend.
    :param jobid: Ganeti job id.
    :param opcode: Ganeti opcode (e.g. 'OP_INSTANCE_CREATE').
    :param status: Ganeti job status; must be one of BACKEND_STATUSES.
    :param logmsg: raw log message from the backend.
    :raises VirtualMachine.InvalidBackendMsgError: on an unknown status.
    """
    # See #1492, #1031, #1111 why this line has been removed
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)

    # Record the raw job information on the VM for inspection/debugging.
    vm.backendjobid = jobid
    vm.backendjobstatus = status
    vm.backendopcode = opcode
    vm.backendlogmsg = logmsg

    # Notifications of success change the operating state
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == 'success' and state_for_success is not None:
        vm.operstate = state_for_success

    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
    if opcode == 'OP_INSTANCE_CREATE' and status in ('canceled', 'error'):
        vm.operstate = 'ERROR'
        vm.backendtime = etime
    elif opcode == 'OP_INSTANCE_REMOVE':
        # Set the deleted flag explicitly, cater for admin-initiated removals
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
        # when no instance exists at the Ganeti backend.
        # See ticket #799 for all the details.
        #
        if status == 'success' or (status == 'error' and
                                   vm.operstate == 'ERROR'):
            # Issue commission to release the quota held by this VM.
            serial = quotas.issue_vm_commission(vm.userid, vm.flavor,
                                                delete=True)
            serials.append(serial)
            vm.serial = serial
            serial.accepted = True
            serial.save()
            # Return the VM's IP addresses to the pools and drop its NICs.
            release_instance_nics(vm)
            vm.nics.all().delete()
            vm.deleted = True
            vm.operstate = state_for_success
            vm.backendtime = etime

    # Update backendtime only for jobs that have been successfully completed,
    # since only these jobs update the state of the VM. Else a "race condition"
    # may occur when a successful job (e.g. OP_INSTANCE_REMOVE) completes
    # before an error job and messages arrive in reversed order.
    if status == 'success':
        vm.backendtime = etime

    vm.save()
118

    
119

    
120
@transaction.commit_on_success
def process_net_status(vm, etime, nics):
    """Process a net status notification from the backend.

    Process an incoming message from the Ganeti backend,
    detailing the NIC configuration of a VM instance.

    Update the state of the VM in the DB accordingly.

    :param vm: the VirtualMachine the notification refers to.
    :param etime: event time as reported by the backend.
    :param nics: list of NIC dicts as sent by the Ganeti hooks.
    """
    ganeti_nics = process_ganeti_nics(nics)
    if not nics_changed(vm.nics.order_by('index'), ganeti_nics):
        log.debug("NICs for VM %s have not changed", vm)
        # Fix: bail out early.  The original fell through and needlessly
        # released and recreated an identical set of NICs (churning the IP
        # address pools) on every notification.
        return

    # Drop the current NICs (returning their addresses to the pools) and
    # recreate them from the backend's view.
    release_instance_nics(vm)

    for nic in ganeti_nics:
        ipv4 = nic.get('ipv4', '')
        net = nic['network']
        if ipv4:
            net.reserve_address(ipv4)

        nic['dirty'] = False
        vm.nics.create(**nic)
        # Dummy save the network, because UI uses changed-since for VMs
        # and Networks in order to show the VM NICs
        net.save()

    vm.backendtime = etime
    vm.save()
150

    
151

    
152
def process_ganeti_nics(ganeti_nics):
    """Translate NIC dicts from the Ganeti hooks into DB-ready dicts."""
    nics = []
    for index, ganeti_nic in enumerate(ganeti_nics):
        # Resolve the Network object from the Ganeti network name.
        network_name = str(ganeti_nic.get('network', ''))
        network_id = utils.id_from_network_name(network_name)
        network = Network.objects.get(pk=network_id)

        # Map the firewall tag back to a profile name; public networks
        # without an explicit tag fall back to the configured default.
        firewall_tag = ganeti_nic.get('firewall', '')
        profile = _reverse_tags.get(firewall_tag, '')
        if not profile and network.public:
            profile = settings.DEFAULT_FIREWALL_PROFILE

        nics.append({'index': index,
                     'network': network,
                     'mac': ganeti_nic.get('mac', ''),
                     'ipv4': ganeti_nic.get('ip', ''),
                     'ipv6': ganeti_nic.get('ipv6', ''),
                     'firewall_profile': profile})
    return nics
182

    
183

    
184
def nics_changed(old_nics, new_nics):
    """Return True if NICs have changed in any way."""
    if len(old_nics) != len(new_nics):
        return True
    # Compare each DB NIC (attribute access) against its Ganeti counterpart
    # (dict access) field by field.
    fields = ('ipv4', 'ipv6', 'mac', 'firewall_profile', 'index', 'network')
    for db_nic, ganeti_nic in zip(old_nics, new_nics):
        if any(getattr(db_nic, f) != ganeti_nic[f] for f in fields):
            return True
    return False
197

    
198

    
199
def release_instance_nics(vm):
    """Delete every NIC of a VM, returning held IPv4 addresses to the pool."""
    for nic in vm.nics.all():
        network = nic.network
        if nic.ipv4:
            network.release_address(nic.ipv4)
        nic.delete()
        # Persist the network so its changed-since timestamp is updated.
        network.save()
206

    
207

    
208
@transaction.commit_on_success
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
    """Process a network job notification from the backend.

    Update the BackendNetwork with the job's outcome and then recompute
    the aggregate state of the parent Network.

    :param back_network: the BackendNetwork the job refers to.
    :param etime: event time as reported by the backend.
    :param jobid: Ganeti job id.
    :param opcode: Ganeti opcode (e.g. 'OP_NETWORK_ADD').
    :param status: Ganeti job status; must be one of BACKEND_STATUSES.
    :param logmsg: raw log message from the backend.
    :raises Network.InvalidBackendMsgError: on an unknown status.
    """
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    # Record the raw job information for inspection/debugging.
    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode
    back_network.backendlogmsg = logmsg

    # Notifications of success change the operating state
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == 'success' and state_for_success is not None:
        back_network.operstate = state_for_success

    # A failed/canceled creation leaves this backend's view in ERROR.
    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_ADD':
        back_network.operstate = 'ERROR'
        back_network.backendtime = etime

    if opcode == 'OP_NETWORK_REMOVE':
        # A failed removal of a network already in ERROR is treated as a
        # success (mirrors the OP_INSTANCE_REMOVE special case above).
        if status == 'success' or (status == 'error' and
                                   back_network.operstate == 'ERROR'):
            back_network.operstate = state_for_success
            back_network.deleted = True
            back_network.backendtime = etime

    # Only successful jobs update backendtime (see process_op_status for
    # the message-reordering rationale).
    if status == 'success':
        back_network.backendtime = etime
    back_network.save()
    # Also you must update the state of the Network!!
    update_network_state(back_network.network)
239

    
240

    
241
@quotas.uses_commission
def update_network_state(serials, network):
    """Recompute the aggregate state of a Network from its BackendNetworks.

    The network takes a definite state only when all backends agree;
    otherwise it stays PENDING.  On the transition to DELETED, pooled
    resources (bridge link, MAC prefix) are released and a commission is
    issued to release the quota held by the network.

    :param serials: list injected by @quotas.uses_commission; commission
        serials appended here are handled by the decorator.
    :param network: the Network to update.
    """
    old_state = network.state

    backend_states = [s.operstate for s in network.backend_networks.all()]
    if not backend_states:
        # No backend has a view of this network yet.
        network.state = 'PENDING'
        network.save()
        return

    all_equal = len(set(backend_states)) <= 1
    network.state = all_equal and backend_states[0] or 'PENDING'
    # Release the resources on the deletion of the Network
    if old_state != 'DELETED' and network.state == 'DELETED':
        # Fix: the original passed (mac_prefix, link) while the format
        # string names link first, logging the values under swapped labels.
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
                 network.id, network.link, network.mac_prefix)
        network.deleted = True
        if network.mac_prefix:
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
                release_resource(res_type="mac_prefix",
                                 value=network.mac_prefix)
        if network.link:
            if network.FLAVORS[network.flavor]["link"] == "pool":
                release_resource(res_type="bridge", value=network.link)

        # Issue commission to release the quota held by the network.
        serial = quotas.issue_network_commission(network.userid, delete=True)
        serials.append(serial)
        network.serial = serial
        serial.accepted = True
        serial.save()
    network.save()
273

    
274

    
275
@transaction.commit_on_success
def process_network_modify(back_network, etime, jobid, opcode, status,
                           add_reserved_ips, remove_reserved_ips):
    """Process an OP_NETWORK_SET_PARAMS job notification.

    Update the BackendNetwork's job bookkeeping and apply externally
    reserved/released IPs to the network's address pool.

    :param back_network: the BackendNetwork the job refers to.
    :param etime: event time as reported by the backend.
    :param jobid: Ganeti job id.
    :param opcode: must be 'OP_NETWORK_SET_PARAMS'.
    :param status: Ganeti job status; must be one of BACKEND_STATUSES.
    :param add_reserved_ips: iterable of IPs to mark externally reserved.
    :param remove_reserved_ips: iterable of IPs to return to the pool.
    """
    assert (opcode == "OP_NETWORK_SET_PARAMS")
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    # Fix: store the opcode in the 'backendopcode' field, as the other
    # process_* handlers do; the original wrote to 'opcode', which is not
    # a model field and was therefore never persisted.
    back_network.backendopcode = opcode

    if add_reserved_ips or remove_reserved_ips:
        net = back_network.network
        pool = net.get_pool()
        if add_reserved_ips:
            for ip in add_reserved_ips:
                pool.reserve(ip, external=True)
        if remove_reserved_ips:
            for ip in remove_reserved_ips:
                pool.put(ip, external=True)
        pool.save()

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()
300

    
301

    
302
@transaction.commit_on_success
def process_create_progress(vm, etime, progress):
    """Record a build-progress notification on a VM."""
    percentage = int(progress)

    # snf-image:copy-progress counts bytes read by all image handling
    # processes, so the reported value may exceed 100%; clamp it.
    if percentage > 100:
        percentage = 100
    if percentage < 0:
        raise ValueError("Percentage cannot be negative")

    # FIXME: log a warning here, see #1033
    # (a percentage lower than the last recorded one should ideally be
    # reported instead of silently accepted)

    # This assumes that no message of type 'ganeti-create-progress' is going
    # to arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance
    # and the instance is STARTED.  What if the two messages are processed by
    # two separate dispatcher threads, and the 'ganeti-op-status' message for
    # successful creation gets processed before the 'ganeti-create-progress'
    # message? [vkoukis]
    #
    #if not vm.operstate == 'BUILD':
    #    raise VirtualMachine.IllegalState("VM is not in building state")

    vm.buildpercentage = percentage
    vm.backendtime = etime
    vm.save()
331

    
332

    
333
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
    details=None):
    """
    Create virtual machine instance diagnostic entry.

    :param vm: VirtualMachine instance to create diagnostic for.
    :param message: Diagnostic message.
    :param source: Diagnostic source identifier (e.g. image-helper).
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
    :param etime: The time the message occurred (if available).
    :param details: Additional details or debug information.
    """
    # Delegate to the custom model manager; `etime` is persisted as the
    # diagnostic's source_date.
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level, source=source,
            source_date=etime, message=message, details=details)
347

    
348

    
349
def create_instance(vm, public_nic, flavor, image, password=None):
    """Submit a CreateInstance job for a VM to its Ganeti backend.

    `image` is a dictionary which should contain the keys:
            'backend_id', 'format' and 'metadata'

        metadata value should be a dictionary.

    :param vm: the VirtualMachine to create an instance for.
    :param public_nic: NIC definition dict for the instance's public NIC.
    :param flavor: Flavor providing cpu/ram/disk parameters.
    :param image: image dict as described above.
    :param password: optional image password (admin-created VMs only).
    :returns: the Ganeti job id.
    """
    # Handle arguments to CreateInstance() as a dictionary,
    # initialize it based on a deployment-specific value.
    # This enables the administrator to override deployment-specific
    # arguments, such as the disk template to use, name of os provider
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
    #
    # Fix: work on a shallow copy; the original mutated
    # settings.GANETI_CREATEINSTANCE_KWARGS in place, i.e. shared module
    # state.  All nested values we touch ('disks', 'beparams', 'osparams')
    # are reassigned wholesale below, so a shallow copy suffices.
    kw = dict(settings.GANETI_CREATEINSTANCE_KWARGS)
    kw['mode'] = 'create'
    kw['name'] = vm.backend_vm_id
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS

    kw['disk_template'] = flavor.disk_template
    kw['disks'] = [{"size": flavor.disk * 1024}]
    provider = flavor.disk_provider
    if provider:
        kw['disks'][0]['provider'] = provider

        if provider == 'vlmc':
            kw['disks'][0]['origin'] = flavor.disk_origin

    kw['nics'] = [public_nic]
    if settings.GANETI_USE_HOTPLUG:
        kw['hotplug'] = True
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['os'] = settings.GANETI_OS_PROVIDER
    kw['ip_check'] = False
    kw['name_check'] = False

    # Do not specific a node explicitly, have
    # Ganeti use an iallocator instead
    #kw['pnode'] = rapi.GetNodes()[0]

    kw['dry_run'] = settings.TEST

    kw['beparams'] = {
       'auto_balance': True,
       'vcpus': flavor.cpu,
       'memory': flavor.ram}

    kw['osparams'] = {
        'config_url': vm.config_url,
        # Store image id and format to Ganeti
        'img_id': image['backend_id'],
        'img_format': image['format']}

    if password:
        # Only for admin created VMs !!
        kw['osparams']['img_passwd'] = password

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['hvparams'] = dict(serial_console=False)

    log.debug("Creating instance %s", utils.hide_pass(kw))
    with pooled_rapi_client(vm) as client:
        return client.CreateInstance(**kw)
411

    
412

    
413
def delete_instance(vm):
    """Submit a DeleteInstance job for the VM; return the Ganeti job id."""
    with pooled_rapi_client(vm) as client:
        job_id = client.DeleteInstance(vm.backend_vm_id,
                                       dry_run=settings.TEST)
    return job_id
416

    
417

    
418
def reboot_instance(vm, reboot_type):
    """Submit a RebootInstance job for the VM; return the Ganeti job id.

    :param reboot_type: either 'soft' or 'hard'.
    :raises ValueError: on an unknown reboot type.  (The original used a
        bare assert, which is stripped when Python runs with -O.)
    """
    if reboot_type not in ('soft', 'hard'):
        raise ValueError("Unknown reboot type: %s" % reboot_type)
    with pooled_rapi_client(vm) as client:
        return client.RebootInstance(vm.backend_vm_id, reboot_type,
                                     dry_run=settings.TEST)
423

    
424

    
425
def startup_instance(vm):
    """Submit a StartupInstance job for the VM; return the Ganeti job id."""
    with pooled_rapi_client(vm) as client:
        job_id = client.StartupInstance(vm.backend_vm_id,
                                        dry_run=settings.TEST)
    return job_id
428

    
429

    
430
def shutdown_instance(vm):
    """Submit a ShutdownInstance job for the VM; return the Ganeti job id."""
    with pooled_rapi_client(vm) as client:
        job_id = client.ShutdownInstance(vm.backend_vm_id,
                                         dry_run=settings.TEST)
    return job_id
433

    
434

    
435
def get_instance_console(vm):
    """Build VNC console connection info for a VM.

    RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
    which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
    useless (see #783).  Until this is fixed on the Ganeti side, construct
    a console info reply directly.

    WARNING: This assumes that VNC runs on port network_port on the
    instance's primary node, and is probably hypervisor-specific.
    """
    log.debug("Getting console for vm %s", vm)

    with pooled_rapi_client(vm) as client:
        instance = client.GetInstance(vm.backend_vm_id)

    if instance['hvparams']['serial_console']:
        raise Exception("hv parameter serial_console cannot be true")

    return {'kind': 'vnc',
            'host': instance['pnode'],
            'port': instance['network_port']}
461

    
462

    
463
def get_instance_info(vm):
    """Fetch detailed instance info for the VM from its Ganeti backend."""
    with pooled_rapi_client(vm) as client:
        info = client.GetInstanceInfo(vm.backend_vm_id)
    return info
466

    
467

    
468
def create_network(network, backends=None, connect=True):
    """Create and connect a network."""
    if not backends:
        backends = Backend.objects.exclude(offline=True)

    log.debug("Creating network %s in backends %s", network, backends)

    for backend in backends:
        creation_job = _create_network(network, backend)
        if connect:
            # Make the connection jobs depend on the creation job.
            connect_network(network, backend, creation_job)
479

    
480

    
481
def _create_network(network, backend):
    """Submit a CreateNetwork job to one backend; return the job id."""
    net_type = 'public' if network.public else 'private'

    tags = network.backend_tag
    if network.dhcp:
        tags.append('nfdhcpd')

    # Only public networks get Ganeti's IP-conflict checking.
    conflicts_check = True if network.public else False

    try:
        backend_net = BackendNetwork.objects.get(network=network,
                                                 backend=backend)
        mac_prefix = backend_net.mac_prefix
    except BackendNetwork.DoesNotExist:
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
                        " does not exist" % (network.id, backend.id))

    with pooled_rapi_client(backend) as client:
        return client.CreateNetwork(network_name=network.backend_id,
                                    network=network.subnet,
                                    network6=network.subnet6,
                                    gateway=network.gateway,
                                    gateway6=network.gateway6,
                                    network_type=net_type,
                                    mac_prefix=mac_prefix,
                                    conflicts_check=conflicts_check,
                                    tags=tags)
512

    
513

    
514
def connect_network(network, backend, depend_job=None, group=None):
    """Connect a network to nodegroups."""
    log.debug("Connecting network %s to backend %s", network, backend)

    # Only public networks get Ganeti's IP-conflict checking.
    conflicts_check = True if network.public else False

    depend_jobs = [depend_job] if depend_job else []
    with pooled_rapi_client(backend) as client:
        # With an explicit group connect only that one; otherwise connect
        # every nodegroup of the backend.
        groups = [group] if group else client.GetGroups()
        for g in groups:
            client.ConnectNetwork(network.backend_id, g, network.mode,
                                  network.link, conflicts_check, depend_jobs)
533

    
534

    
535
def delete_network(network, backends=None, disconnect=True):
    """Disconnect (optionally) and delete a network from backends."""
    if not backends:
        backends = Backend.objects.exclude(offline=True)

    log.debug("Deleting network %s from backends %s", network, backends)

    for backend in backends:
        # Make deletion depend on the disconnect jobs, if any.
        depend_jobs = []
        if disconnect:
            depend_jobs = disconnect_network(network, backend)
        _delete_network(network, backend, depend_jobs)
546

    
547

    
548
def _delete_network(network, backend, depend_jobs=None):
    """Submit a DeleteNetwork job, optionally depending on previous jobs.

    Fix: the original used a mutable default argument (depend_jobs=[]);
    use None as the sentinel instead.
    """
    if depend_jobs is None:
        depend_jobs = []
    with pooled_rapi_client(backend) as client:
        return client.DeleteNetwork(network.backend_id, depend_jobs)
551

    
552

    
553
def disconnect_network(network, backend, group=None):
    """Disconnect a network from nodegroups; return the list of job ids."""
    log.debug("Disconnecting network %s to backend %s", network, backend)

    with pooled_rapi_client(backend) as client:
        # With an explicit group disconnect only that one; otherwise
        # disconnect from every nodegroup of the backend.
        groups = [group] if group else client.GetGroups()
        return [client.DisconnectNetwork(network.backend_id, g)
                for g in groups]
565

    
566

    
567
def connect_to_network(vm, network, address=None):
    """Add a NIC on the given network (optionally with an IP) to a VM."""
    log.debug("Connecting vm %s to network %s(%s)", vm, network, address)

    new_nic = {'ip': address, 'network': network.backend_id}
    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id,
                                     nics=[('add', new_nic)],
                                     hotplug=settings.GANETI_USE_HOTPLUG,
                                     dry_run=settings.TEST)
576

    
577

    
578
def disconnect_from_network(vm, nic):
    """Remove the NIC with the given index from a VM."""
    log.debug("Removing nic of VM %s, with index %s", vm, str(nic.index))

    modification = [('remove', nic.index, {})]
    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id, nics=modification,
                                     hotplug=settings.GANETI_USE_HOTPLUG,
                                     dry_run=settings.TEST)
587

    
588

    
589
def set_firewall_profile(vm, profile):
    """Apply a firewall profile to a VM by resetting its Ganeti tags.

    :param profile: one of the _firewall_tags keys
        ('ENABLED', 'DISABLED', 'PROTECTED').
    :raises ValueError: on an unknown profile.  (Fixed the typo
        "Unsopported" in the error message.)
    """
    try:
        tag = _firewall_tags[profile]
    except KeyError:
        raise ValueError("Unsupported Firewall Profile: %s" % profile)

    log.debug("Setting tag of VM %s to %s", vm, profile)

    with pooled_rapi_client(vm) as client:
        # Delete all firewall tags
        for t in _firewall_tags.values():
            client.DeleteInstanceTags(vm.backend_vm_id, [t],
                                      dry_run=settings.TEST)

        client.AddInstanceTags(vm.backend_vm_id, [tag], dry_run=settings.TEST)

        # XXX NOP ModifyInstance call to force process_net_status to run
        # on the dispatcher
        client.ModifyInstance(vm.backend_vm_id,
                         os_name=settings.GANETI_CREATEINSTANCE_KWARGS['os'])
609

    
610

    
611
def get_ganeti_instances(backend=None, bulk=False):
    """Collect instances from one backend, or from all online backends."""
    instances = []
    for b in get_backends(backend):
        with pooled_rapi_client(b) as client:
            instances.extend(client.GetInstances(bulk=bulk))

    return instances
618

    
619

    
620
def get_ganeti_nodes(backend=None, bulk=False):
    """Collect nodes from one backend, or from all online backends."""
    nodes = []
    for b in get_backends(backend):
        with pooled_rapi_client(b) as client:
            nodes.extend(client.GetNodes(bulk=bulk))

    return nodes
627

    
628

    
629
def get_ganeti_jobs(backend=None, bulk=False):
    """Collect jobs from one backend, or from all online backends."""
    jobs = []
    for b in get_backends(backend):
        with pooled_rapi_client(b) as client:
            jobs.extend(client.GetJobs(bulk=bulk))
    return jobs
635

    
636
##
637
##
638
##
639

    
640

    
641
def get_backends(backend=None):
    """Return the backends to operate on.

    Given an explicit backend, return it wrapped in a list (empty if it is
    offline); with no backend, return all online backends.
    """
    if not backend:
        return Backend.objects.filter(offline=False)
    return [] if backend.offline else [backend]
647

    
648

    
649
def get_physical_resources(backend):
    """Get the physical resources of a backend.

    Get the resources of a backend as reported by the backend (not the db).
    """
    attrs = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
    totals = dict((a, 0) for a in attrs)
    for node in get_ganeti_nodes(backend, bulk=True):
        # Drained, offline and non-vm_capable nodes take no part in the VM
        # allocation process, so leave them out of the totals.
        if (node['vm_capable'] and not node['drained']
                and not node['offline'] and node['cnodes']):
            for a in attrs:
                totals[a] += int(node[a])
    return totals
668

    
669

    
670
def update_resources(backend, resources=None):
    """Update the state of the backend resources in db."""
    if not resources:
        resources = get_physical_resources(backend)

    # Copy each reported counter onto the corresponding model field.
    for field in ('mfree', 'mtotal', 'dfree', 'dtotal',
                  'pinst_cnt', 'ctotal'):
        setattr(backend, field, resources[field])
    backend.updated = datetime.now()
    backend.save()
686

    
687

    
688
def get_memory_from_instances(backend):
    """Get the memory that is used from instances.

    Get the used memory of a backend. Note: This is different for
    the real memory used, due to kvm's memory de-duplication.
    """
    with pooled_rapi_client(backend) as client:
        instances = client.GetInstances(bulk=True)
    return sum(i['oper_ram'] for i in instances)
701

    
702
##
703
## Synchronized operations for reconciliation
704
##
705

    
706

    
707
def create_network_synced(network, backend):
    """Create, then connect, a network on a backend, waiting on each job."""
    result = _create_network_synced(network, backend)
    # Only attempt to connect if the creation succeeded.
    if result[0] == 'success':
        result = connect_network_synced(network, backend)
    return result
713

    
714

    
715
def _create_network_synced(network, backend):
    """Submit a network-creation job and block until it terminates."""
    with pooled_rapi_client(backend) as client:
        job_id = _create_network(network, backend)
        outcome = wait_for_job(client, job_id)
    return outcome
720

    
721

    
722
def connect_network_synced(network, backend):
    """Connect a network to every nodegroup of a backend, synchronously.

    Stops at the first failing job; returns the last job's
    (status, error) tuple.
    """
    # Fix: start from a successful result so that a backend with no
    # nodegroups returns cleanly instead of raising NameError on the
    # final `return result`.
    result = ('success', None)
    with pooled_rapi_client(backend) as client:
        for group in client.GetGroups():
            job = client.ConnectNetwork(network.backend_id, group,
                                        network.mode, network.link)
            result = wait_for_job(client, job)
            if result[0] != 'success':
                return result

    return result
732

    
733

    
734
def wait_for_job(client, jobid):
    """Block until the Ganeti job reaches a terminal state.

    :param client: RAPI client providing WaitForJobChange().
    :param jobid: Ganeti job id to wait on.
    :returns: ('success', None) on success, else (status, opresult).
    """
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
    status = result['job_info'][0]
    # Fix: Ganeti reports canceled jobs with status 'canceled'; the original
    # tested for 'cancel', which never matches, so the loop would spin
    # forever on a canceled job.  Accept both spellings to be safe.
    while status not in ['success', 'error', 'canceled', 'cancel']:
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
                                        [result], None)
        status = result['job_info'][0]

    if status == 'success':
        return (status, None)
    else:
        error = result['job_info'][1]
        return (status, error)