Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ 3f339d85

History | View | Annotate | Download (26.3 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import json
35

    
36
from django.conf import settings
37
from django.db import transaction
38
from datetime import datetime
39

    
40
from synnefo.db.models import (Backend, VirtualMachine, Network,
41
                               BackendNetwork, BACKEND_STATUSES,
42
                               pooled_rapi_client, VirtualMachineDiagnostic)
43
from synnefo.logic import utils
44
from synnefo import quotas
45
from synnefo.api.util import release_resource
46
from synnefo.util.mac2eui64 import mac2eui64
47

    
48
from logging import getLogger
49
log = getLogger(__name__)
50

    
51

    
52
_firewall_tags = {
53
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
54
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
55
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}
56

    
57
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
58

    
59

    
60
@transaction.commit_on_success
61
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None):
62
    """Process a job progress notification from the backend
63

64
    Process an incoming message from the backend (currently Ganeti).
65
    Job notifications with a terminating status (sucess, error, or canceled),
66
    also update the operating state of the VM.
67

68
    """
69
    # See #1492, #1031, #1111 why this line has been removed
70
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
71
    if status not in [x[0] for x in BACKEND_STATUSES]:
72
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)
73

    
74
    vm.backendjobid = jobid
75
    vm.backendjobstatus = status
76
    vm.backendopcode = opcode
77
    vm.backendlogmsg = logmsg
78

    
79
    # Notifications of success change the operating state
80
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode, None)
81
    if status == 'success' and state_for_success is not None:
82
        vm.operstate = state_for_success
83

    
84
    # Update the NICs of the VM
85
    if status == "success" and nics is not None:
86
        _process_net_status(vm, etime, nics)
87

    
88
    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
89
    if opcode == 'OP_INSTANCE_CREATE' and status in ('canceled', 'error'):
90
        vm.operstate = 'ERROR'
91
        vm.backendtime = etime
92
    elif opcode == 'OP_INSTANCE_REMOVE':
93
        # Set the deleted flag explicitly, cater for admin-initiated removals
94
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
95
        # when no instance exists at the Ganeti backend.
96
        # See ticket #799 for all the details.
97
        #
98
        if status == 'success' or (status == 'error' and
99
                                   vm.operstate == 'ERROR'):
100
            release_instance_nics(vm)
101
            vm.nics.all().delete()
102
            vm.deleted = True
103
            vm.operstate = state_for_success
104
            vm.backendtime = etime
105
            # Issue and accept commission to Quotaholder
106
            quotas.issue_and_accept_commission(vm, delete=True)
107

    
108
    # Update backendtime only for jobs that have been successfully completed,
109
    # since only these jobs update the state of the VM. Else a "race condition"
110
    # may occur when a successful job (e.g. OP_INSTANCE_REMOVE) completes
111
    # before an error job and messages arrive in reversed order.
112
    if status == 'success':
113
        vm.backendtime = etime
114

    
115
    vm.save()
116

    
117

    
118
@transaction.commit_on_success
119
def process_net_status(vm, etime, nics):
120
    """Wrap _process_net_status inside transaction."""
121
    _process_net_status(vm, etime, nics)
122

    
123

    
124
def _process_net_status(vm, etime, nics):
125
    """Process a net status notification from the backend
126

127
    Process an incoming message from the Ganeti backend,
128
    detailing the NIC configuration of a VM instance.
129

130
    Update the state of the VM in the DB accordingly.
131
    """
132

    
133
    ganeti_nics = process_ganeti_nics(nics)
134
    if not nics_changed(vm.nics.order_by('index'), ganeti_nics):
135
        log.debug("NICs for VM %s have not changed", vm)
136

    
137
    release_instance_nics(vm)
138

    
139
    for nic in ganeti_nics:
140
        ipv4 = nic.get('ipv4', '')
141
        net = nic['network']
142
        if ipv4:
143
            net.reserve_address(ipv4)
144

    
145
        nic['dirty'] = False
146
        vm.nics.create(**nic)
147
        # Dummy save the network, because UI uses changed-since for VMs
148
        # and Networks in order to show the VM NICs
149
        net.save()
150

    
151
    vm.backendtime = etime
152
    vm.save()
153

    
154

    
155
def process_ganeti_nics(ganeti_nics):
156
    """Process NIC dict from ganeti hooks."""
