Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ b2222a7f

History | View | Annotate | Download (26.3 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import json
35

    
36
from django.conf import settings
37
from django.db import transaction
38
from datetime import datetime
39

    
40
from synnefo.db.models import (Backend, VirtualMachine, Network,
41
                               BackendNetwork, BACKEND_STATUSES,
42
                               pooled_rapi_client, VirtualMachineDiagnostic)
43
from synnefo.logic import utils
44
from synnefo import quotas
45
from synnefo.api.util import release_resource
46
from synnefo.util.mac2eui64 import mac2eui64
47

    
48
from logging import getLogger
49
log = getLogger(__name__)
50

    
51

    
52
_firewall_tags = {
53
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
54
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
55
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}
56

    
57
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
58

    
59

    
60
@transaction.commit_on_success
61
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None):
62
    """Process a job progress notification from the backend
63

64
    Process an incoming message from the backend (currently Ganeti).
65
    Job notifications with a terminating status (sucess, error, or canceled),
66
    also update the operating state of the VM.
67

68
    """
69
    # See #1492, #1031, #1111 why this line has been removed
70
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
71
    if status not in [x[0] for x in BACKEND_STATUSES]:
72
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)
73

    
74
    vm.backendjobid = jobid
75
    vm.backendjobstatus = status
76
    vm.backendopcode = opcode
77
    vm.backendlogmsg = logmsg
78

    
79
    # Notifications of success change the operating state
80
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode, None)
81
    if status == 'success' and state_for_success is not None:
82
        vm.operstate = state_for_success
83

    
84
    # Update the NICs of the VM
85
    if status == "success" and nics is not None:
86
        _process_net_status(vm, etime, nics)
87

    
88
    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
89
    if opcode == 'OP_INSTANCE_CREATE' and status in ('canceled', 'error'):
90
        vm.operstate = 'ERROR'
91
        vm.backendtime = etime
92
    elif opcode == 'OP_INSTANCE_REMOVE':
93
        # Set the deleted flag explicitly, cater for admin-initiated removals
94
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
95
        # when no instance exists at the Ganeti backend.
96
        # See ticket #799 for all the details.
97
        #
98
        if status == 'success' or (status == 'error' and
99
                                   vm.operstate == 'ERROR'):
100
            release_instance_nics(vm)
101
            vm.nics.all().delete()
102
            vm.deleted = True
103
            vm.operstate = state_for_success
104
            vm.backendtime = etime
105
            # Issue and accept commission to Quotaholder
106
            quotas.issue_and_accept_commission(vm, delete=True)
107

    
108
    # Update backendtime only for jobs that have been successfully completed,
109
    # since only these jobs update the state of the VM. Else a "race condition"
110
    # may occur when a successful job (e.g. OP_INSTANCE_REMOVE) completes
111
    # before an error job and messages arrive in reversed order.
112
    if status == 'success':
113
        vm.backendtime = etime
114

    
115
    vm.save()
116

    
117

    
118
@transaction.commit_on_success
119
def process_net_status(vm, etime, nics):
120
    """Wrap _process_net_status inside transaction."""
121
    _process_net_status(vm, etime, nics)
122

    
123

    
124
def _process_net_status(vm, etime, nics):
125
    """Process a net status notification from the backend
126

127
    Process an incoming message from the Ganeti backend,
128
    detailing the NIC configuration of a VM instance.
129

130
    Update the state of the VM in the DB accordingly.
131
    """
132

    
133
    ganeti_nics = process_ganeti_nics(nics)
134
    if not nics_changed(vm.nics.order_by('index'), ganeti_nics):
135
        log.debug("NICs for VM %s have not changed", vm)
136

    
137
    release_instance_nics(vm)
138

    
139
    for nic in ganeti_nics:
140
        ipv4 = nic.get('ipv4', '')
141
        net = nic['network']
142
        if ipv4:
143
            net.reserve_address(ipv4)
144

    
145
        nic['dirty'] = False
146
        vm.nics.create(**nic)
147
        # Dummy save the network, because UI uses changed-since for VMs
148
        # and Networks in order to show the VM NICs
149
        net.save()
150

    
151
    vm.backendtime = etime
152
    vm.save()
153

    
154

    
155
def process_ganeti_nics(ganeti_nics):
156
    """Process NIC dict from ganeti hooks."""
