Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ 006c6249

History | View | Annotate | Download (25.2 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import json
35

    
36
from django.conf import settings
37
from django.db import transaction
38
from datetime import datetime
39

    
40
from synnefo.db.models import (Backend, VirtualMachine, Network,
41
                               BackendNetwork, BACKEND_STATUSES,
42
                               pooled_rapi_client, BridgePoolTable,
43
                               MacPrefixPoolTable, VirtualMachineDiagnostic)
44
from synnefo.logic import utils
45
from synnefo import quotas
46

    
47
from logging import getLogger
48
log = getLogger(__name__)
49

    
50

    
51
_firewall_tags = {
52
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
53
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
54
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}
55

    
56
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
57

    
58

    
59
@quotas.uses_commission
60
@transaction.commit_on_success
61
def process_op_status(serials, vm, etime, jobid, opcode, status, logmsg):
62
    """Process a job progress notification from the backend
63

64
    Process an incoming message from the backend (currently Ganeti).
65
    Job notifications with a terminating status (sucess, error, or canceled),
66
    also update the operating state of the VM.
67

68
    """
69
    # See #1492, #1031, #1111 why this line has been removed
70
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
71
    if status not in [x[0] for x in BACKEND_STATUSES]:
72
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)
73

    
74
    vm.backendjobid = jobid
75
    vm.backendjobstatus = status
76
    vm.backendopcode = opcode
77
    vm.backendlogmsg = logmsg
78

    
79
    # Notifications of success change the operating state
80
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode, None)
81
    if status == 'success' and state_for_success is not None:
82
        vm.operstate = state_for_success
83

    
84
    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
85
    if opcode == 'OP_INSTANCE_CREATE' and status in ('canceled', 'error'):
86
        vm.operstate = 'ERROR'
87
        vm.backendtime = etime
88
    elif opcode == 'OP_INSTANCE_REMOVE':
89
        # Set the deleted flag explicitly, cater for admin-initiated removals
90
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
91
        # when no instance exists at the Ganeti backend.
92
        # See ticket #799 for all the details.
93
        #
94
        if status == 'success' or (status == 'error' and
95
                                   vm.operstate == 'ERROR'):
96
            # Issue commission
97
            serial = quotas.issue_vm_commission(vm.userid, vm.flavor,
98
                                                delete=True)
99
            serials.append(serial)
100
            vm.serial = serial
101
            serial.accepted = True
102
            serial.save()
103
            release_instance_nics(vm)
104
            vm.nics.all().delete()
105
            vm.deleted = True
106
            vm.operstate = state_for_success
107
            vm.backendtime = etime
108

    
109
    # Update backendtime only for jobs that have been successfully completed,
110
    # since only these jobs update the state of the VM. Else a "race condition"
111
    # may occur when a successful job (e.g. OP_INSTANCE_REMOVE) completes
112
    # before an error job and messages arrive in reversed order.
113
    if status == 'success':
114
        vm.backendtime = etime
115

    
116
    vm.save()
117

    
118

    
119
@transaction.commit_on_success
120
def process_net_status(vm, etime, nics):
121
    """Process a net status notification from the backend
122

123
    Process an incoming message from the Ganeti backend,
124
    detailing the NIC configuration of a VM instance.
125

126
    Update the state of the VM in the DB accordingly.
127
    """
128

    
129
    ganeti_nics = process_ganeti_nics(nics)
130
    if not nics_changed(vm.nics.order_by('index'), ganeti_nics):
131
        log.debug("NICs for VM %s have not changed", vm)
132

    
133
    release_instance_nics(vm)
134

    
135
    for nic in ganeti_nics:
136
        ipv4 = nic.get('ipv4', '')
137
        if ipv4:
138
            net = nic['network']
139
            net.reserve_address(ipv4)
140

    
141
        nic['dirty'] = False
142
        vm.nics.create(**nic)
143
        # Dummy save the network, because UI uses changed-since for VMs
144
        # and Networks in order to show the VM NICs
145
        net.save()
146

    
147
    vm.backendtime = etime
148
    vm.save()
149

    
150

    
151
def process_ganeti_nics(ganeti_nics):
152
    """Process NIC dict from ganeti hooks."""
153
    new_nics = []
