Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ 2509ce17

History | View | Annotate | Download (25.1 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import json
35

    
36
from django.conf import settings
37
from django.db import transaction
38
from datetime import datetime
39

    
40
from synnefo.db.models import (Backend, VirtualMachine, Network,
41
                               BackendNetwork, BACKEND_STATUSES,
42
                               pooled_rapi_client, BridgePoolTable,
43
                               MacPrefixPoolTable, VirtualMachineDiagnostic)
44
from synnefo.logic import utils
45
from synnefo import quotas
46
from synnefo.api.util import release_resource
47

    
48
from logging import getLogger
49
log = getLogger(__name__)
50

    
51

    
52
_firewall_tags = {
53
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
54
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
55
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}
56

    
57
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
58

    
59

    
60
@transaction.commit_on_success
61
def process_op_status(vm, etime, jobid, opcode, status, logmsg):
62
    """Process a job progress notification from the backend
63

64
    Process an incoming message from the backend (currently Ganeti).
65
    Job notifications with a terminating status (sucess, error, or canceled),
66
    also update the operating state of the VM.
67

68
    """
69
    # See #1492, #1031, #1111 why this line has been removed
70
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
71
    if status not in [x[0] for x in BACKEND_STATUSES]:
72
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)
73

    
74
    vm.backendjobid = jobid
75
    vm.backendjobstatus = status
76
    vm.backendopcode = opcode
77
    vm.backendlogmsg = logmsg
78

    
79
    # Notifications of success change the operating state
80
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode, None)
81
    if status == 'success' and state_for_success is not None:
82
        vm.operstate = state_for_success
83

    
84
    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
85
    if opcode == 'OP_INSTANCE_CREATE' and status in ('canceled', 'error'):
86
        vm.operstate = 'ERROR'
87
        vm.backendtime = etime
88
    elif opcode == 'OP_INSTANCE_REMOVE':
89
        # Set the deleted flag explicitly, cater for admin-initiated removals
90
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
91
        # when no instance exists at the Ganeti backend.
92
        # See ticket #799 for all the details.
93
        #
94
        if status == 'success' or (status == 'error' and
95
                                   vm.operstate == 'ERROR'):
96
            release_instance_nics(vm)
97
            vm.nics.all().delete()
98
            vm.deleted = True
99
            vm.operstate = state_for_success
100
            vm.backendtime = etime
101
            # Issue and accept commission to Quotaholder
102
            quotas.issue_and_accept_commission(vm, delete=True)
103

    
104
    # Update backendtime only for jobs that have been successfully completed,
105
    # since only these jobs update the state of the VM. Else a "race condition"
106
    # may occur when a successful job (e.g. OP_INSTANCE_REMOVE) completes
107
    # before an error job and messages arrive in reversed order.
108
    if status == 'success':
109
        vm.backendtime = etime
110

    
111
    vm.save()
112

    
113

    
114
@transaction.commit_on_success
115
def process_net_status(vm, etime, nics):
116
    """Process a net status notification from the backend
117

118
    Process an incoming message from the Ganeti backend,
119
    detailing the NIC configuration of a VM instance.
120

121
    Update the state of the VM in the DB accordingly.
122
    """
123

    
124
    ganeti_nics = process_ganeti_nics(nics)
125
    if not nics_changed(vm.nics.order_by('index'), ganeti_nics):
126
        log.debug("NICs for VM %s have not changed", vm)
127

    
128
    release_instance_nics(vm)
129

    
130
    for nic in ganeti_nics:
131
        ipv4 = nic.get('ipv4', '')
132
        net = nic['network']
133
        if ipv4:
134
            net.reserve_address(ipv4)
135

    
136
        nic['dirty'] = False
137
        vm.nics.create(**nic)
138
        # Dummy save the network, because UI uses changed-since for VMs
139
        # and Networks in order to show the VM NICs
140
        net.save()
141

    
142
    vm.backendtime = etime
143
    vm.save()
144

    
145

    
146
def process_ganeti_nics(ganeti_nics):
147
    """Process NIC dict from ganeti hooks."""
148
    new_nics = []