157
    new_nics = []
158
    for i, new_nic in enumerate(ganeti_nics):
159
        network = new_nic.get('network', '')
160
        n = str(network)
161
        pk = utils.id_from_network_name(n)
162

    
163
        net = Network.objects.get(pk=pk)
164

    
165
        # Get the new nic info
166
        mac = new_nic.get('mac', '')
167
        ipv4 = new_nic.get('ip', '')
168
        if net.subnet6:
169
            ipv6 = mac2eui64(mac, net.subnet6)
170
        else:
171
            ipv6 = ''
172

    
173
        firewall = new_nic.get('firewall', '')
174
        firewall_profile = _reverse_tags.get(firewall, '')
175
        if not firewall_profile and net.public:
176
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE
177

    
178
        nic = {
179
            'index': i,
180
            'network': net,
181
            'mac': mac,
182
            'ipv4': ipv4,
183
            'ipv6': ipv6,
184
            'firewall_profile': firewall_profile,
185
            'state': 'ACTIVE'}
186

    
187
        new_nics.append(nic)
188
    return new_nics
189

    
190

    
191
def nics_changed(old_nics, new_nics):
192
    """Return True if NICs have changed in any way."""
193
    if len(old_nics) != len(new_nics):
194
        return True
195
    fields = ["ipv4", "ipv6", "mac", "firewall_profile", "index", "network"]
196
    for old_nic, new_nic in zip(old_nics, new_nics):
197
        for field in fields:
198
            if getattr(old_nic, field) != new_nic[field]:
199
                return True
200
    return False
201

    
202

    
203
def release_instance_nics(vm):
204
    for nic in vm.nics.all():
205
        net = nic.network
206
        if nic.ipv4:
207
            net.release_address(nic.ipv4)
208
        nic.delete()
209
        net.save()
210

    
211

    
212
@transaction.commit_on_success
213
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
214
    if status not in [x[0] for x in BACKEND_STATUSES]:
215
        raise Network.InvalidBackendMsgError(opcode, status)
216

    
217
    back_network.backendjobid = jobid
218
    back_network.backendjobstatus = status
219
    back_network.backendopcode = opcode
220
    back_network.backendlogmsg = logmsg
221

    
222
    # Notifications of success change the operating state
223
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
224
    if status == 'success' and state_for_success is not None:
225
        back_network.operstate = state_for_success
226

    
227
    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_ADD':
228
        back_network.operstate = 'ERROR'
229
        back_network.backendtime = etime
230

    
231
    if opcode == 'OP_NETWORK_REMOVE':
232
        if status == 'success' or (status == 'error' and
233
                                   back_network.operstate == 'ERROR'):
234
            back_network.operstate = state_for_success
235
            back_network.deleted = True
236
            back_network.backendtime = etime
237

    
238
    if status == 'success':
239
        back_network.backendtime = etime
240
    back_network.save()
241
    # Also you must update the state of the Network!!
242
    update_network_state(back_network.network)
243

    
244

    
245
def update_network_state(network):
246
    """Update the state of a Network based on BackendNetwork states.
247

248
    Update the state of a Network based on the operstate of the networks in the
249
    backends that network exists.
250

251
    The state of the network is:
252
    * ACTIVE: If it is 'ACTIVE' in at least one backend.
253
    * DELETED: If it is is 'DELETED' in all backends that have been created.
254

255
    This function also releases the resources (MAC prefix or Bridge) and the
256
    quotas for the network.
257

258
    """
259
    if network.deleted:
260
        # Network has already been deleted. Just assert that state is also
261
        # DELETED
262
        if not network.state == "DELETED":
263
            network.state = "DELETED"
264
            network.save()
265
        return
266

    
267
    backend_states = [s.operstate for s in network.backend_networks.all()]
268
    if not backend_states and network.action != "DESTROY":
269
        if network.state != "ACTIVE":
270
            network.state = "ACTIVE"
271
            network.save()
272
            return
273

    
274
    # Network is deleted when all BackendNetworks go to "DELETED" operstate
275
    deleted = reduce(lambda x, y: x == y and "DELETED", backend_states,
276
                     "DELETED")
277

    
278
    # Release the resources on the deletion of the Network
279
    if deleted:
280
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
281
                 network.id, network.mac_prefix, network.link)
282
        network.deleted = True
283
        network.state = "DELETED"
284
        if network.mac_prefix:
285
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
286
                release_resource(res_type="mac_prefix",
287
                                 value=network.mac_prefix)
288
        if network.link:
289
            if network.FLAVORS[network.flavor]["link"] == "pool":
290
                release_resource(res_type="bridge", value=network.link)
291

    
292
        # Issue commission
293
        if network.userid:
294
            quotas.issue_and_accept_commission(network, delete=True)
295
        elif not network.public:
296
            log.warning("Network %s does not have an owner!", network.id)
297
    network.save()
298

    
299

    
300
@transaction.commit_on_success
301
def process_network_modify(back_network, etime, jobid, opcode, status,
302
                           add_reserved_ips, remove_reserved_ips):
303
    assert (opcode == "OP_NETWORK_SET_PARAMS")
304
    if status not in [x[0] for x in BACKEND_STATUSES]:
305
        raise Network.InvalidBackendMsgError(opcode, status)
306

    
307
    back_network.backendjobid = jobid
308
    back_network.backendjobstatus = status
309
    back_network.opcode = opcode
310

    
311
    if add_reserved_ips or remove_reserved_ips:
312
        net = back_network.network
313
        pool = net.get_pool()
314
        if add_reserved_ips:
315
            for ip in add_reserved_ips:
316
                pool.reserve(ip, external=True)
317
        if remove_reserved_ips:
318
            for ip in remove_reserved_ips:
319
                pool.put(ip, external=True)
320
        pool.save()
321

    
322
    if status == 'success':
323
        back_network.backendtime = etime
324
    back_network.save()
325

    
326

    
327
@transaction.commit_on_success
328
def process_create_progress(vm, etime, progress):
329

    
330
    percentage = int(progress)
331

    
332
    # The percentage may exceed 100%, due to the way
333
    # snf-image:copy-progress tracks bytes read by image handling processes
334
    percentage = 100 if percentage > 100 else percentage
335
    if percentage < 0:
336
        raise ValueError("Percentage cannot be negative")
337

    
338
    # FIXME: log a warning here, see #1033
339
#   if last_update > percentage:
340
#       raise ValueError("Build percentage should increase monotonically " \
341
#                        "(old = %d, new = %d)" % (last_update, percentage))
342

    
343
    # This assumes that no message of type 'ganeti-create-progress' is going to
344
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
345
    # the instance is STARTED.  What if the two messages are processed by two
346
    # separate dispatcher threads, and the 'ganeti-op-status' message for
347
    # successful creation gets processed before the 'ganeti-create-progress'
348
    # message? [vkoukis]
349
    #
350
    #if not vm.operstate == 'BUILD':
351
    #    raise VirtualMachine.IllegalState("VM is not in building state")
352

    
353
    vm.buildpercentage = percentage
354
    vm.backendtime = etime
355
    vm.save()
356

    
357

    
358
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
359
                               details=None):
360
    """
361
    Create virtual machine instance diagnostic entry.
362

363
    :param vm: VirtualMachine instance to create diagnostic for.
364
    :param message: Diagnostic message.
365
    :param source: Diagnostic source identifier (e.g. image-helper).
366
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
367
    :param etime: The time the message occured (if available).
368
    :param details: Additional details or debug information.
369
    """
370
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level, source=source,
371
                                                   source_date=etime,
372
                                                   message=message,
373
                                                   details=details)
374

    
375

    
376
def create_instance(vm, public_nic, flavor, image):
377
    """`image` is a dictionary which should contain the keys:
378
            'backend_id', 'format' and 'metadata'
379

380
        metadata value should be a dictionary.
381
    """
382

    
383
    # Handle arguments to CreateInstance() as a dictionary,
384
    # initialize it based on a deployment-specific value.
385
    # This enables the administrator to override deployment-specific
386
    # arguments, such as the disk template to use, name of os provider
387
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
388
    #
389
    kw = vm.backend.get_create_params()
390
    kw['mode'] = 'create'
391
    kw['name'] = vm.backend_vm_id
392
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
393

    
394
    kw['disk_template'] = flavor.disk_template
395
    kw['disks'] = [{"size": flavor.disk * 1024}]