154
    for i, new_nic in enumerate(ganeti_nics):
155
        network = new_nic.get('network', '')
156
        n = str(network)
157
        pk = utils.id_from_network_name(n)
158

    
159
        net = Network.objects.get(pk=pk)
160

    
161
        # Get the new nic info
162
        mac = new_nic.get('mac', '')
163
        ipv4 = new_nic.get('ip', '')
164
        ipv6 = new_nic.get('ipv6', '')
165

    
166
        firewall = new_nic.get('firewall', '')
167
        firewall_profile = _reverse_tags.get(firewall, '')
168
        if not firewall_profile and net.public:
169
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE
170

    
171
        nic = {
172
               'index': i,
173
               'network': net,
174
               'mac': mac,
175
               'ipv4': ipv4,
176
               'ipv6': ipv6,
177
               'firewall_profile': firewall_profile}
178

    
179
        new_nics.append(nic)
180
    return new_nics
181

    
182

    
183
def nics_changed(old_nics, new_nics):
184
    """Return True if NICs have changed in any way."""
185
    if len(old_nics) != len(new_nics):
186
        return True
187
    for old_nic, new_nic in zip(old_nics, new_nics):
188
        if not (old_nic.ipv4 == new_nic['ipv4'] and\
189
                old_nic.ipv6 == new_nic['ipv6'] and\
190
                old_nic.mac == new_nic['mac'] and\
191
                old_nic.firewall_profile == new_nic['firewall_profile'] and\
192
                old_nic.index == new_nic['index'] and\
193
                old_nic.network == new_nic['network']):
194
            return True
195
    return False
196

    
197

    
198
def release_instance_nics(vm):
199
    for nic in vm.nics.all():
200
        net = nic.network
201
        if nic.ipv4:
202
            net.release_address(nic.ipv4)
203
        nic.delete()
204
        net.save()
205

    
206

    
207
@transaction.commit_on_success
208
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
209
    if status not in [x[0] for x in BACKEND_STATUSES]:
210
        raise Network.InvalidBackendMsgError(opcode, status)
211

    
212
    back_network.backendjobid = jobid
213
    back_network.backendjobstatus = status
214
    back_network.backendopcode = opcode
215
    back_network.backendlogmsg = logmsg
216

    
217
    # Notifications of success change the operating state
218
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
219
    if status == 'success' and state_for_success is not None:
220
        back_network.operstate = state_for_success
221

    
222
    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_CREATE':
223
        utils.update_state(back_network, 'ERROR')
224
        back_network.backendtime = etime
225

    
226
    if opcode == 'OP_NETWORK_REMOVE':
227
        if status == 'success' or (status == 'error' and
228
                                   back_network.operstate == 'ERROR'):
229
            back_network.operstate = state_for_success
230
            back_network.deleted = True
231
            back_network.backendtime = etime
232

    
233
    if status == 'success':
234
        back_network.backendtime = etime
235
    back_network.save()
236
    # Also you must update the state of the Network!!
237
    update_network_state(back_network.network)
238

    
239

    
240
@quotas.uses_commission
241
def update_network_state(serials, network):
242
    old_state = network.state
243

    
244
    backend_states = [s.operstate for s in network.backend_networks.all()]
245
    if not backend_states:
246
        network.state = 'PENDING'
247
        network.save()
248
        return
249

    
250
    all_equal = len(set(backend_states)) <= 1
251
    network.state = all_equal and backend_states[0] or 'PENDING'
252

    
253
    # Release the resources on the deletion of the Network
254
    if old_state != 'DELETED' and network.state == 'DELETED':
255
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
256
                 network.id, network.mac_prefix, network.link)
257
        network.deleted = True
258
        if network.mac_prefix and network.type == 'PRIVATE_MAC_FILTERED':
259
            mac_pool = MacPrefixPoolTable.get_pool()
260
            mac_pool.put(network.mac_prefix)
261
            mac_pool.save()
262

    
263
        if network.link and network.type == 'PRIVATE_VLAN':
264
            bridge_pool = BridgePoolTable.get_pool()
265
            bridge_pool.put(network.link)
266
            bridge_pool.save()
267

    
268
        # Issue commission
269
        serial = quotas.issue_network_commission(network.userid, delete=True)