149
    for i, new_nic in enumerate(ganeti_nics):
150
        network = new_nic.get('network', '')
151
        n = str(network)
152
        pk = utils.id_from_network_name(n)
153

    
154
        net = Network.objects.get(pk=pk)
155

    
156
        # Get the new nic info
157
        mac = new_nic.get('mac', '')
158
        ipv4 = new_nic.get('ip', '')
159
        ipv6 = new_nic.get('ipv6', '')
160

    
161
        firewall = new_nic.get('firewall', '')
162
        firewall_profile = _reverse_tags.get(firewall, '')
163
        if not firewall_profile and net.public:
164
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE
165

    
166
        nic = {
167
            'index': i,
168
            'network': net,
169
            'mac': mac,
170
            'ipv4': ipv4,
171
            'ipv6': ipv6,
172
            'firewall_profile': firewall_profile,
173
            'state': 'ACTIVE'}
174

    
175
        new_nics.append(nic)
176
    return new_nics
177

    
178

    
179
def nics_changed(old_nics, new_nics):
180
    """Return True if NICs have changed in any way."""
181
    if len(old_nics) != len(new_nics):
182
        return True
183
    for old_nic, new_nic in zip(old_nics, new_nics):
184
        if not (old_nic.ipv4 == new_nic['ipv4'] and
185
                old_nic.ipv6 == new_nic['ipv6'] and
186
                old_nic.mac == new_nic['mac'] and
187
                old_nic.firewall_profile == new_nic['firewall_profile'] and
188
                old_nic.index == new_nic['index'] and
189
                old_nic.network == new_nic['network']):
190
            return True
191
    return False
192

    
193

    
194
def release_instance_nics(vm):
195
    for nic in vm.nics.all():
196
        net = nic.network
197
        if nic.ipv4:
198
            net.release_address(nic.ipv4)
199
        nic.delete()
200
        net.save()
201

    
202

    
203
@transaction.commit_on_success
204
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
205
    if status not in [x[0] for x in BACKEND_STATUSES]:
206
        raise Network.InvalidBackendMsgError(opcode, status)
207

    
208
    back_network.backendjobid = jobid
209
    back_network.backendjobstatus = status
210
    back_network.backendopcode = opcode
211
    back_network.backendlogmsg = logmsg
212

    
213
    # Notifications of success change the operating state
214
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
215
    if status == 'success' and state_for_success is not None:
216
        back_network.operstate = state_for_success
217

    
218
    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_ADD':
219
        back_network.operstate = 'ERROR'
220
        back_network.backendtime = etime
221

    
222
    if opcode == 'OP_NETWORK_REMOVE':
223
        if status == 'success' or (status == 'error' and
224
                                   back_network.operstate == 'ERROR'):
225
            back_network.operstate = state_for_success
226
            back_network.deleted = True
227
            back_network.backendtime = etime
228

    
229
    if status == 'success':
230
        back_network.backendtime = etime
231
    back_network.save()
232
    # Also you must update the state of the Network!!
233
    update_network_state(back_network.network)
234

    
235

    
236
def update_network_state(network):
237
    old_state = network.state
238

    
239
    backend_states = [s.operstate for s in
240
                      network.backend_networks.filter(backend__offline=False)]
241
    if not backend_states:
242
        network.state = 'PENDING'
243
        network.save()
244
        return
245

    
246
    all_equal = len(set(backend_states)) <= 1
247
    network.state = all_equal and backend_states[0] or 'PENDING'
248
    # Release the resources on the deletion of the Network
249
    if old_state != 'DELETED' and network.state == 'DELETED':
250
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
251
                 network.id, network.mac_prefix, network.link)
252
        network.deleted = True
253
        if network.mac_prefix:
254
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
255
                release_resource(res_type="mac_prefix",
256
                                 value=network.mac_prefix)
257
        if network.link:
258
            if network.FLAVORS[network.flavor]["link"] == "pool":
259
                release_resource(res_type="bridge", value=network.link)
260

    
261
        # Issue commission
262
        if network.userid:
263
            quotas.issue_and_accept_commission(network, delete=True)
264
        elif not network.public:
265
            log.warning("Network %s does not have an owner!", network.id)
266
    network.save()
267

    
268

    
269
@transaction.commit_on_success
270
def process_network_modify(back_network, etime, jobid, opcode, status,
271
                           add_reserved_ips, remove_reserved_ips):
272
    assert (opcode == "OP_NETWORK_SET_PARAMS")
273
    if status not in [x[0] for x in BACKEND_STATUSES]:
274
        raise Network.InvalidBackendMsgError(opcode, status)
275

    
276
    back_network.backendjobid = jobid
277
    back_network.backendjobstatus = status
278
    back_network.opcode = opcode
279

    
280
    if add_reserved_ips or remove_reserved_ips:
281
        net = back_network.network
282
        pool = net.get_pool()
283
        if add_reserved_ips:
284
            for ip in add_reserved_ips:
285
                pool.reserve(ip, external=True)
286
        if remove_reserved_ips:
287
            for ip in remove_reserved_ips:
288
                pool.put(ip, external=True)
289
        pool.save()
290

    
291
    if status == 'success':
292
        back_network.backendtime = etime
293
    back_network.save()
294

    
295

    
296
@transaction.commit_on_success
297
def process_create_progress(vm, etime, progress):
298

    
299
    percentage = int(progress)
300

    
301
    # The percentage may exceed 100%, due to the way
302
    # snf-image:copy-progress tracks bytes read by image handling processes
303
    percentage = 100 if percentage > 100 else percentage
304
    if percentage < 0:
305
        raise ValueError("Percentage cannot be negative")
306

    
307
    # FIXME: log a warning here, see #1033
308
#   if last_update > percentage:
309
#       raise ValueError("Build percentage should increase monotonically " \
310
#                        "(old = %d, new = %d)" % (last_update, percentage))
311

    
312
    # This assumes that no message of type 'ganeti-create-progress' is going to
313
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
314
    # the instance is STARTED.  What if the two messages are processed by two
315
    # separate dispatcher threads, and the 'ganeti-op-status' message for
316
    # successful creation gets processed before the 'ganeti-create-progress'
317
    # message? [vkoukis]
318
    #
319
    #if not vm.operstate == 'BUILD':
320
    #    raise VirtualMachine.IllegalState("VM is not in building state")
321

    
322
    vm.buildpercentage = percentage
323
    vm.backendtime = etime
324
    vm.save()
325

    
326

    
327
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
328
                               details=None):
329
    """
330
    Create virtual machine instance diagnostic entry.
331

332
    :param vm: VirtualMachine instance to create diagnostic for.
333
    :param message: Diagnostic message.
334
    :param source: Diagnostic source identifier (e.g. image-helper).
335
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
336
    :param etime: The time the message occured (if available).
337
    :param details: Additional details or debug information.
338
    """
339
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level, source=source,
340
                                                   source_date=etime,
341
                                                   message=message,
342
                                                   details=details)
343

    
344

    
345
def create_instance(vm, public_nic, flavor, image):
346
    """`image` is a dictionary which should contain the keys:
347
            'backend_id', 'format' and 'metadata'
348

349
        metadata value should be a dictionary.
350
    """
351

    
352
    # Handle arguments to CreateInstance() as a dictionary,
353
    # initialize it based on a deployment-specific value.
354
    # This enables the administrator to override deployment-specific
355
    # arguments, such as the disk template to use, name of os provider
356
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
357
    #
358
    kw = settings.GANETI_CREATEINSTANCE_KWARGS
359
    kw['mode'] = 'create'
360
    kw['name'] = vm.backend_vm_id
361
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
362

    
363
    kw['disk_template'] = flavor.disk_template
364
    kw['disks'] = [{"size": flavor.disk * 1024}]
365
    provider = flavor.disk_provider
366
    if provider:
367
        kw['disks'][0]['provider'] = provider
368

    
369
        if provider == 'vlmc':
370
            kw['disks'][0]['origin'] = flavor.disk_origin
371

    
372
    kw['nics'] = [public_nic]
373
    if settings.GANETI_USE_HOTPLUG:
374
        kw['hotplug'] = True