396
    provider = flavor.disk_provider
397
    if provider:
398
        kw['disks'][0]['provider'] = provider
399
        kw['disks'][0]['origin'] = flavor.disk_origin
400

    
401
    kw['nics'] = [public_nic]
402
    if vm.backend.use_hotplug():
403
        kw['hotplug'] = True
404
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
405
    # kw['os'] = settings.GANETI_OS_PROVIDER
406
    kw['ip_check'] = False
407
    kw['name_check'] = False
408

    
409
    # Do not specific a node explicitly, have
410
    # Ganeti use an iallocator instead
411
    #kw['pnode'] = rapi.GetNodes()[0]
412

    
413
    kw['dry_run'] = settings.TEST
414

    
415
    kw['beparams'] = {
416
        'auto_balance': True,
417
        'vcpus': flavor.cpu,
418
        'memory': flavor.ram}
419

    
420
    kw['osparams'] = {
421
        'config_url': vm.config_url,
422
        # Store image id and format to Ganeti
423
        'img_id': image['backend_id'],
424
        'img_format': image['format']}
425

    
426
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
427
    # kw['hvparams'] = dict(serial_console=False)
428

    
429
    log.debug("Creating instance %s", utils.hide_pass(kw))
430
    with pooled_rapi_client(vm) as client:
431
        return client.CreateInstance(**kw)
432

    
433

    
434
def delete_instance(vm):
435
    with pooled_rapi_client(vm) as client:
436
        return client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)
437

    
438

    
439
def reboot_instance(vm, reboot_type):
440
    assert reboot_type in ('soft', 'hard')
441
    with pooled_rapi_client(vm) as client:
442
        return client.RebootInstance(vm.backend_vm_id, reboot_type,
443
                                     dry_run=settings.TEST)
444

    
445

    
446
def startup_instance(vm):
447
    with pooled_rapi_client(vm) as client:
448
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
449

    
450

    
451
def shutdown_instance(vm):
452
    with pooled_rapi_client(vm) as client:
453
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)
454

    
455

    
456
def get_instance_console(vm):
457
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
458
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
459
    # useless (see #783).
460
    #
461
    # Until this is fixed on the Ganeti side, construct a console info reply
462
    # directly.
463
    #
464
    # WARNING: This assumes that VNC runs on port network_port on
465
    #          the instance's primary node, and is probably
466
    #          hypervisor-specific.
467
    #
468
    log.debug("Getting console for vm %s", vm)
469

    
470
    console = {}
471
    console['kind'] = 'vnc'
472

    
473
    with pooled_rapi_client(vm) as client:
474
        i = client.GetInstance(vm.backend_vm_id)
475

    
476
    if vm.backend.hypervisor == "kvm" and i['hvparams']['serial_console']:
477
        raise Exception("hv parameter serial_console cannot be true")
478
    console['host'] = i['pnode']
479
    console['port'] = i['network_port']
480

    
481
    return console
482

    
483

    
484
def get_instance_info(vm):
485
    with pooled_rapi_client(vm) as client:
486
        return client.GetInstanceInfo(vm.backend_vm_id)
487

    
488

    
489
def create_network(network, backend, connect=True):
490
    """Create a network in a Ganeti backend"""
491
    log.debug("Creating network %s in backend %s", network, backend)
492

    
493
    job_id = _create_network(network, backend)
494

    
495
    if connect:
496
        job_ids = connect_network(network, backend, depends=[job_id])
497
        return job_ids
498
    else:
499
        return [job_id]
500

    
501

    
502
def _create_network(network, backend):
503
    """Create a network."""
504

    
505
    network_type = network.public and 'public' or 'private'
506

    
507
    tags = network.backend_tag
508
    if network.dhcp:
509
        tags.append('nfdhcpd')
510

    
511
    if network.public:
512
        conflicts_check = True
513
    else:
514
        conflicts_check = False
515

    
516
    try:
517
        bn = BackendNetwork.objects.get(network=network, backend=backend)
518
        mac_prefix = bn.mac_prefix
519
    except BackendNetwork.DoesNotExist:
520
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
521
                        " does not exist" % (network.id, backend.id))
522

    
523
    with pooled_rapi_client(backend) as client:
524
        return client.CreateNetwork(network_name=network.backend_id,
525
                                    network=network.subnet,
526
                                    network6=network.subnet6,
527
                                    gateway=network.gateway,
528
                                    gateway6=network.gateway6,
529
                                    network_type=network_type,
530
                                    mac_prefix=mac_prefix,
531
                                    conflicts_check=conflicts_check,
532
                                    tags=tags)
533

    
534

    
535
def connect_network(network, backend, depends=[], group=None):
536
    """Connect a network to nodegroups."""
537
    log.debug("Connecting network %s to backend %s", network, backend)
538

    
539
    if network.public:
540
        conflicts_check = True
541
    else:
542
        conflicts_check = False
543

    
544
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
545
    with pooled_rapi_client(backend) as client:
546
        groups = [group] if group is not None else client.GetGroups()
547
        job_ids = []
548
        for group in groups:
549
            job_id = client.ConnectNetwork(network.backend_id, group,
550
                                           network.mode, network.link,
551
                                           conflicts_check,
552
                                           depends=depends)
553
            job_ids.append(job_id)
554
    return job_ids
555

    
556

    
557
def delete_network(network, backend, disconnect=True):
558
    log.debug("Deleting network %s from backend %s", network, backend)
559

    
560
    depends = []
561
    if disconnect:
562
        depends = disconnect_network(network, backend)
563
    _delete_network(network, backend, depends=depends)
564

    
565

    
566
def _delete_network(network, backend, depends=[]):
567
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
568
    with pooled_rapi_client(backend) as client:
569
        return client.DeleteNetwork(network.backend_id, depends)
570

    
571

    
572
def disconnect_network(network, backend, group=None):
573
    log.debug("Disconnecting network %s to backend %s", network, backend)
574

    
575
    with pooled_rapi_client(backend) as client:
576
        groups = [group] if group is not None else client.GetGroups()
577
        job_ids = []
578
        for group in groups:
579
            job_id = client.DisconnectNetwork(network.backend_id, group)
580
            job_ids.append(job_id)
581
    return job_ids
582

    
583

    
584
def connect_to_network(vm, network, address=None):
585
    backend = vm.backend
586
    network = Network.objects.select_for_update().get(id=network.id)
587
    bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
588
                                                         network=network)
589
    depend_jobs = []
590
    if bnet.operstate != "ACTIVE":
591
        depend_jobs = create_network(network, backend, connect=True)
592

    
593
    depends = [[job, ["success", "error", "canceled"]] for job in depend_jobs]
594

    
595
    nic = {'ip': address, 'network': network.backend_id}
596

    
597
    log.debug("Connecting vm %s to network %s(%s)", vm, network, address)
598

    
599
    with pooled_rapi_client(vm) as client:
600
        return client.ModifyInstance(vm.backend_vm_id, nics=[('add',  nic)],
601
                                     hotplug=vm.backend.use_hotplug(),
602
                                     depends=depends,
603
                                     dry_run=settings.TEST)
604

    
605

    
606
def disconnect_from_network(vm, nic):
607
    op = [('remove', nic.index, {})]
608

    
609
    log.debug("Removing nic of VM %s, with index %s", vm, str(nic.index))
610

    
611
    with pooled_rapi_client(vm) as client:
612
        return client.ModifyInstance(vm.backend_vm_id, nics=op,
613
                                     hotplug=vm.backend.use_hotplug(),
614
                                     dry_run=settings.TEST)
615

    
616

    
617
def set_firewall_profile(vm, profile):
618
    try:
619
        tag = _firewall_tags[profile]
620
    except KeyError:
621
        raise ValueError("Unsopported Firewall Profile: %s" % profile)
622

    
623
    log.debug("Setting tag of VM %s to %s", vm, profile)
624

    
625
    with pooled_rapi_client(vm) as client:
626
        # Delete all firewall tags
627
        for t in _firewall_tags.values():
628
            client.DeleteInstanceTags(vm.backend_vm_id, [t],
629
                                      dry_run=settings.TEST)
630

    
631
        client.AddInstanceTags(vm.backend_vm_id, [tag], dry_run=settings.TEST)
632

    
633
        # XXX NOP ModifyInstance call to force process_net_status to run
634
        # on the dispatcher
635
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
636
        client.ModifyInstance(vm.backend_vm_id,
637
                              os_name=os_name)