270
        serials.append(serial)
271
        network.serial = serial
272
        serial.accepted = True
273
        serial.save()
274

    
275
    network.save()
276

    
277

    
278
@transaction.commit_on_success
279
def process_network_modify(back_network, etime, jobid, opcode, status,
280
                           add_reserved_ips, remove_reserved_ips):
281
    assert (opcode == "OP_NETWORK_SET_PARAMS")
282
    if status not in [x[0] for x in BACKEND_STATUSES]:
283
        raise Network.InvalidBackendMsgError(opcode, status)
284

    
285
    back_network.backendjobid = jobid
286
    back_network.backendjobstatus = status
287
    back_network.opcode = opcode
288

    
289
    if add_reserved_ips or remove_reserved_ips:
290
        net = back_network.network
291
        pool = net.get_pool()
292
        if add_reserved_ips:
293
            for ip in add_reserved_ips:
294
                pool.reserve(ip, external=True)
295
        if remove_reserved_ips:
296
            for ip in remove_reserved_ips:
297
                pool.put(ip, external=True)
298
        pool.save()
299

    
300
    if status == 'success':
301
        back_network.backendtime = etime
302
    back_network.save()
303

    
304

    
305
@transaction.commit_on_success
306
def process_create_progress(vm, etime, progress):
307

    
308
    percentage = int(progress)
309

    
310
    # The percentage may exceed 100%, due to the way
311
    # snf-image:copy-progress tracks bytes read by image handling processes
312
    percentage = 100 if percentage > 100 else percentage
313
    if percentage < 0:
314
        raise ValueError("Percentage cannot be negative")
315

    
316
    # FIXME: log a warning here, see #1033
317
#   if last_update > percentage:
318
#       raise ValueError("Build percentage should increase monotonically " \
319
#                        "(old = %d, new = %d)" % (last_update, percentage))
320

    
321
    # This assumes that no message of type 'ganeti-create-progress' is going to
322
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
323
    # the instance is STARTED.  What if the two messages are processed by two
324
    # separate dispatcher threads, and the 'ganeti-op-status' message for
325
    # successful creation gets processed before the 'ganeti-create-progress'
326
    # message? [vkoukis]
327
    #
328
    #if not vm.operstate == 'BUILD':
329
    #    raise VirtualMachine.IllegalState("VM is not in building state")
330

    
331
    vm.buildpercentage = percentage
332
    vm.backendtime = etime
333
    vm.save()
334

    
335

    
336
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
337
    details=None):
338
    """
339
    Create virtual machine instance diagnostic entry.
340

341
    :param vm: VirtualMachine instance to create diagnostic for.
342
    :param message: Diagnostic message.
343
    :param source: Diagnostic source identifier (e.g. image-helper).
344
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
345
    :param etime: The time the message occured (if available).
346
    :param details: Additional details or debug information.
347
    """
348
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level, source=source,
349
            source_date=etime, message=message, details=details)
350

    
351

    
352
def create_instance(vm, public_nic, flavor, image, password=None):
353
    """`image` is a dictionary which should contain the keys:
354
            'backend_id', 'format' and 'metadata'
355

356
        metadata value should be a dictionary.
357
    """
358

    
359
    # Handle arguments to CreateInstance() as a dictionary,
360
    # initialize it based on a deployment-specific value.
361
    # This enables the administrator to override deployment-specific
362
    # arguments, such as the disk template to use, name of os provider
363
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
364
    #
365
    kw = settings.GANETI_CREATEINSTANCE_KWARGS
366
    kw['mode'] = 'create'
367
    kw['name'] = vm.backend_vm_id
368
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
369

    
370
    kw['disk_template'] = flavor.disk_template
371
    kw['disks'] = [{"size": flavor.disk * 1024}]
372
    provider = flavor.disk_provider
373
    if provider:
374
        kw['disks'][0]['provider'] = provider
375

    
376
        if provider == 'vlmc':
377
            kw['disks'][0]['origin'] = flavor.disk_origin
378

    
379
    kw['nics'] = [public_nic]
380
    if settings.GANETI_USE_HOTPLUG:
381
        kw['hotplug'] = True
382
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
383
    # kw['os'] = settings.GANETI_OS_PROVIDER
