Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ 1ad47ca5

History | View | Annotate | Download (25.4 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import json
35

    
36
from django.conf import settings
37
from django.db import transaction
38
from datetime import datetime
39

    
40
from synnefo.db.models import (Backend, VirtualMachine, Network,
41
                               BackendNetwork, BACKEND_STATUSES,
42
                               pooled_rapi_client, BridgePoolTable,
43
                               MacPrefixPoolTable, VirtualMachineDiagnostic)
44
from synnefo.logic import utils
45
from synnefo import quotas
46
from synnefo.api.util import release_resource
47

    
48
from logging import getLogger
49
log = getLogger(__name__)
50

    
51

    
52
_firewall_tags = {
53
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
54
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
55
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}
56

    
57
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
58

    
59

    
60
@quotas.uses_commission
61
@transaction.commit_on_success
62
def process_op_status(serials, vm, etime, jobid, opcode, status, logmsg):
63
    """Process a job progress notification from the backend
64

65
    Process an incoming message from the backend (currently Ganeti).
66
    Job notifications with a terminating status (sucess, error, or canceled),
67
    also update the operating state of the VM.
68

69
    """
70
    # See #1492, #1031, #1111 why this line has been removed
71
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
72
    if status not in [x[0] for x in BACKEND_STATUSES]:
73
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)
74

    
75
    vm.backendjobid = jobid
76
    vm.backendjobstatus = status
77
    vm.backendopcode = opcode
78
    vm.backendlogmsg = logmsg
79

    
80
    # Notifications of success change the operating state
81
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode, None)
82
    if status == 'success' and state_for_success is not None:
83
        vm.operstate = state_for_success
84

    
85
    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
86
    if opcode == 'OP_INSTANCE_CREATE' and status in ('canceled', 'error'):
87
        vm.operstate = 'ERROR'
88
        vm.backendtime = etime
89
    elif opcode == 'OP_INSTANCE_REMOVE':
90
        # Set the deleted flag explicitly, cater for admin-initiated removals
91
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
92
        # when no instance exists at the Ganeti backend.
93
        # See ticket #799 for all the details.
94
        #
95
        if status == 'success' or (status == 'error' and
96
                                   vm.operstate == 'ERROR'):
97
            # Issue commission
98
            serial = quotas.issue_vm_commission(vm.userid, vm.flavor,
99
                                                delete=True)
100
            serials.append(serial)
101
            vm.serial = serial
102
            serial.accepted = True
103
            serial.save()
104
            release_instance_nics(vm)
105
            vm.nics.all().delete()
106
            vm.deleted = True
107
            vm.operstate = state_for_success
108
            vm.backendtime = etime
109

    
110
    # Update backendtime only for jobs that have been successfully completed,
111
    # since only these jobs update the state of the VM. Else a "race condition"
112
    # may occur when a successful job (e.g. OP_INSTANCE_REMOVE) completes
113
    # before an error job and messages arrive in reversed order.
114
    if status == 'success':
115
        vm.backendtime = etime
116

    
117
    vm.save()
118

    
119

    
120
@transaction.commit_on_success
121
def process_net_status(vm, etime, nics):
122
    """Process a net status notification from the backend
123

124
    Process an incoming message from the Ganeti backend,
125
    detailing the NIC configuration of a VM instance.
126

127
    Update the state of the VM in the DB accordingly.
128
    """
129

    
130
    ganeti_nics = process_ganeti_nics(nics)
131
    if not nics_changed(vm.nics.order_by('index'), ganeti_nics):
132
        log.debug("NICs for VM %s have not changed", vm)
133

    
134
    release_instance_nics(vm)
135

    
136
    for nic in ganeti_nics:
137
        ipv4 = nic.get('ipv4', '')
138
        net = nic['network']
139
        if ipv4:
140
            net.reserve_address(ipv4)
141

    
142
        nic['dirty'] = False
143
        vm.nics.create(**nic)
144
        # Dummy save the network, because UI uses changed-since for VMs
145
        # and Networks in order to show the VM NICs
146
        net.save()
147

    
148
    vm.backendtime = etime
149
    vm.save()