638

    
639

    
640
def get_ganeti_instances(backend=None, bulk=False):
641
    instances = []
642
    for backend in get_backends(backend):
643
        with pooled_rapi_client(backend) as client:
644
            instances.append(client.GetInstances(bulk=bulk))
645

    
646
    return reduce(list.__add__, instances, [])
647

    
648

    
649
def get_ganeti_nodes(backend=None, bulk=False):
650
    nodes = []
651
    for backend in get_backends(backend):
652
        with pooled_rapi_client(backend) as client:
653
            nodes.append(client.GetNodes(bulk=bulk))
654

    
655
    return reduce(list.__add__, nodes, [])
656

    
657

    
658
def get_ganeti_jobs(backend=None, bulk=False):
659
    jobs = []
660
    for backend in get_backends(backend):
661
        with pooled_rapi_client(backend) as client:
662
            jobs.append(client.GetJobs(bulk=bulk))
663
    return reduce(list.__add__, jobs, [])
664

    
665
##
666
##
667
##
668

    
669

    
670
def get_backends(backend=None):
671
    if backend:
672
        if backend.offline:
673
            return []
674
        return [backend]
675
    return Backend.objects.filter(offline=False)
676

    
677

    
678
def get_physical_resources(backend):
679
    """ Get the physical resources of a backend.
680

681
    Get the resources of a backend as reported by the backend (not the db).
682

683
    """
684
    nodes = get_ganeti_nodes(backend, bulk=True)
685
    attr = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
686
    res = {}
687
    for a in attr:
688
        res[a] = 0
689
    for n in nodes:
690
        # Filter out drained, offline and not vm_capable nodes since they will
691
        # not take part in the vm allocation process
692
        can_host_vms = n['vm_capable'] and not (n['drained'] or n['offline'])
693
        if can_host_vms and n['cnodes']:
694
            for a in attr:
695
                res[a] += int(n[a])
696
    return res
697

    
698

    
699
def update_resources(backend, resources=None):
700
    """ Update the state of the backend resources in db.
701

702
    """
703

    
704
    if not resources:
705
        resources = get_physical_resources(backend)
706

    
707
    backend.mfree = resources['mfree']
708
    backend.mtotal = resources['mtotal']
709
    backend.dfree = resources['dfree']
710
    backend.dtotal = resources['dtotal']
711
    backend.pinst_cnt = resources['pinst_cnt']
712
    backend.ctotal = resources['ctotal']
713
    backend.updated = datetime.now()
714
    backend.save()
715

    
716

    
717
def get_memory_from_instances(backend):
718
    """ Get the memory that is used from instances.
719

720
    Get the used memory of a backend. Note: This is different for
721
    the real memory used, due to kvm's memory de-duplication.
722

723
    """
724
    with pooled_rapi_client(backend) as client:
725
        instances = client.GetInstances(bulk=True)
726
    mem = 0
727
    for i in instances:
728
        mem += i['oper_ram']
729
    return mem
730

    
731
##
732
## Synchronized operations for reconciliation
733
##
734

    
735

    
736
def create_network_synced(network, backend):
737
    result = _create_network_synced(network, backend)
738
    if result[0] != 'success':
739
        return result
740
    result = connect_network_synced(network, backend)
741
    return result
742

    
743

    
744
def _create_network_synced(network, backend):
745
    with pooled_rapi_client(backend) as client:
746
        job = _create_network(network, backend)
747
        result = wait_for_job(client, job)
748
    return result
749

    
750

    
751
def connect_network_synced(network, backend):
752
    with pooled_rapi_client(backend) as client:
753
        for group in client.GetGroups():
754
            job = client.ConnectNetwork(network.backend_id, group,
755
                                        network.mode, network.link)
756
            result = wait_for_job(client, job)
757
            if result[0] != 'success':
758
                return result
759

    
760
    return result
761

    
762

    
763
def wait_for_job(client, jobid):
764
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
765
    status = result['job_info'][0]
766
    while status not in ['success', 'error', 'cancel']:
767
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
768
                                         [result], None)
769
        status = result['job_info'][0]
770

    
771
    if status == 'success':
772
        return (status, None)
773
    else:
774
        error = result['job_info'][1]
775
        return (status, error)