384
    kw['ip_check'] = False
385
    kw['name_check'] = False
386

    
387
    # Do not specific a node explicitly, have
388
    # Ganeti use an iallocator instead
389
    #kw['pnode'] = rapi.GetNodes()[0]
390

    
391
    kw['dry_run'] = settings.TEST
392

    
393
    kw['beparams'] = {
394
       'auto_balance': True,
395
       'vcpus': flavor.cpu,
396
       'memory': flavor.ram}
397

    
398
    kw['osparams'] = {
399
        'config_url': vm.config_url,
400
        # Store image id and format to Ganeti
401
        'img_id': image['backend_id'],
402
        'img_format': image['format']}
403

    
404
    if password:
405
        # Only for admin created VMs !!
406
        kw['osparams']['img_passwd'] = password
407

    
408
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
409
    # kw['hvparams'] = dict(serial_console=False)
410

    
411
    log.debug("Creating instance %s", utils.hide_pass(kw))
412
    with pooled_rapi_client(vm) as client:
413
        return client.CreateInstance(**kw)
414

    
415

    
416
def delete_instance(vm):
417
    with pooled_rapi_client(vm) as client:
418
        return client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)
419

    
420

    
421
def reboot_instance(vm, reboot_type):
422
    assert reboot_type in ('soft', 'hard')
423
    with pooled_rapi_client(vm) as client:
424
        return client.RebootInstance(vm.backend_vm_id, reboot_type,
425
                                     dry_run=settings.TEST)
426

    
427

    
428
def startup_instance(vm):
429
    with pooled_rapi_client(vm) as client:
430
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
431

    
432

    
433
def shutdown_instance(vm):
434
    with pooled_rapi_client(vm) as client:
435
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)
436

    
437

    
438
def get_instance_console(vm):
439
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
440
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
441
    # useless (see #783).
442
    #
443
    # Until this is fixed on the Ganeti side, construct a console info reply
444
    # directly.
445
    #
446
    # WARNING: This assumes that VNC runs on port network_port on
447
    #          the instance's primary node, and is probably
448
    #          hypervisor-specific.
449
    #
450
    log.debug("Getting console for vm %s", vm)
451

    
452
    console = {}
453
    console['kind'] = 'vnc'
454

    
455
    with pooled_rapi_client(vm) as client:
456
        i = client.GetInstance(vm.backend_vm_id)
457

    
458
    if i['hvparams']['serial_console']:
459
        raise Exception("hv parameter serial_console cannot be true")
460
    console['host'] = i['pnode']
461
    console['port'] = i['network_port']
462

    
463
    return console
464

    
465

    
466
def get_instance_info(vm):
467
    with pooled_rapi_client(vm) as client:
468
        return client.GetInstanceInfo(vm.backend_vm_id)
469

    
470

    
471
def create_network(network, backends=None, connect=True):
472
    """Create and connect a network."""
473
    if not backends:
474
        backends = Backend.objects.exclude(offline=True)
475

    
476
    log.debug("Creating network %s in backends %s", network, backends)
477

    
478
    for backend in backends:
479
        create_jobID = _create_network(network, backend)
480
        if connect:
481
            connect_network(network, backend, create_jobID)
482

    
483

    
484
def _create_network(network, backend):
485
    """Create a network."""
486

    
487
    network_type = network.public and 'public' or 'private'
488

    
489
    tags = network.backend_tag
490
    if network.dhcp:
491
        tags.append('nfdhcpd')
492

    
493
    if network.public:
494
        conflicts_check = True
495
    else:
496
        conflicts_check = False
497

    
498
    try:
499
        bn = BackendNetwork.objects.get(network=network, backend=backend)
500
        mac_prefix = bn.mac_prefix
501
    except BackendNetwork.DoesNotExist:
502
        raise Exception("BackendNetwork for network '%s' in backend '%s'"\
503
                        " does not exist" % (network.id, backend.id))
504

    
505
    with pooled_rapi_client(backend) as client:
506
        return client.CreateNetwork(network_name=network.backend_id,
507
                                    network=network.subnet,
508
                                    network6=network.subnet6,
509
                                    gateway=network.gateway,
510
                                    gateway6=network.gateway6,
511
                                    network_type=network_type,
512
                                    mac_prefix=mac_prefix,
513
                                    conflicts_check=conflicts_check,
514
                                    tags=tags)