150

    
151

    
152
def process_ganeti_nics(ganeti_nics):
153
    """Process NIC dict from ganeti hooks."""
154
    new_nics = []
155
    for i, new_nic in enumerate(ganeti_nics):
156
        network = new_nic.get('network', '')
157
        n = str(network)
158
        pk = utils.id_from_network_name(n)
159

    
160
        net = Network.objects.get(pk=pk)
161

    
162
        # Get the new nic info
163
        mac = new_nic.get('mac', '')
164
        ipv4 = new_nic.get('ip', '')
165
        ipv6 = new_nic.get('ipv6', '')
166

    
167
        firewall = new_nic.get('firewall', '')
168
        firewall_profile = _reverse_tags.get(firewall, '')
169
        if not firewall_profile and net.public:
170
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE
171

    
172
        nic = {
173
            'index': i,
174
            'network': net,
175
            'mac': mac,
176
            'ipv4': ipv4,
177
            'ipv6': ipv6,
178
            'firewall_profile': firewall_profile}
179

    
180
        new_nics.append(nic)
181
    return new_nics
182

    
183

    
184
def nics_changed(old_nics, new_nics):
185
    """Return True if NICs have changed in any way."""
186
    if len(old_nics) != len(new_nics):
187
        return True
188
    for old_nic, new_nic in zip(old_nics, new_nics):
189
        if not (old_nic.ipv4 == new_nic['ipv4'] and
190
                old_nic.ipv6 == new_nic['ipv6'] and
191
                old_nic.mac == new_nic['mac'] and
192
                old_nic.firewall_profile == new_nic['firewall_profile'] and
193
                old_nic.index == new_nic['index'] and
194
                old_nic.network == new_nic['network']):
195
            return True
196
    return False
197

    
198

    
199
def release_instance_nics(vm):
200
    for nic in vm.nics.all():
201
        net = nic.network
202
        if nic.ipv4:
203
            net.release_address(nic.ipv4)
204
        nic.delete()
205
        net.save()
206

    
207

    
208
@transaction.commit_on_success
209
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
210
    if status not in [x[0] for x in BACKEND_STATUSES]:
211
        raise Network.InvalidBackendMsgError(opcode, status)
212

    
213
    back_network.backendjobid = jobid
214
    back_network.backendjobstatus = status
215
    back_network.backendopcode = opcode
216
    back_network.backendlogmsg = logmsg
217

    
218
    # Notifications of success change the operating state
219
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
220
    if status == 'success' and state_for_success is not None:
221
        back_network.operstate = state_for_success
222

    
223
    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_ADD':
224
        back_network.operstate = 'ERROR'
225
        back_network.backendtime = etime
226

    
227
    if opcode == 'OP_NETWORK_REMOVE':
228
        if status == 'success' or (status == 'error' and
229
                                   back_network.operstate == 'ERROR'):
230
            back_network.operstate = state_for_success
231
            back_network.deleted = True
232
            back_network.backendtime = etime
233

    
234
    if status == 'success':
235
        back_network.backendtime = etime
236
    back_network.save()
237
    # Also you must update the state of the Network!!
238
    update_network_state(back_network.network)
239

    
240

    
241
@quotas.uses_commission
242
def update_network_state(serials, network):
243
    old_state = network.state
244

    
245
    backend_states = [s.operstate for s in
246
                      network.backend_networks.filter(backend__offline=False)]
247
    if not backend_states:
248
        network.state = 'PENDING'
249
        network.save()
250
        return
251

    
252
    all_equal = len(set(backend_states)) <= 1
253
    network.state = all_equal and backend_states[0] or 'PENDING'
254
    # Release the resources on the deletion of the Network
255
    if old_state != 'DELETED' and network.state == 'DELETED':
256
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
257
                 network.id, network.mac_prefix, network.link)
258
        network.deleted = True
259
        if network.mac_prefix:
260
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
261
                release_resource(res_type="mac_prefix",
262
                                 value=network.mac_prefix)
263
        if network.link:
264
            if network.FLAVORS[network.flavor]["link"] == "pool":
265
                release_resource(res_type="bridge", value=network.link)
266

    
267
        # Issue commission
268
        serial = quotas.issue_network_commission(network.userid, delete=True)