375
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
376
    # kw['os'] = settings.GANETI_OS_PROVIDER
377
    kw['ip_check'] = False
378
    kw['name_check'] = False
379

    
380
    # Do not specific a node explicitly, have
381
    # Ganeti use an iallocator instead
382
    #kw['pnode'] = rapi.GetNodes()[0]
383

    
384
    kw['dry_run'] = settings.TEST
385

    
386
    kw['beparams'] = {
387
        'auto_balance': True,
388
        'vcpus': flavor.cpu,
389
        'memory': flavor.ram}
390

    
391
    kw['osparams'] = {
392
        'config_url': vm.config_url,
393
        # Store image id and format to Ganeti
394
        'img_id': image['backend_id'],
395
        'img_format': image['format']}
396

    
397
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
398
    # kw['hvparams'] = dict(serial_console=False)
399

    
400
    log.debug("Creating instance %s", utils.hide_pass(kw))
401
    with pooled_rapi_client(vm) as client:
402
        return client.CreateInstance(**kw)
403

    
404

    
405
def delete_instance(vm):
406
    with pooled_rapi_client(vm) as client:
407
        return client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)
408

    
409

    
410
def reboot_instance(vm, reboot_type):
411
    assert reboot_type in ('soft', 'hard')
412
    with pooled_rapi_client(vm) as client:
413
        return client.RebootInstance(vm.backend_vm_id, reboot_type,
414
                                     dry_run=settings.TEST)
415

    
416

    
417
def startup_instance(vm):
418
    with pooled_rapi_client(vm) as client:
419
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
420

    
421

    
422
def shutdown_instance(vm):
423
    with pooled_rapi_client(vm) as client:
424
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)
425

    
426

    
427
def get_instance_console(vm):
428
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
429
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
430
    # useless (see #783).
431
    #
432
    # Until this is fixed on the Ganeti side, construct a console info reply
433
    # directly.
434
    #
435
    # WARNING: This assumes that VNC runs on port network_port on
436
    #          the instance's primary node, and is probably
437
    #          hypervisor-specific.
438
    #
439
    log.debug("Getting console for vm %s", vm)
440

    
441
    console = {}
442
    console['kind'] = 'vnc'
443

    
444
    with pooled_rapi_client(vm) as client:
445
        i = client.GetInstance(vm.backend_vm_id)
446

    
447
    if i['hvparams']['serial_console']:
448
        raise Exception("hv parameter serial_console cannot be true")
449
    console['host'] = i['pnode']
450
    console['port'] = i['network_port']
451

    
452
    return console
453

    
454

    
455
def get_instance_info(vm):
456
    with pooled_rapi_client(vm) as client:
457
        return client.GetInstanceInfo(vm.backend_vm_id)
458

    
459

    
460
def create_network(network, backends=None, connect=True):
461
    """Create and connect a network."""
462
    if not backends:
463
        backends = Backend.objects.exclude(offline=True)
464

    
465
    log.debug("Creating network %s in backends %s", network, backends)
466

    
467
    for backend in backends:
468
        create_jobID = _create_network(network, backend)
469
        if connect:
470
            connect_network(network, backend, create_jobID)
471

    
472

    
473
def _create_network(network, backend):
474
    """Create a network."""
475

    
476
    network_type = network.public and 'public' or 'private'
477

    
478
    tags = network.backend_tag
479
    if network.dhcp:
480
        tags.append('nfdhcpd')
481

    
482
    if network.public:
483
        conflicts_check = True
484
    else:
485
        conflicts_check = False
486

    
487
    try:
488
        bn = BackendNetwork.objects.get(network=network, backend=backend)
489
        mac_prefix = bn.mac_prefix
490
    except BackendNetwork.DoesNotExist:
491
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
492
                        " does not exist" % (network.id, backend.id))
493

    
494
    with pooled_rapi_client(backend) as client:
495
        return client.CreateNetwork(network_name=network.backend_id,
496
                                    network=network.subnet,
497
                                    network6=network.subnet6,
498
                                    gateway=network.gateway,
499
                                    gateway6=network.gateway6,
500
                                    network_type=network_type,
501
                                    mac_prefix=mac_prefix,
502
                                    conflicts_check=conflicts_check,
503
                                    tags=tags)