157
    new_nics = []
158
    for i, new_nic in enumerate(ganeti_nics):
159
        network = new_nic.get('network', '')
160
        n = str(network)
161
        pk = utils.id_from_network_name(n)
162

    
163
        net = Network.objects.get(pk=pk)
164

    
165
        # Get the new nic info
166
        mac = new_nic.get('mac', '')
167
        ipv4 = new_nic.get('ip', '')
168
        if net.subnet6:
169
            ipv6 = mac2eui64(mac, net.subnet6)
170
        else:
171
            ipv6 = ''
172

    
173
        firewall = new_nic.get('firewall', '')
174
        firewall_profile = _reverse_tags.get(firewall, '')
175
        if not firewall_profile and net.public:
176
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE
177

    
178
        nic = {
179
            'index': i,
180
            'network': net,
181
            'mac': mac,
182
            'ipv4': ipv4,
183
            'ipv6': ipv6,
184
            'firewall_profile': firewall_profile,
185
            'state': 'ACTIVE'}
186

    
187
        new_nics.append(nic)
188
    return new_nics
189

    
190

    
191
def nics_changed(old_nics, new_nics):
192
    """Return True if NICs have changed in any way."""
193
    if len(old_nics) != len(new_nics):
194
        return True
195
    fields = ["ipv4", "ipv6", "mac", "firewall_profile", "index", "network"]
196
    for old_nic, new_nic in zip(old_nics, new_nics):
197
        for field in fields:
198
            if getattr(old_nic, field) != new_nic[field]:
199
                return True
200
    return False
201

    
202

    
203
def release_instance_nics(vm):
204
    for nic in vm.nics.all():
205
        net = nic.network
206
        if nic.ipv4:
207
            net.release_address(nic.ipv4)
208
        nic.delete()
209
        net.save()
210

    
211

    
212
@transaction.commit_on_success
213
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
214
    if status not in [x[0] for x in BACKEND_STATUSES]:
215
        raise Network.InvalidBackendMsgError(opcode, status)
216

    
217
    back_network.backendjobid = jobid
218
    back_network.backendjobstatus = status
219
    back_network.backendopcode = opcode
220
    back_network.backendlogmsg = logmsg
221

    
222
    # Notifications of success change the operating state
223
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
224
    if status == 'success' and state_for_success is not None:
225
        back_network.operstate = state_for_success
226

    
227
    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_ADD':
228
        back_network.operstate = 'ERROR'
229
        back_network.backendtime = etime
230

    
231
    if opcode == 'OP_NETWORK_REMOVE':
232
        if status == 'success' or (status == 'error' and
233
                                   back_network.operstate == 'ERROR'):
234
            back_network.operstate = state_for_success
235
            back_network.deleted = True
236
            back_network.backendtime = etime
237

    
238
    if status == 'success':
239
        back_network.backendtime = etime
240
    back_network.save()
241
    # Also you must update the state of the Network!!
242
    update_network_state(back_network.network)
243

    
244

    
245
def update_network_state(network):
246
    """Update the state of a Network based on BackendNetwork states.
247

248
    Update the state of a Network based on the operstate of the networks in the
249
    backends that network exists.
250

251
    The state of the network is:
252
    * ACTIVE: If it is 'ACTIVE' in at least one backend.
253
    * DELETED: If it is is 'DELETED' in all backends that have been created.
254

255
    This function also releases the resources (MAC prefix or Bridge) and the
256
    quotas for the network.
257

258
    """
259
    if network.deleted:
260
        # Network has already been deleted. Just assert that state is also
261
        # DELETED
262
        if not network.state == "DELETED":
263
            network.state = "DELETED"
264
            network.save()
265
        return
266

    
267
    backend_states = [s.operstate for s in network.backend_networks.all()]
268
    if not backend_states and network.action != "DESTROY":
269
        if network.state != "ACTIVE":
270
            network.state = "ACTIVE"
271
            network.save()
272
            return
273

    
274
    # Network is deleted when all BackendNetworks go to "DELETED" operstate
275
    deleted = reduce(lambda x, y: x == y and "DELETED", backend_states,
276
                     "DELETED")
277

    
278
    # Release the resources on the deletion of the Network
279
    if deleted:
280
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
281
                 network.id, network.mac_prefix, network.link)
282
        network.deleted = True
283
        network.state = "DELETED"
284
        if network.mac_prefix:
285
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
286
                release_resource(res_type="mac_prefix",
287
                                 value=network.mac_prefix)
288
        if network.link:
289
            if network.FLAVORS[network.flavor]["link"] == "pool":
290
                release_resource(res_type="bridge", value=network.link)
291

    
292
        # Issue commission
293
        if network.userid:
294
            quotas.issue_and_accept_commission(network, delete=True)
295
        elif not network.public:
296
            log.warning("Network %s does not have an owner!", network.id)
297
    network.save()
298

    
299

    
300
@transaction.commit_on_success
301
def process_network_modify(back_network, etime, jobid, opcode, status,
302
                           add_reserved_ips, remove_reserved_ips):
303
    assert (opcode == "OP_NETWORK_SET_PARAMS")
304
    if status not in [x[0] for x in BACKEND_STATUSES]:
305
        raise Network.InvalidBackendMsgError(opcode, status)
306

    
307
    back_network.backendjobid = jobid
308
    back_network.backendjobstatus = status
309
    back_network.opcode = opcode
310

    
311
    if add_reserved_ips or remove_reserved_ips:
312
        net = back_network.network
313
        pool = net.get_pool()
314
        if add_reserved_ips:
315
            for ip in add_reserved_ips:
316
                pool.reserve(ip, external=True)
317
        if remove_reserved_ips:
318
            for ip in remove_reserved_ips:
319
                pool.put(ip, external=True)
320
        pool.save()
321

    
322
    if status == 'success':
323
        back_network.backendtime = etime
324
    back_network.save()
325

    
326

    
327
@transaction.commit_on_success
328
def process_create_progress(vm, etime, progress):
329

    
330
    percentage = int(progress)
331

    
332
    # The percentage may exceed 100%, due to the way
333
    # snf-image:copy-progress tracks bytes read by image handling processes
334
    percentage = 100 if percentage > 100 else percentage
335
    if percentage < 0:
336
        raise ValueError("Percentage cannot be negative")
337

    
338
    # FIXME: log a warning here, see #1033
339
#   if last_update > percentage:
340
#       raise ValueError("Build percentage should increase monotonically " \
341
#                        "(old = %d, new = %d)" % (last_update, percentage))
342

    
343
    # This assumes that no message of type 'ganeti-create-progress' is going to
344
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
345
    # the instance is STARTED.  What if the two messages are processed by two
346
    # separate dispatcher threads, and the 'ganeti-op-status' message for
347
    # successful creation gets processed before the 'ganeti-create-progress'
348
    # message? [vkoukis]
349
    #
350
    #if not vm.operstate == 'BUILD':
351
    #    raise VirtualMachine.IllegalState("VM is not in building state")
352

    
353
    vm.buildpercentage = percentage
354
    vm.backendtime = etime
355
    vm.save()
356

    
357

    
358
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
359
                               details=None):
360
    """
361
    Create virtual machine instance diagnostic entry.
362

363
    :param vm: VirtualMachine instance to create diagnostic for.
364
    :param message: Diagnostic message.
365
    :param source: Diagnostic source identifier (e.g. image-helper).
366
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
367
    :param etime: The time the message occured (if available).
368
    :param details: Additional details or debug information.
369
    """
370
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level, source=source,
371
                                                   source_date=etime,
372
                                                   message=message,
373
                                                   details=details)
374

    
375

    
376
def create_instance(vm, public_nic, flavor, image):
377
    """`image` is a dictionary which should contain the keys:
378
            'backend_id', 'format' and 'metadata'
379

380
        metadata value should be a dictionary.
381
    """
382

    
383
    # Handle arguments to CreateInstance() as a dictionary,
384
    # initialize it based on a deployment-specific value.
385
    # This enables the administrator to override deployment-specific
386
    # arguments, such as the disk template to use, name of os provider
387
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
388
    #
389
    kw = settings.GANETI_CREATEINSTANCE_KWARGS
390
    kw['mode'] = 'create'
391
    kw['name'] = vm.backend_vm_id
392
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
393

    
394
    kw['disk_template'] = flavor.disk_template
395
    kw['disks'] = [{"size": flavor.disk * 1024}]