515

    
516

    
517
def connect_network(network, backend, depend_job=None, group=None):
518
    """Connect a network to nodegroups."""
519
    log.debug("Connecting network %s to backend %s", network, backend)
520

    
521
    mode = "routed" if "ROUTED" in network.type else "bridged"
522

    
523
    if network.public:
524
        conflicts_check = True
525
    else:
526
        conflicts_check = False
527

    
528
    depend_jobs = [depend_job] if depend_job else []
529
    with pooled_rapi_client(backend) as client:
530
        if group:
531
            client.ConnectNetwork(network.backend_id, group, mode,
532
                                  network.link, conflicts_check, depend_jobs)
533
        else:
534
            for group in client.GetGroups():
535
                client.ConnectNetwork(network.backend_id, group, mode,
536
                                      network.link, conflicts_check,
537
                                      depend_jobs)
538

    
539

    
540
def delete_network(network, backends=None, disconnect=True):
541
    if not backends:
542
        backends = Backend.objects.exclude(offline=True)
543

    
544
    log.debug("Deleting network %s from backends %s", network, backends)
545

    
546
    for backend in backends:
547
        disconnect_jobIDs = []
548
        if disconnect:
549
            disconnect_jobIDs = disconnect_network(network, backend)
550
        _delete_network(network, backend, disconnect_jobIDs)
551

    
552

    
553
def _delete_network(network, backend, depend_jobs=[]):
554
    with pooled_rapi_client(backend) as client:
555
        return client.DeleteNetwork(network.backend_id, depend_jobs)
556

    
557

    
558
def disconnect_network(network, backend, group=None):
559
    log.debug("Disconnecting network %s to backend %s", network, backend)
560

    
561
    with pooled_rapi_client(backend) as client:
562
        if group:
563
            return [client.DisconnectNetwork(network.backend_id, group)]
564
        else:
565
            jobs = []
566
            for group in client.GetGroups():
567
                job = client.DisconnectNetwork(network.backend_id, group)
568
                jobs.append(job)
569
            return jobs
570

    
571

    
572
def connect_to_network(vm, network, address=None):
573
    nic = {'ip': address, 'network': network.backend_id}
574

    
575
    log.debug("Connecting vm %s to network %s(%s)", vm, network, address)
576

    
577
    with pooled_rapi_client(vm) as client:
578
        return client.ModifyInstance(vm.backend_vm_id, nics=[('add',  nic)],
579
                                     hotplug=settings.GANETI_USE_HOTPLUG,
580
                                     dry_run=settings.TEST)
581

    
582

    
583
def disconnect_from_network(vm, nic):
584
    op = [('remove', nic.index, {})]
585

    
586
    log.debug("Removing nic of VM %s, with index %s", vm, str(nic.index))
587

    
588
    with pooled_rapi_client(vm) as client:
589
        return client.ModifyInstance(vm.backend_vm_id, nics=op,
590
                                     hotplug=settings.GANETI_USE_HOTPLUG,
591
                                     dry_run=settings.TEST)
592

    
593

    
594
def set_firewall_profile(vm, profile):
595
    try:
596
        tag = _firewall_tags[profile]
597
    except KeyError:
598
        raise ValueError("Unsopported Firewall Profile: %s" % profile)
599

    
600
    log.debug("Setting tag of VM %s to %s", vm, profile)
601

    
602
    with pooled_rapi_client(vm) as client:
603
        # Delete all firewall tags
604
        for t in _firewall_tags.values():
605
            client.DeleteInstanceTags(vm.backend_vm_id, [t],
606
                                      dry_run=settings.TEST)
607

    
608
        client.AddInstanceTags(vm.backend_vm_id, [tag], dry_run=settings.TEST)
609

    
610
        # XXX NOP ModifyInstance call to force process_net_status to run
611
        # on the dispatcher
612
        client.ModifyInstance(vm.backend_vm_id,
613
                         os_name=settings.GANETI_CREATEINSTANCE_KWARGS['os'])
614

    
615

    
616
def get_ganeti_instances(backend=None, bulk=False):
617
    instances = []