269
        serials.append(serial)
270
        network.serial = serial
271
        serial.accepted = True
272
        serial.save()
273
    network.save()
274

    
275

    
276
@transaction.commit_on_success
277
def process_network_modify(back_network, etime, jobid, opcode, status,
278
                           add_reserved_ips, remove_reserved_ips):
279
    assert (opcode == "OP_NETWORK_SET_PARAMS")
280
    if status not in [x[0] for x in BACKEND_STATUSES]:
281
        raise Network.InvalidBackendMsgError(opcode, status)
282

    
283
    back_network.backendjobid = jobid
284
    back_network.backendjobstatus = status
285
    back_network.opcode = opcode
286

    
287
    if add_reserved_ips or remove_reserved_ips:
288
        net = back_network.network
289
        pool = net.get_pool()
290
        if add_reserved_ips:
291
            for ip in add_reserved_ips:
292
                pool.reserve(ip, external=True)
293
        if remove_reserved_ips:
294
            for ip in remove_reserved_ips:
295
                pool.put(ip, external=True)
296
        pool.save()
297

    
298
    if status == 'success':
299
        back_network.backendtime = etime
300
    back_network.save()
301

    
302

    
303
@transaction.commit_on_success
304
def process_create_progress(vm, etime, progress):
305

    
306
    percentage = int(progress)
307

    
308
    # The percentage may exceed 100%, due to the way
309
    # snf-image:copy-progress tracks bytes read by image handling processes
310
    percentage = 100 if percentage > 100 else percentage
311
    if percentage < 0:
312
        raise ValueError("Percentage cannot be negative")
313

    
314
    # FIXME: log a warning here, see #1033
315
#   if last_update > percentage:
316
#       raise ValueError("Build percentage should increase monotonically " \
317
#                        "(old = %d, new = %d)" % (last_update, percentage))
318

    
319
    # This assumes that no message of type 'ganeti-create-progress' is going to
320
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
321
    # the instance is STARTED.  What if the two messages are processed by two
322
    # separate dispatcher threads, and the 'ganeti-op-status' message for
323
    # successful creation gets processed before the 'ganeti-create-progress'
324
    # message? [vkoukis]
325
    #
326
    #if not vm.operstate == 'BUILD':
327
    #    raise VirtualMachine.IllegalState("VM is not in building state")
328

    
329
    vm.buildpercentage = percentage
330
    vm.backendtime = etime
331
    vm.save()
332

    
333

    
334
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
335
                               details=None):
336
    """
337
    Create virtual machine instance diagnostic entry.
338

339
    :param vm: VirtualMachine instance to create diagnostic for.
340
    :param message: Diagnostic message.
341
    :param source: Diagnostic source identifier (e.g. image-helper).
342
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
343
    :param etime: The time the message occured (if available).
344
    :param details: Additional details or debug information.
345
    """
346
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level, source=source,
347
                                                   source_date=etime,
348
                                                   message=message,
349
                                                   details=details)
350

    
351

    
352
def create_instance(vm, public_nic, flavor, image, password=None):
353
    """`image` is a dictionary which should contain the keys:
354
            'backend_id', 'format' and 'metadata'
355

356
        metadata value should be a dictionary.
357
    """
358

    
359
    # Handle arguments to CreateInstance() as a dictionary,
360
    # initialize it based on a deployment-specific value.
361
    # This enables the administrator to override deployment-specific
362
    # arguments, such as the disk template to use, name of os provider
363
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
364
    #
365
    kw = settings.GANETI_CREATEINSTANCE_KWARGS
366
    kw['mode'] = 'create'
367
    kw['name'] = vm.backend_vm_id
368
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
369

    
370
    kw['disk_template'] = flavor.disk_template
371
    kw['disks'] = [{"size": flavor.disk * 1024}]
372
    provider = flavor.disk_provider
373
    if provider:
374
        kw['disks'][0]['provider'] = provider
375

    
376
        if provider == 'vlmc':
377
            kw['disks'][0]['origin'] = flavor.disk_origin
378

    
379
    kw['nics'] = [public_nic]
380
    if settings.GANETI_USE_HOTPLUG:
381
        kw['hotplug'] = True
382
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
383
    # kw['os'] = settings.GANETI_OS_PROVIDER
384
    kw['ip_check'] = False
385
    kw['name_check'] = False
386

    
387
    # Do not specific a node explicitly, have
388
    # Ganeti use an iallocator instead
389
    #kw['pnode'] = rapi.GetNodes()[0]
390

    
391
    kw['dry_run'] = settings.TEST
392

    
393
    kw['beparams'] = {
394
        'auto_balance': True,
395
        'vcpus': flavor.cpu,
396
        'memory': flavor.ram}
397

    
398
    kw['osparams'] = {
399
        'config_url': vm.config_url,
400
        # Store image id and format to Ganeti
401
        'img_id': image['backend_id'],
402
        'img_format': image['format']}
403

    
404
    if password:
405
        # Only for admin created VMs !!
406
        kw['osparams']['img_passwd'] = password
407

    
408
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
409
    # kw['hvparams'] = dict(serial_console=False)
410

    
411
    log.debug("Creating instance %s", utils.hide_pass(kw))
412
    with pooled_rapi_client(vm) as client:
413
        return client.CreateInstance(**kw)
414

    
415

    
416
def delete_instance(vm):
417
    with pooled_rapi_client(vm) as client:
418
        return client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)
419

    
420

    
421
def reboot_instance(vm, reboot_type):
422
    assert reboot_type in ('soft', 'hard')
423
    with pooled_rapi_client(vm) as client:
424
        return client.RebootInstance(vm.backend_vm_id, reboot_type,
425
                                     dry_run=settings.TEST)
426

    
427

    
428
def startup_instance(vm):
429
    with pooled_rapi_client(vm) as client:
430
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
431

    
432

    
433
def shutdown_instance(vm):
434
    with pooled_rapi_client(vm) as client:
435
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)
436

    
437

    
438
def get_instance_console(vm):
439
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
440
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
441
    # useless (see #783).
442
    #
443
    # Until this is fixed on the Ganeti side, construct a console info reply
444
    # directly.
445
    #
446
    # WARNING: This assumes that VNC runs on port network_port on
447
    #          the instance's primary node, and is probably
448
    #          hypervisor-specific.
449
    #
450
    log.debug("Getting console for vm %s", vm)
451

    
452
    console = {}
453
    console['kind'] = 'vnc'
454

    
455
    with pooled_rapi_client(vm) as client:
456
        i = client.GetInstance(vm.backend_vm_id)
457

    
458
    if i['hvparams']['serial_console']:
459
        raise Exception("hv parameter serial_console cannot be true")
460
    console['host'] = i['pnode']
461
    console['port'] = i['network_port']
462

    
463
    return console
464

    
465

    
466
def get_instance_info(vm):
467
    with pooled_rapi_client(vm) as client:
468
        return client.GetInstanceInfo(vm.backend_vm_id)
469

    
470

    
471
def create_network(network, backends=None, connect=True):
472
    """Create and connect a network."""
473
    if not backends:
474
        backends = Backend.objects.exclude(offline=True)
475

    
476
    log.debug("Creating network %s in backends %s", network, backends)
477

    
478
    for backend in backends:
479
        create_jobID = _create_network(network, backend)
480
        if connect:
481
            connect_network(network, backend, create_jobID)
482

    
483

    
484
def _create_network(network, backend):
485
    """Create a network."""
486

    
487
    network_type = network.public and 'public' or 'private'
488

    
489
    tags = network.backend_tag
490
    if network.dhcp:
491
        tags.append('nfdhcpd')
492

    
493
    if network.public:
494
        conflicts_check = True
495
    else:
496
        conflicts_check = False
497

    
498
    try:
499
        bn = BackendNetwork.objects.get(network=network, backend=backend)
500
        mac_prefix = bn.mac_prefix
501
    except BackendNetwork.DoesNotExist:
502
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
503
                        " does not exist" % (network.id, backend.id))
504

    
505
    with pooled_rapi_client(backend) as client:
506
        return client.CreateNetwork(network_name=network.backend_id,
507
                                    network=network.subnet,
508
                                    network6=network.subnet6,
509
                                    gateway=network.gateway,
510
                                    gateway6=network.gateway6,
511
                                    network_type=network_type,
512
                                    mac_prefix=mac_prefix,
513
                                    conflicts_check=conflicts_check,
514
                                    tags=tags)
515

    
516

    
517
def connect_network(network, backend, depend_job=None, group=None):
518
    """Connect a network to nodegroups."""
519
    log.debug("Connecting network %s to backend %s", network, backend)
520

    
521
    if network.public:
522
        conflicts_check = True
523
    else:
524
        conflicts_check = False
525

    
526
    depend_jobs = [depend_job] if depend_job else []
527
    with pooled_rapi_client(backend) as client:
528
        if group:
529
            client.ConnectNetwork(network.backend_id, group, network.mode,
530
                                  network.link, conflicts_check, depend_jobs)
531
        else:
532
            for group in client.GetGroups():
533
                client.ConnectNetwork(network.backend_id, group, network.mode,
534
                                      network.link, conflicts_check,
535
                                      depend_jobs)
536

    
537

    
538
def delete_network(network, backends=None, disconnect=True):
539
    if not backends:
540
        backends = Backend.objects.exclude(offline=True)
541

    
542
    log.debug("Deleting network %s from backends %s", network, backends)
543

    
544
    for backend in backends:
545
        disconnect_jobIDs = []
546
        if disconnect:
547
            disconnect_jobIDs = disconnect_network(network, backend)
548
        _delete_network(network, backend, disconnect_jobIDs)
549

    
550

    
551
def _delete_network(network, backend, depend_jobs=[]):
552
    with pooled_rapi_client(backend) as client:
553
        return client.DeleteNetwork(network.backend_id, depend_jobs)
554

    
555

    
556
def disconnect_network(network, backend, group=None):
557
    log.debug("Disconnecting network %s to backend %s", network, backend)
558

    
559
    with pooled_rapi_client(backend) as client:
560
        if group:
561
            return [client.DisconnectNetwork(network.backend_id, group)]
562
        else:
563
            jobs = []
564
            for group in client.GetGroups():
565
                job = client.DisconnectNetwork(network.backend_id, group)
566
                jobs.append(job)
567
            return jobs
568

    
569

    
570
def connect_to_network(vm, network, address=None):
571
    nic = {'ip': address, 'network': network.backend_id}
572

    
573
    log.debug("Connecting vm %s to network %s(%s)", vm, network, address)
574

    
575
    with pooled_rapi_client(vm) as client:
576
        return client.ModifyInstance(vm.backend_vm_id, nics=[('add',  nic)],
577
                                     hotplug=settings.GANETI_USE_HOTPLUG,
578
                                     dry_run=settings.TEST)
579

    
580

    
581
def disconnect_from_network(vm, nic):
582
    op = [('remove', nic.index, {})]
583

    
584
    log.debug("Removing nic of VM %s, with index %s", vm, str(nic.index))
585

    
586
    with pooled_rapi_client(vm) as client:
587
        return client.ModifyInstance(vm.backend_vm_id, nics=op,
588
                                     hotplug=settings.GANETI_USE_HOTPLUG,
589
                                     dry_run=settings.TEST)
590

    
591

    
592
def set_firewall_profile(vm, profile):
593
    try:
594
        tag = _firewall_tags[profile]
595
    except KeyError:
596
        raise ValueError("Unsopported Firewall Profile: %s" % profile)
597

    
598
    log.debug("Setting tag of VM %s to %s", vm, profile)
599

    
600
    with pooled_rapi_client(vm) as client:
601
        # Delete all firewall tags
602
        for t in _firewall_tags.values():
603
            client.DeleteInstanceTags(vm.backend_vm_id, [t],
604
                                      dry_run=settings.TEST)
605

    
606
        client.AddInstanceTags(vm.backend_vm_id, [tag], dry_run=settings.TEST)
607

    
608
        # XXX NOP ModifyInstance call to force process_net_status to run
609
        # on the dispatcher
610
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
611
        client.ModifyInstance(vm.backend_vm_id,
612
                              os_name=os_name)
613

    
614

    
615
def get_ganeti_instances(backend=None, bulk=False):
616
    instances = []