396
    provider = flavor.disk_provider
397
    if provider:
398
        kw['disks'][0]['provider'] = provider
399

    
400
        if provider == 'vlmc':
401
            kw['disks'][0]['origin'] = flavor.disk_origin
402

    
403
    kw['nics'] = [public_nic]
404
    if settings.GANETI_USE_HOTPLUG:
405
        kw['hotplug'] = True
406
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
407
    # kw['os'] = settings.GANETI_OS_PROVIDER
408
    kw['ip_check'] = False
409
    kw['name_check'] = False
410

    
411
    # Do not specific a node explicitly, have
412
    # Ganeti use an iallocator instead
413
    #kw['pnode'] = rapi.GetNodes()[0]
414

    
415
    kw['dry_run'] = settings.TEST
416

    
417
    kw['beparams'] = {
418
        'auto_balance': True,
419
        'vcpus': flavor.cpu,
420
        'memory': flavor.ram}
421

    
422
    kw['osparams'] = {
423
        'config_url': vm.config_url,
424
        # Store image id and format to Ganeti
425
        'img_id': image['backend_id'],
426
        'img_format': image['format']}
427

    
428
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
429
    # kw['hvparams'] = dict(serial_console=False)
430

    
431
    log.debug("Creating instance %s", utils.hide_pass(kw))
432
    with pooled_rapi_client(vm) as client:
433
        return client.CreateInstance(**kw)
434

    
435

    
436
def delete_instance(vm):
437
    with pooled_rapi_client(vm) as client:
438
        return client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)
439

    
440

    
441
def reboot_instance(vm, reboot_type):
442
    assert reboot_type in ('soft', 'hard')
443
    with pooled_rapi_client(vm) as client:
444
        return client.RebootInstance(vm.backend_vm_id, reboot_type,
445
                                     dry_run=settings.TEST)
446

    
447

    
448
def startup_instance(vm):
449
    with pooled_rapi_client(vm) as client:
450
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
451

    
452

    
453
def shutdown_instance(vm):
454
    with pooled_rapi_client(vm) as client:
455
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)
456

    
457

    
458
def get_instance_console(vm):
459
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
460
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
461
    # useless (see #783).
462
    #
463
    # Until this is fixed on the Ganeti side, construct a console info reply
464
    # directly.
465
    #
466
    # WARNING: This assumes that VNC runs on port network_port on
467
    #          the instance's primary node, and is probably
468
    #          hypervisor-specific.
469
    #
470
    log.debug("Getting console for vm %s", vm)
471

    
472
    console = {}
473
    console['kind'] = 'vnc'
474

    
475
    with pooled_rapi_client(vm) as client:
476
        i = client.GetInstance(vm.backend_vm_id)
477

    
478
    if i['hvparams']['serial_console']:
479
        raise Exception("hv parameter serial_console cannot be true")
480
    console['host'] = i['pnode']
481
    console['port'] = i['network_port']
482

    
483
    return console
484

    
485

    
486
def get_instance_info(vm):
487
    with pooled_rapi_client(vm) as client:
488
        return client.GetInstanceInfo(vm.backend_vm_id)
489

    
490

    
491
def create_network(network, backend, connect=True):
492
    """Create a network in a Ganeti backend"""
493
    log.debug("Creating network %s in backend %s", network, backend)
494

    
495
    job_id = _create_network(network, backend)
496

    
497
    if connect:
498
        job_ids = connect_network(network, backend, depends=[job_id])
499
        return job_ids
500
    else:
501
        return [job_id]
502

    
503

    
504
def _create_network(network, backend):
505
    """Create a network."""
506

    
507
    network_type = network.public and 'public' or 'private'
508

    
509
    tags = network.backend_tag
510
    if network.dhcp:
511
        tags.append('nfdhcpd')
512

    
513
    if network.public:
514
        conflicts_check = True
515
    else:
516
        conflicts_check = False
517

    
518
    try:
519
        bn = BackendNetwork.objects.get(network=network, backend=backend)
520
        mac_prefix = bn.mac_prefix
521
    except BackendNetwork.DoesNotExist:
522
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
523
                        " does not exist" % (network.id, backend.id))
524

    
525
    with pooled_rapi_client(backend) as client:
526
        return client.CreateNetwork(network_name=network.backend_id,
527
                                    network=network.subnet,
528
                                    network6=network.subnet6,
529
                                    gateway=network.gateway,
530
                                    gateway6=network.gateway6,
531
                                    network_type=network_type,
532
                                    mac_prefix=mac_prefix,
533
                                    conflicts_check=conflicts_check,
534
                                    tags=tags)
535

    
536

    
537
def connect_network(network, backend, depends=[], group=None):
538
    """Connect a network to nodegroups."""
539
    log.debug("Connecting network %s to backend %s", network, backend)
540

    
541
    if network.public:
542
        conflicts_check = True
543
    else:
544
        conflicts_check = False
545

    
546
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
547
    with pooled_rapi_client(backend) as client:
548
        groups = [group] if group is not None else client.GetGroups()
549
        job_ids = []
550
        for group in groups:
551
            job_id = client.ConnectNetwork(network.backend_id, group,
552
                                           network.mode, network.link,
553
                                           conflicts_check,
554
                                           depends=depends)
555
            job_ids.append(job_id)
556
    return job_ids
557

    
558

    
559
def delete_network(network, backend, disconnect=True):
560
    log.debug("Deleting network %s from backend %s", network, backend)
561

    
562
    depends = []
563
    if disconnect:
564
        depends = disconnect_network(network, backend)
565
    _delete_network(network, backend, depends=depends)
566

    
567

    
568
def _delete_network(network, backend, depends=[]):
569
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
570
    with pooled_rapi_client(backend) as client:
571
        return client.DeleteNetwork(network.backend_id, depends)
572

    
573

    
574
def disconnect_network(network, backend, group=None):
575
    log.debug("Disconnecting network %s to backend %s", network, backend)
576

    
577
    with pooled_rapi_client(backend) as client:
578
        groups = [group] if group is not None else client.GetGroups()
579
        job_ids = []
580
        for group in groups:
581
            job_id = client.DisconnectNetwork(network.backend_id, group)
582
            job_ids.append(job_id)
583
    return job_ids
584

    
585

    
586
def connect_to_network(vm, network, address=None):
587
    backend = vm.backend
588
    network = Network.objects.select_for_update().get(id=network.id)
589
    bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
590
                                                         network=network)
591
    depend_jobs = []
592
    if bnet.operstate != "ACTIVE":
593
        depend_jobs = create_network(network, backend, connect=True)
594

    
595
    depends = [[job, ["success", "error", "canceled"]] for job in depend_jobs]
596

    
597
    nic = {'ip': address, 'network': network.backend_id}
598

    
599
    log.debug("Connecting vm %s to network %s(%s)", vm, network, address)
600

    
601
    with pooled_rapi_client(vm) as client:
602
        return client.ModifyInstance(vm.backend_vm_id, nics=[('add',  nic)],
603
                                     hotplug=settings.GANETI_USE_HOTPLUG,
604
                                     depends=depends,
605
                                     dry_run=settings.TEST)
606

    
607

    
608
def disconnect_from_network(vm, nic):
609
    op = [('remove', nic.index, {})]
610

    
611
    log.debug("Removing nic of VM %s, with index %s", vm, str(nic.index))
612

    
613
    with pooled_rapi_client(vm) as client:
614
        return client.ModifyInstance(vm.backend_vm_id, nics=op,
615
                                     hotplug=settings.GANETI_USE_HOTPLUG,
616
                                     dry_run=settings.TEST)
617

    
618

    
619
def set_firewall_profile(vm, profile):
620
    try:
621
        tag = _firewall_tags[profile]
622
    except KeyError:
623
        raise ValueError("Unsopported Firewall Profile: %s" % profile)
624

    
625
    log.debug("Setting tag of VM %s to %s", vm, profile)
626

    
627
    with pooled_rapi_client(vm) as client:
628
        # Delete all firewall tags
629
        for t in _firewall_tags.values():
630
            client.DeleteInstanceTags(vm.backend_vm_id, [t],
631
                                      dry_run=settings.TEST)
632

    
633
        client.AddInstanceTags(vm.backend_vm_id, [tag], dry_run=settings.TEST)
634

    
635
        # XXX NOP ModifyInstance call to force process_net_status to run
636
        # on the dispatcher