618
    for backend in get_backends(backend):
619
        with pooled_rapi_client(backend) as client:
620
            instances.append(client.GetInstances(bulk=bulk))
621

    
622
    return reduce(list.__add__, instances, [])
623

    
624

    
625
def get_ganeti_nodes(backend=None, bulk=False):
626
    nodes = []
627
    for backend in get_backends(backend):
628
        with pooled_rapi_client(backend) as client:
629
            nodes.append(client.GetNodes(bulk=bulk))
630

    
631
    return reduce(list.__add__, nodes, [])
632

    
633

    
634
def get_ganeti_jobs(backend=None, bulk=False):
635
    jobs = []
636
    for backend in get_backends(backend):
637
        with pooled_rapi_client(backend) as client:
638
            jobs.append(client.GetJobs(bulk=bulk))
639
    return reduce(list.__add__, jobs, [])
640

    
641
##
642
##
643
##
644

    
645

    
646
def get_backends(backend=None):
647
    if backend:
648
        return [backend]
649
    return Backend.objects.filter(offline=False)
650

    
651

    
652
def get_physical_resources(backend):
653
    """ Get the physical resources of a backend.
654

655
    Get the resources of a backend as reported by the backend (not the db).
656

657
    """
658
    nodes = get_ganeti_nodes(backend, bulk=True)
659
    attr = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
660
    res = {}
661
    for a in attr:
662
        res[a] = 0
663
    for n in nodes:
664
        # Filter out drained, offline and not vm_capable nodes since they will
665
        # not take part in the vm allocation process
666
        if n['vm_capable'] and not n['drained'] and not n['offline']\
667
           and n['cnodes']:
668
            for a in attr:
669
                res[a] += int(n[a])
670
    return res
671

    
672

    
673
def update_resources(backend, resources=None):
674
    """ Update the state of the backend resources in db.
675

676
    """
677

    
678
    if not resources:
679
        resources = get_physical_resources(backend)
680

    
681
    backend.mfree = resources['mfree']
682
    backend.mtotal = resources['mtotal']
683
    backend.dfree = resources['dfree']
684
    backend.dtotal = resources['dtotal']
685
    backend.pinst_cnt = resources['pinst_cnt']
686
    backend.ctotal = resources['ctotal']
687
    backend.updated = datetime.now()
688
    backend.save()
689

    
690

    
691
def get_memory_from_instances(backend):
692
    """ Get the memory that is used from instances.
693

694
    Get the used memory of a backend. Note: This is different for
695
    the real memory used, due to kvm's memory de-duplication.
696

697
    """
698
    with pooled_rapi_client(backend) as client:
699
        instances = client.GetInstances(bulk=True)
700
    mem = 0
701
    for i in instances:
702
        mem += i['oper_ram']
703
    return mem
704

    
705
##
706
## Synchronized operations for reconciliation
707
##
708

    
709

    
710
def create_network_synced(network, backend):
711
    result = _create_network_synced(network, backend)
712
    if result[0] != 'success':
713
        return result
714
    result = connect_network_synced(network, backend)
715
    return result
716

    
717

    
718
def _create_network_synced(network, backend):
719
    with pooled_rapi_client(backend) as client:
720
        job = _create_network(network, backend)
721
        result = wait_for_job(client, job)
722
    return result
723

    
724

    
725
def connect_network_synced(network, backend):
726
    if network.type in ('PUBLIC_ROUTED', 'CUSTOM_ROUTED'):
727
        mode = 'routed'
728
    else:
729
        mode = 'bridged'
730
    with pooled_rapi_client(backend) as client:
731
        for group in client.GetGroups():
732
            job = client.ConnectNetwork(network.backend_id, group, mode,
733
                                        network.link)
734
            result = wait_for_job(client, job)
735
            if result[0] != 'success':
736
                return result
737

    
738
    return result
739

    
740

    
741
def wait_for_job(client, jobid):
742
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
743
    status = result['job_info'][0]
744
    while status not in ['success', 'error', 'cancel']:
745
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
746
                                        [result], None)
747
        status = result['job_info'][0]
748

    
749
    if status == 'success':
750
        return (status, None)
751
    else:
752
        error = result['job_info'][1]
753
        return (status, error)