504

    
505

    
506
def connect_network(network, backend, depend_job=None, group=None):
507
    """Connect a network to nodegroups."""
508
    log.debug("Connecting network %s to backend %s", network, backend)
509

    
510
    if network.public:
511
        conflicts_check = True
512
    else:
513
        conflicts_check = False
514

    
515
    depend_jobs = [depend_job] if depend_job else []
516
    with pooled_rapi_client(backend) as client:
517
        if group:
518
            client.ConnectNetwork(network.backend_id, group, network.mode,
519
                                  network.link, conflicts_check, depend_jobs)
520
        else:
521
            for group in client.GetGroups():
522
                client.ConnectNetwork(network.backend_id, group, network.mode,
523
                                      network.link, conflicts_check,
524
                                      depend_jobs)
525

    
526

    
527
def delete_network(network, backends=None, disconnect=True):
528
    if not backends:
529
        backends = Backend.objects.exclude(offline=True)
530

    
531
    log.debug("Deleting network %s from backends %s", network, backends)
532

    
533
    for backend in backends:
534
        disconnect_jobIDs = []
535
        if disconnect:
536
            disconnect_jobIDs = disconnect_network(network, backend)
537
        _delete_network(network, backend, disconnect_jobIDs)
538

    
539

    
540
def _delete_network(network, backend, depend_jobs=[]):
541
    with pooled_rapi_client(backend) as client:
542
        return client.DeleteNetwork(network.backend_id, depend_jobs)
543

    
544

    
545
def disconnect_network(network, backend, group=None):
546
    log.debug("Disconnecting network %s to backend %s", network, backend)
547

    
548
    with pooled_rapi_client(backend) as client:
549
        if group:
550
            return [client.DisconnectNetwork(network.backend_id, group)]
551
        else:
552
            jobs = []
553
            for group in client.GetGroups():
554
                job = client.DisconnectNetwork(network.backend_id, group)
555
                jobs.append(job)
556
            return jobs
557

    
558

    
559
def connect_to_network(vm, network, address=None):
560
    nic = {'ip': address, 'network': network.backend_id}
561

    
562
    log.debug("Connecting vm %s to network %s(%s)", vm, network, address)
563

    
564
    with pooled_rapi_client(vm) as client:
565
        return client.ModifyInstance(vm.backend_vm_id, nics=[('add',  nic)],
566
                                     hotplug=settings.GANETI_USE_HOTPLUG,
567
                                     dry_run=settings.TEST)
568

    
569

    
570
def disconnect_from_network(vm, nic):
571
    op = [('remove', nic.index, {})]
572

    
573
    log.debug("Removing nic of VM %s, with index %s", vm, str(nic.index))
574

    
575
    with pooled_rapi_client(vm) as client:
576
        return client.ModifyInstance(vm.backend_vm_id, nics=op,
577
                                     hotplug=settings.GANETI_USE_HOTPLUG,
578
                                     dry_run=settings.TEST)
579

    
580

    
581
def set_firewall_profile(vm, profile):
582
    try:
583
        tag = _firewall_tags[profile]
584
    except KeyError:
585
        raise ValueError("Unsopported Firewall Profile: %s" % profile)
586

    
587
    log.debug("Setting tag of VM %s to %s", vm, profile)
588

    
589
    with pooled_rapi_client(vm) as client:
590
        # Delete all firewall tags
591
        for t in _firewall_tags.values():
592
            client.DeleteInstanceTags(vm.backend_vm_id, [t],
593
                                      dry_run=settings.TEST)
594

    
595
        client.AddInstanceTags(vm.backend_vm_id, [tag], dry_run=settings.TEST)
596

    
597
        # XXX NOP ModifyInstance call to force process_net_status to run
598
        # on the dispatcher
599
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
600
        client.ModifyInstance(vm.backend_vm_id,
601
                              os_name=os_name)
602

    
603

    
604
def get_ganeti_instances(backend=None, bulk=False):
605
    instances = []