617
    for backend in get_backends(backend):
618
        with pooled_rapi_client(backend) as client:
619
            instances.append(client.GetInstances(bulk=bulk))
620

    
621
    return reduce(list.__add__, instances, [])
622

    
623

    
624
def get_ganeti_nodes(backend=None, bulk=False):
625
    nodes = []
626
    for backend in get_backends(backend):
627
        with pooled_rapi_client(backend) as client:
628
            nodes.append(client.GetNodes(bulk=bulk))
629

    
630
    return reduce(list.__add__, nodes, [])
631

    
632

    
633
def get_ganeti_jobs(backend=None, bulk=False):
634
    jobs = []
635
    for backend in get_backends(backend):
636
        with pooled_rapi_client(backend) as client:
637
            jobs.append(client.GetJobs(bulk=bulk))
638
    return reduce(list.__add__, jobs, [])
639

    
640
##
641
##
642
##
643

    
644

    
645
def get_backends(backend=None):
646
    if backend:
647
        if backend.offline:
648
            return []
649
        return [backend]
650
    return Backend.objects.filter(offline=False)
651

    
652

    
653
def get_physical_resources(backend):
654
    """ Get the physical resources of a backend.
655

656
    Get the resources of a backend as reported by the backend (not the db).
657

658
    """
659
    nodes = get_ganeti_nodes(backend, bulk=True)
660
    attr = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
661
    res = {}
662
    for a in attr:
663
        res[a] = 0
664
    for n in nodes:
665
        # Filter out drained, offline and not vm_capable nodes since they will
666
        # not take part in the vm allocation process
667
        can_host_vms = n['vm_capable'] and not (n['drained'] or n['offline'])
668
        if can_host_vms and n['cnodes']:
669
            for a in attr:
670
                res[a] += int(n[a])
671
    return res
672

    
673

    
674
def update_resources(backend, resources=None):
675
    """ Update the state of the backend resources in db.
676

677
    """
678

    
679
    if not resources:
680
        resources = get_physical_resources(backend)
681

    
682
    backend.mfree = resources['mfree']
683
    backend.mtotal = resources['mtotal']
684
    backend.dfree = resources['dfree']
685
    backend.dtotal = resources['dtotal']
686
    backend.pinst_cnt = resources['pinst_cnt']
687
    backend.ctotal = resources['ctotal']
688
    backend.updated = datetime.now()
689
    backend.save()
690

    
691

    
692
def get_memory_from_instances(backend):
693
    """ Get the memory that is used from instances.
694

695
    Get the used memory of a backend. Note: This is different for
696
    the real memory used, due to kvm's memory de-duplication.
697

698
    """
699
    with pooled_rapi_client(backend) as client:
700
        instances = client.GetInstances(bulk=True)
701
    mem = 0
702
    for i in instances:
703
        mem += i['oper_ram']
704
    return mem
705

    
706
##
707
## Synchronized operations for reconciliation
708
##
709

    
710

    
711
def create_network_synced(network, backend):
712
    result = _create_network_synced(network, backend)
713
    if result[0] != 'success':
714
        return result
715
    result = connect_network_synced(network, backend)
716
    return result
717

    
718

    
719
def _create_network_synced(network, backend):
720
    with pooled_rapi_client(backend) as client:
721
        job = _create_network(network, backend)
722
        result = wait_for_job(client, job)
723
    return result
724

    
725

    
726
def connect_network_synced(network, backend):
727
    with pooled_rapi_client(backend) as client:
728
        for group in client.GetGroups():
729
            job = client.ConnectNetwork(network.backend_id, group,
730
                                        network.mode, network.link)
731
            result = wait_for_job(client, job)
732
            if result[0] != 'success':
733
                return result
734

    
735
    return result
736

    
737

    
738
def wait_for_job(client, jobid):
739
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
740
    status = result['job_info'][0]
741
    while status not in ['success', 'error', 'cancel']:
742
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
743
                                         [result], None)
744
        status = result['job_info'][0]
745

    
746
    if status == 'success':
747
        return (status, None)
748
    else:
749
        error = result['job_info'][1]
750
        return (status, error)