# snf-cyclades-app/synnefo/logic/backend.py @ 8283d6c1
# Copyright 2011 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

import json

from django.conf import settings
from django.db import transaction
from datetime import datetime

from synnefo.db.models import (Backend, VirtualMachine, Network,
                               BackendNetwork, BACKEND_STATUSES,
                               pooled_rapi_client, BridgePoolTable,
                               MacPrefixPoolTable, VirtualMachineDiagnostic)
from synnefo.logic import utils
from synnefo import quotas
from synnefo.api.util import release_resource

from logging import getLogger
log = getLogger(__name__)


_firewall_tags = {
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}

_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())

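# Note: the firewall tag values above come from the deployment's settings
# (GANETI_FIREWALL_*_TAG); _reverse_tags assumes each of them has at least
# four colon-separated fields, e.g. (illustrative value only, not the actual
# setting of any deployment):
#
#   GANETI_FIREWALL_ENABLED_TAG = 'synnefo:network:0:protected'
#
# so that _reverse_tags maps the last field ('protected' here) back to the
# profile name ('ENABLED').
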
@quotas.uses_commission
@transaction.commit_on_success
def process_op_status(serials, vm, etime, jobid, opcode, status, logmsg):
    """Process a job progress notification from the backend

    Process an incoming message from the backend (currently Ganeti).
    Job notifications with a terminating status (success, error, or canceled)
    also update the operating state of the VM.

    """
    # See #1492, #1031, #1111 why this line has been removed
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)

    vm.backendjobid = jobid
    vm.backendjobstatus = status
    vm.backendopcode = opcode
    vm.backendlogmsg = logmsg

    # Notifications of success change the operating state
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == 'success' and state_for_success is not None:
        vm.operstate = state_for_success

    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
    if opcode == 'OP_INSTANCE_CREATE' and status in ('canceled', 'error'):
        vm.operstate = 'ERROR'
        vm.backendtime = etime
    elif opcode == 'OP_INSTANCE_REMOVE':
        # Set the deleted flag explicitly, to cater for admin-initiated
        # removals.
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
        # when no instance exists at the Ganeti backend.
        # See ticket #799 for all the details.
        #
        if status == 'success' or (status == 'error' and
                                   vm.operstate == 'ERROR'):
            # Issue commission
            serial = quotas.issue_vm_commission(vm.userid, vm.flavor,
                                                delete=True)
            serials.append(serial)
            vm.serial = serial
            serial.accepted = True
            serial.save()
            release_instance_nics(vm)
            vm.nics.all().delete()
            vm.deleted = True
            vm.operstate = state_for_success
            vm.backendtime = etime

    # Update backendtime only for jobs that have been successfully completed,
    # since only these jobs update the state of the VM. Else a "race condition"
    # may occur when a successful job (e.g. OP_INSTANCE_REMOVE) completes
    # before an error job and messages arrive in reversed order.
    if status == 'success':
        vm.backendtime = etime

    vm.save()

@transaction.commit_on_success
def process_net_status(vm, etime, nics):
    """Process a net status notification from the backend

    Process an incoming message from the Ganeti backend,
    detailing the NIC configuration of a VM instance.

    Update the state of the VM in the DB accordingly.
    """

    ganeti_nics = process_ganeti_nics(nics)
    if not nics_changed(vm.nics.order_by('index'), ganeti_nics):
        log.debug("NICs for VM %s have not changed", vm)
        return

    release_instance_nics(vm)

    for nic in ganeti_nics:
        ipv4 = nic.get('ipv4', '')
        net = nic['network']
        if ipv4:
            net.reserve_address(ipv4)

        nic['dirty'] = False
        vm.nics.create(**nic)
        # Dummy save the network, because the UI uses changed-since for VMs
        # and Networks in order to show the VM NICs
        net.save()

    vm.backendtime = etime
    vm.save()


def process_ganeti_nics(ganeti_nics):
    """Process NIC dict from ganeti hooks."""
    new_nics = []
    for i, new_nic in enumerate(ganeti_nics):
        network = new_nic.get('network', '')
        n = str(network)
        pk = utils.id_from_network_name(n)

        net = Network.objects.get(pk=pk)

        # Get the new nic info
        mac = new_nic.get('mac', '')
        ipv4 = new_nic.get('ip', '')
        ipv6 = new_nic.get('ipv6', '')

        firewall = new_nic.get('firewall', '')
        firewall_profile = _reverse_tags.get(firewall, '')
        if not firewall_profile and net.public:
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE

        nic = {
            'index': i,
            'network': net,
            'mac': mac,
            'ipv4': ipv4,
            'ipv6': ipv6,
            'firewall_profile': firewall_profile}

        new_nics.append(nic)
    return new_nics

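# Example (illustrative sketch; only the key names are taken from the code
# above, the values and the network name format are assumptions): a hook
# payload such as
#
#   [{'network': 'snf-net-42', 'mac': 'aa:00:00:11:22:33',
#     'ip': '10.0.0.2', 'ipv6': '', 'firewall': ''}]
#
# would be normalized by process_ganeti_nics() into
#
#   [{'index': 0, 'network': <the Network with the pk parsed from the name>,
#     'mac': 'aa:00:00:11:22:33', 'ipv4': '10.0.0.2', 'ipv6': '',
#     'firewall_profile': ''}]
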
def nics_changed(old_nics, new_nics):
    """Return True if NICs have changed in any way."""
    if len(old_nics) != len(new_nics):
        return True
    for old_nic, new_nic in zip(old_nics, new_nics):
        if not (old_nic.ipv4 == new_nic['ipv4'] and
                old_nic.ipv6 == new_nic['ipv6'] and
                old_nic.mac == new_nic['mac'] and
                old_nic.firewall_profile == new_nic['firewall_profile'] and
                old_nic.index == new_nic['index'] and
                old_nic.network == new_nic['network']):
            return True
    return False


def release_instance_nics(vm):
    for nic in vm.nics.all():
        net = nic.network
        if nic.ipv4:
            net.release_address(nic.ipv4)
        nic.delete()
        net.save()


@transaction.commit_on_success
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode
    back_network.backendlogmsg = logmsg

    # Notifications of success change the operating state
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == 'success' and state_for_success is not None:
        back_network.operstate = state_for_success

    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_ADD':
        back_network.operstate = 'ERROR'
        back_network.backendtime = etime

    if opcode == 'OP_NETWORK_REMOVE':
        if status == 'success' or (status == 'error' and
                                   back_network.operstate == 'ERROR'):
            back_network.operstate = state_for_success
            back_network.deleted = True
            back_network.backendtime = etime

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()
    # Also update the state of the parent Network
    update_network_state(back_network.network)


@quotas.uses_commission
def update_network_state(serials, network):
    old_state = network.state

    backend_states = [s.operstate for s in
                      network.backend_networks.filter(backend__offline=False)]
    if not backend_states:
        network.state = 'PENDING'
        network.save()
        return

    all_equal = len(set(backend_states)) <= 1
    network.state = all_equal and backend_states[0] or 'PENDING'
    # Release the resources on the deletion of the Network
    if old_state != 'DELETED' and network.state == 'DELETED':
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
                 network.id, network.link, network.mac_prefix)
        network.deleted = True
        if network.mac_prefix:
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
                release_resource(res_type="mac_prefix",
                                 value=network.mac_prefix)
        if network.link:
            if network.FLAVORS[network.flavor]["link"] == "pool":
                release_resource(res_type="bridge", value=network.link)

        # Issue commission
        serial = quotas.issue_network_commission(network.userid, delete=True)
        serials.append(serial)
        network.serial = serial
        serial.accepted = True
        serial.save()
    network.save()

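# Illustration of the aggregation rule in update_network_state() (the state
# values below are examples, not an exhaustive list of operstates): if every
# non-offline backend reports the same operstate, e.g. ['ACTIVE', 'ACTIVE'],
# the Network takes that state ('ACTIVE'); any disagreement, e.g.
# ['ACTIVE', 'PENDING'], leaves the Network in 'PENDING'.
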
@transaction.commit_on_success
def process_network_modify(back_network, etime, jobid, opcode, status,
                           add_reserved_ips, remove_reserved_ips):
    assert (opcode == "OP_NETWORK_SET_PARAMS")
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.opcode = opcode

    if add_reserved_ips or remove_reserved_ips:
        net = back_network.network
        pool = net.get_pool()
        if add_reserved_ips:
            for ip in add_reserved_ips:
                pool.reserve(ip, external=True)
        if remove_reserved_ips:
            for ip in remove_reserved_ips:
                pool.put(ip, external=True)
        pool.save()

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()


@transaction.commit_on_success
def process_create_progress(vm, etime, progress):

    percentage = int(progress)

    # The percentage may exceed 100%, due to the way
    # snf-image:copy-progress tracks bytes read by image handling processes
    percentage = 100 if percentage > 100 else percentage
    if percentage < 0:
        raise ValueError("Percentage cannot be negative")

    # FIXME: log a warning here, see #1033
#   if last_update > percentage:
#       raise ValueError("Build percentage should increase monotonically " \
#                        "(old = %d, new = %d)" % (last_update, percentage))

    # This assumes that no message of type 'ganeti-create-progress' is going to
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
    # the instance is STARTED.  What if the two messages are processed by two
    # separate dispatcher threads, and the 'ganeti-op-status' message for
    # successful creation gets processed before the 'ganeti-create-progress'
    # message? [vkoukis]
    #
    #if not vm.operstate == 'BUILD':
    #    raise VirtualMachine.IllegalState("VM is not in building state")

    vm.buildpercentage = percentage
    vm.backendtime = etime
    vm.save()


def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
                               details=None):
    """
    Create a virtual machine instance diagnostic entry.

    :param vm: VirtualMachine instance to create diagnostic for.
    :param message: Diagnostic message.
    :param source: Diagnostic source identifier (e.g. image-helper).
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
    :param etime: The time the message occurred (if available).
    :param details: Additional details or debug information.
    """
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level, source=source,
                                                   source_date=etime,
                                                   message=message,
                                                   details=details)


def create_instance(vm, public_nic, flavor, image):
    """`image` is a dictionary which should contain the keys:
            'backend_id', 'format' and 'metadata'

        metadata value should be a dictionary.
    """

    # Handle arguments to CreateInstance() as a dictionary,
    # initialize it based on a deployment-specific value.
    # This enables the administrator to override deployment-specific
    # arguments, such as the disk template to use, name of os provider
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
    #
    kw = settings.GANETI_CREATEINSTANCE_KWARGS
    kw['mode'] = 'create'
    kw['name'] = vm.backend_vm_id
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS

    kw['disk_template'] = flavor.disk_template
    kw['disks'] = [{"size": flavor.disk * 1024}]
    provider = flavor.disk_provider
    if provider:
        kw['disks'][0]['provider'] = provider

        if provider == 'vlmc':
            kw['disks'][0]['origin'] = flavor.disk_origin

    kw['nics'] = [public_nic]
    if settings.GANETI_USE_HOTPLUG:
        kw['hotplug'] = True
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['os'] = settings.GANETI_OS_PROVIDER
    kw['ip_check'] = False
    kw['name_check'] = False

    # Do not specify a node explicitly; have
    # Ganeti use an iallocator instead
    #kw['pnode'] = rapi.GetNodes()[0]

    kw['dry_run'] = settings.TEST

    kw['beparams'] = {
        'auto_balance': True,
        'vcpus': flavor.cpu,
        'memory': flavor.ram}

    kw['osparams'] = {
        'config_url': vm.config_url,
        # Store image id and format to Ganeti
        'img_id': image['backend_id'],
        'img_format': image['format']}

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['hvparams'] = dict(serial_console=False)

    log.debug("Creating instance %s", utils.hide_pass(kw))
    with pooled_rapi_client(vm) as client:
        return client.CreateInstance(**kw)

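# For reference, GANETI_CREATEINSTANCE_KWARGS is a plain dict defined in the
# deployment's settings; the sketch below only illustrates its shape (keys
# and values vary per deployment and are not taken from any actual
# configuration). create_instance() fills in the per-VM fields (name, disks,
# nics, beparams, osparams) on top of it:
#
#   GANETI_CREATEINSTANCE_KWARGS = {
#       'os': 'snf-image+default',
#       'hvparams': {'serial_console': False},
#       'wait_for_sync': False}
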
def delete_instance(vm):
    with pooled_rapi_client(vm) as client:
        return client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)


def reboot_instance(vm, reboot_type):
    assert reboot_type in ('soft', 'hard')
    with pooled_rapi_client(vm) as client:
        return client.RebootInstance(vm.backend_vm_id, reboot_type,
                                     dry_run=settings.TEST)


def startup_instance(vm):
    with pooled_rapi_client(vm) as client:
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)


def shutdown_instance(vm):
    with pooled_rapi_client(vm) as client:
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)


def get_instance_console(vm):
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
    # useless (see #783).
    #
    # Until this is fixed on the Ganeti side, construct a console info reply
    # directly.
    #
    # WARNING: This assumes that VNC runs on port network_port on
    #          the instance's primary node, and is probably
    #          hypervisor-specific.
    #
    log.debug("Getting console for vm %s", vm)

    console = {}
    console['kind'] = 'vnc'

    with pooled_rapi_client(vm) as client:
        i = client.GetInstance(vm.backend_vm_id)

    if i['hvparams']['serial_console']:
        raise Exception("hv parameter serial_console cannot be true")
    console['host'] = i['pnode']
    console['port'] = i['network_port']

    return console

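# Example return value of get_instance_console() (hostname and port are
# illustrative; the actual values come from the instance's primary node and
# Ganeti's network_port field):
#
#   {'kind': 'vnc', 'host': 'node1.example.org', 'port': 11000}
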
def get_instance_info(vm):
    with pooled_rapi_client(vm) as client:
        return client.GetInstanceInfo(vm.backend_vm_id)


def create_network(network, backends=None, connect=True):
    """Create and connect a network."""
    if not backends:
        backends = Backend.objects.exclude(offline=True)

    log.debug("Creating network %s in backends %s", network, backends)

    for backend in backends:
        create_jobID = _create_network(network, backend)
        if connect:
            connect_network(network, backend, create_jobID)


def _create_network(network, backend):
    """Create a network."""

    network_type = network.public and 'public' or 'private'

    tags = network.backend_tag
    if network.dhcp:
        tags.append('nfdhcpd')

    if network.public:
        conflicts_check = True
    else:
        conflicts_check = False

    try:
        bn = BackendNetwork.objects.get(network=network, backend=backend)
        mac_prefix = bn.mac_prefix
    except BackendNetwork.DoesNotExist:
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
                        " does not exist" % (network.id, backend.id))

    with pooled_rapi_client(backend) as client:
        return client.CreateNetwork(network_name=network.backend_id,
                                    network=network.subnet,
                                    network6=network.subnet6,
                                    gateway=network.gateway,
                                    gateway6=network.gateway6,
                                    network_type=network_type,
                                    mac_prefix=mac_prefix,
                                    conflicts_check=conflicts_check,
                                    tags=tags)


def connect_network(network, backend, depend_job=None, group=None):
    """Connect a network to nodegroups."""
    log.debug("Connecting network %s to backend %s", network, backend)

    if network.public:
        conflicts_check = True
    else:
        conflicts_check = False

    depend_jobs = [depend_job] if depend_job else []
    with pooled_rapi_client(backend) as client:
        if group:
            client.ConnectNetwork(network.backend_id, group, network.mode,
                                  network.link, conflicts_check, depend_jobs)
        else:
            for group in client.GetGroups():
                client.ConnectNetwork(network.backend_id, group, network.mode,
                                      network.link, conflicts_check,
                                      depend_jobs)


def delete_network(network, backends=None, disconnect=True):
    if not backends:
        backends = Backend.objects.exclude(offline=True)

    log.debug("Deleting network %s from backends %s", network, backends)

    for backend in backends:
        disconnect_jobIDs = []
        if disconnect:
            disconnect_jobIDs = disconnect_network(network, backend)
        _delete_network(network, backend, disconnect_jobIDs)


def _delete_network(network, backend, depend_jobs=[]):
    with pooled_rapi_client(backend) as client:
        return client.DeleteNetwork(network.backend_id, depend_jobs)


def disconnect_network(network, backend, group=None):
    log.debug("Disconnecting network %s from backend %s", network, backend)

    with pooled_rapi_client(backend) as client:
        if group:
            return [client.DisconnectNetwork(network.backend_id, group)]
        else:
            jobs = []
            for group in client.GetGroups():
                job = client.DisconnectNetwork(network.backend_id, group)
                jobs.append(job)
            return jobs


def connect_to_network(vm, network, address=None):
    nic = {'ip': address, 'network': network.backend_id}

    log.debug("Connecting vm %s to network %s(%s)", vm, network, address)

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id, nics=[('add', nic)],
                                     hotplug=settings.GANETI_USE_HOTPLUG,
                                     dry_run=settings.TEST)


def disconnect_from_network(vm, nic):
    op = [('remove', nic.index, {})]

    log.debug("Removing nic of VM %s, with index %s", vm, str(nic.index))

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id, nics=op,
                                     hotplug=settings.GANETI_USE_HOTPLUG,
                                     dry_run=settings.TEST)


def set_firewall_profile(vm, profile):
    try:
        tag = _firewall_tags[profile]
    except KeyError:
        raise ValueError("Unsupported firewall profile: %s" % profile)

    log.debug("Setting firewall profile of VM %s to %s", vm, profile)

    with pooled_rapi_client(vm) as client:
        # Delete all firewall tags
        for t in _firewall_tags.values():
            client.DeleteInstanceTags(vm.backend_vm_id, [t],
                                      dry_run=settings.TEST)

        client.AddInstanceTags(vm.backend_vm_id, [tag], dry_run=settings.TEST)

        # XXX NOP ModifyInstance call to force process_net_status to run
        # on the dispatcher
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
        client.ModifyInstance(vm.backend_vm_id,
                              os_name=os_name)

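# Example usage (illustrative; 'vm' stands for any VirtualMachine object):
#
#   set_firewall_profile(vm, 'PROTECTED')
#
# removes every firewall tag listed in _firewall_tags from the Ganeti
# instance, adds the 'PROTECTED' one, and then issues a no-op ModifyInstance
# so that the dispatcher picks up the new firewall state via
# process_net_status().
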
def get_ganeti_instances(backend=None, bulk=False):
    instances = []
    for backend in get_backends(backend):
        with pooled_rapi_client(backend) as client:
            instances.append(client.GetInstances(bulk=bulk))

    return reduce(list.__add__, instances, [])


def get_ganeti_nodes(backend=None, bulk=False):
    nodes = []
    for backend in get_backends(backend):
        with pooled_rapi_client(backend) as client:
            nodes.append(client.GetNodes(bulk=bulk))

    return reduce(list.__add__, nodes, [])


def get_ganeti_jobs(backend=None, bulk=False):
    jobs = []
    for backend in get_backends(backend):
        with pooled_rapi_client(backend) as client:
            jobs.append(client.GetJobs(bulk=bulk))
    return reduce(list.__add__, jobs, [])

##
##
##


def get_backends(backend=None):
    if backend:
        if backend.offline:
            return []
        return [backend]
    return Backend.objects.filter(offline=False)


def get_physical_resources(backend):
    """ Get the physical resources of a backend.

    Get the resources of a backend as reported by the backend (not the db).

    """
    nodes = get_ganeti_nodes(backend, bulk=True)
    attr = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
    res = {}
    for a in attr:
        res[a] = 0
    for n in nodes:
        # Filter out drained, offline and not vm_capable nodes since they will
        # not take part in the vm allocation process
        can_host_vms = n['vm_capable'] and not (n['drained'] or n['offline'])
        if can_host_vms and n['cnodes']:
            for a in attr:
                res[a] += int(n[a])
    return res

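# Example return value of get_physical_resources() (numbers are illustrative;
# memory and disk follow whatever units the Ganeti node fields report):
#
#   {'mfree': 51200, 'mtotal': 65536, 'dfree': 409600, 'dtotal': 512000,
#    'pinst_cnt': 13, 'ctotal': 64}
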
def update_resources(backend, resources=None):
    """ Update the state of the backend resources in db.

    """

    if not resources:
        resources = get_physical_resources(backend)

    backend.mfree = resources['mfree']
    backend.mtotal = resources['mtotal']
    backend.dfree = resources['dfree']
    backend.dtotal = resources['dtotal']
    backend.pinst_cnt = resources['pinst_cnt']
    backend.ctotal = resources['ctotal']
    backend.updated = datetime.now()
    backend.save()


def get_memory_from_instances(backend):
    """ Get the memory that is used by instances.

    Get the used memory of a backend. Note: This is different from
    the real memory used, due to kvm's memory de-duplication.

    """
    with pooled_rapi_client(backend) as client:
        instances = client.GetInstances(bulk=True)
    mem = 0
    for i in instances:
        mem += i['oper_ram']
    return mem

##
## Synchronized operations for reconciliation
##


def create_network_synced(network, backend):
    result = _create_network_synced(network, backend)
    if result[0] != 'success':
        return result
    result = connect_network_synced(network, backend)
    return result


def _create_network_synced(network, backend):
    with pooled_rapi_client(backend) as client:
        job = _create_network(network, backend)
        result = wait_for_job(client, job)
    return result


def connect_network_synced(network, backend):
    with pooled_rapi_client(backend) as client:
        for group in client.GetGroups():
            job = client.ConnectNetwork(network.backend_id, group,
                                        network.mode, network.link)
            result = wait_for_job(client, job)
            if result[0] != 'success':
                return result

    return result


def wait_for_job(client, jobid):
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
    status = result['job_info'][0]
    while status not in ['success', 'error', 'cancel']:
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
                                         [result], None)
        status = result['job_info'][0]

    if status == 'success':
        return (status, None)
    else:
        error = result['job_info'][1]
        return (status, error)
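
# Example of the synchronous pattern used above (illustrative sketch; 'vm'
# stands for any VirtualMachine object, and any RAPI call returning a job id
# would do in place of StartupInstance):
#
#   with pooled_rapi_client(vm) as client:
#       jobid = client.StartupInstance(vm.backend_vm_id)
#       status, error = wait_for_job(client, jobid)
#       if status != 'success':
#           log.error("Job %d failed: %s", jobid, error)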