606
    for backend in get_backends(backend):
607
        with pooled_rapi_client(backend) as client:
608
            instances.append(client.GetInstances(bulk=bulk))
609

    
610
    return reduce(list.__add__, instances, [])
611

    
612

    
613
def get_ganeti_nodes(backend=None, bulk=False):
614
    nodes = []
615
    for backend in get_backends(backend):
616
        with pooled_rapi_client(backend) as client:
617
            nodes.append(client.GetNodes(bulk=bulk))
618

    
619
    return reduce(list.__add__, nodes, [])
620

    
621

    
622
def get_ganeti_jobs(backend=None, bulk=False):
623
    jobs = []
624
    for backend in get_backends(backend):
625
        with pooled_rapi_client(backend) as client:
626
            jobs.append(client.GetJobs(bulk=bulk))
627
    return reduce(list.__add__, jobs, [])
628

    
629
##
630
##
631
##
632

    
633

    
634
def get_backends(backend=None):
635
    if backend:
636
        if backend.offline:
637
            return []
638
        return [backend]
639
    return Backend.objects.filter(offline=False)
640

    
641

    
642
def get_physical_resources(backend):
643
    """ Get the physical resources of a backend.
644

645
    Get the resources of a backend as reported by the backend (not the db).
646

647
    """
648
    nodes = get_ganeti_nodes(backend, bulk=True)
649
    attr = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
650
    res = {}
651
    for a in attr:
652
        res[a] = 0
653
    for n in nodes:
654
        # Filter out drained, offline and not vm_capable nodes since they will
655
        # not take part in the vm allocation process
656
        can_host_vms = n['vm_capable'] and not (n['drained'] or n['offline'])
657
        if can_host_vms and n['cnodes']:
658
            for a in attr:
659
                res[a] += int(n[a])
660
    return res
661

    
662

    
663
def update_resources(backend, resources=None):
664
    """ Update the state of the backend resources in db.
665

666
    """
667

    
668
    if not resources:
669
        resources = get_physical_resources(backend)
670

    
671
    backend.mfree = resources['mfree']
672
    backend.mtotal = resources['mtotal']
673
    backend.dfree = resources['dfree']
674
    backend.dtotal = resources['dtotal']
675
    backend.pinst_cnt = resources['pinst_cnt']
676
    backend.ctotal = resources['ctotal']
677
    backend.updated = datetime.now()
678
    backend.save()
679

    
680

    
681
def get_memory_from_instances(backend):
682
    """ Get the memory that is used from instances.
683

684
    Get the used memory of a backend. Note: This is different for
685
    the real memory used, due to kvm's memory de-duplication.
686

687
    """
688
    with pooled_rapi_client(backend) as client:
689
        instances = client.GetInstances(bulk=True)
690
    mem = 0
691
    for i in instances:
692
        mem += i['oper_ram']
693
    return mem
694

    
695
##
696
## Synchronized operations for reconciliation
697
##
698

    
699

    
700
def create_network_synced(network, backend):
701
    result = _create_network_synced(network, backend)
702
    if result[0] != 'success':
703
        return result
704
    result = connect_network_synced(network, backend)
705
    return result
706

    
707

    
708
def _create_network_synced(network, backend):
709
    with pooled_rapi_client(backend) as client:
710
        job = _create_network(network, backend)
711
        result = wait_for_job(client, job)
712
    return result
713

    
714

    
715
def connect_network_synced(network, backend):
716
    with pooled_rapi_client(backend) as client:
717
        for group in client.GetGroups():
718
            job = client.ConnectNetwork(network.backend_id, group,
719
                                        network.mode, network.link)
720
            result = wait_for_job(client, job)
721
            if result[0] != 'success':
722
                return result
723

    
724
    return result
725

    
726

    
727
def wait_for_job(client, jobid):
728
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
729
    status = result['job_info'][0]
730
    while status not in ['success', 'error', 'cancel']:
731
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
732
                                         [result], None)
733
        status = result['job_info'][0]
734

    
735
    if status == 'success':
736
        return (status, None)
737
    else:
738
        error = result['job_info'][1]
739
        return (status, error)