637
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
638
        client.ModifyInstance(vm.backend_vm_id,
639
                              os_name=os_name)
640

    
641

    
642
def get_ganeti_instances(backend=None, bulk=False):
643
    instances = []
644
    for backend in get_backends(backend):
645
        with pooled_rapi_client(backend) as client:
646
            instances.append(client.GetInstances(bulk=bulk))
647

    
648
    return reduce(list.__add__, instances, [])
649

    
650

    
651
def get_ganeti_nodes(backend=None, bulk=False):
652
    nodes = []
653
    for backend in get_backends(backend):
654
        with pooled_rapi_client(backend) as client:
655
            nodes.append(client.GetNodes(bulk=bulk))
656

    
657
    return reduce(list.__add__, nodes, [])
658

    
659

    
660
def get_ganeti_jobs(backend=None, bulk=False):
661
    jobs = []
662
    for backend in get_backends(backend):
663
        with pooled_rapi_client(backend) as client:
664
            jobs.append(client.GetJobs(bulk=bulk))
665
    return reduce(list.__add__, jobs, [])
666

    
667
##
668
##
669
##
670

    
671

    
672
def get_backends(backend=None):
673
    if backend:
674
        if backend.offline:
675
            return []
676
        return [backend]
677
    return Backend.objects.filter(offline=False)
678

    
679

    
680
def get_physical_resources(backend):
681
    """ Get the physical resources of a backend.
682

683
    Get the resources of a backend as reported by the backend (not the db).
684

685
    """
686
    nodes = get_ganeti_nodes(backend, bulk=True)
687
    attr = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
688
    res = {}
689
    for a in attr:
690
        res[a] = 0
691
    for n in nodes:
692
        # Filter out drained, offline and not vm_capable nodes since they will
693
        # not take part in the vm allocation process
694
        can_host_vms = n['vm_capable'] and not (n['drained'] or n['offline'])
695
        if can_host_vms and n['cnodes']:
696
            for a in attr:
697
                res[a] += int(n[a])
698
    return res
699

    
700

    
701
def update_resources(backend, resources=None):
702
    """ Update the state of the backend resources in db.
703

704
    """
705

    
706
    if not resources:
707
        resources = get_physical_resources(backend)
708

    
709
    backend.mfree = resources['mfree']
710
    backend.mtotal = resources['mtotal']
711
    backend.dfree = resources['dfree']
712
    backend.dtotal = resources['dtotal']
713
    backend.pinst_cnt = resources['pinst_cnt']
714
    backend.ctotal = resources['ctotal']
715
    backend.updated = datetime.now()
716
    backend.save()
717

    
718

    
719
def get_memory_from_instances(backend):
720
    """ Get the memory that is used from instances.
721

722
    Get the used memory of a backend. Note: This is different for
723
    the real memory used, due to kvm's memory de-duplication.
724

725
    """
726
    with pooled_rapi_client(backend) as client:
727
        instances = client.GetInstances(bulk=True)
728
    mem = 0
729
    for i in instances:
730
        mem += i['oper_ram']
731
    return mem
732

    
733
##
734
## Synchronized operations for reconciliation
735
##
736

    
737

    
738
def create_network_synced(network, backend):
739
    result = _create_network_synced(network, backend)
740
    if result[0] != 'success':
741
        return result
742
    result = connect_network_synced(network, backend)
743
    return result
744

    
745

    
746
def _create_network_synced(network, backend):
747
    with pooled_rapi_client(backend) as client:
748
        job = _create_network(network, backend)
749
        result = wait_for_job(client, job)
750
    return result
751

    
752

    
753
def connect_network_synced(network, backend):
754
    with pooled_rapi_client(backend) as client:
755
        for group in client.GetGroups():
756
            job = client.ConnectNetwork(network.backend_id, group,
757
                                        network.mode, network.link)
758
            result = wait_for_job(client, job)
759
            if result[0] != 'success':
760
                return result
761

    
762
    return result
763

    
764

    
765
def wait_for_job(client, jobid):
766
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
767
    status = result['job_info'][0]
768
    while status not in ['success', 'error', 'cancel']:
769
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
770
                                         [result], None)
771
        status = result['job_info'][0]
772

    
773
    if status == 'success':
774
        return (status, None)
775
    else:
776
        error = result['job_info'][1]
777
        return (status, error)