Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ 05146623

History | View | Annotate | Download (26.6 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
from django.conf import settings
34
from django.db import transaction
35
from datetime import datetime
36

    
37
from synnefo.db.models import (Backend, VirtualMachine, Network,
38
                               BackendNetwork, BACKEND_STATUSES,
39
                               pooled_rapi_client, VirtualMachineDiagnostic)
40
from synnefo.logic import utils
41
from synnefo import quotas
42
from synnefo.api.util import release_resource
43
from synnefo.util.mac2eui64 import mac2eui64
44

    
45
from logging import getLogger
46
log = getLogger(__name__)
47

    
48

    
49
# Map synnefo firewall profile names to the Ganeti instance tags that
# represent them, as configured per deployment in the Django settings.
_firewall_tags = {
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}

# Reverse mapping: from the fourth colon-separated component of each firewall
# tag back to the profile name (assumes the GANETI_FIREWALL_*_TAG settings
# have at least four colon-separated fields -- TODO confirm in settings).
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
55

    
56

    
57
@transaction.commit_on_success
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None):
    """Process a job progress notification from the backend

    Process an incoming message from the backend (currently Ganeti).
    Job notifications with a terminating status (success, error, or canceled),
    also update the operating state of the VM.

    :param vm: the VirtualMachine the job refers to.
    :param etime: event time; stored as vm.backendtime for completed jobs.
    :param jobid: Ganeti job id.
    :param opcode: Ganeti opcode of the job.
    :param status: job status; must be one of BACKEND_STATUSES.
    :param logmsg: raw job log message, stored on the VM.
    :param nics: optional NIC list reported by the backend hooks.
    :raises VirtualMachine.InvalidBackendMsgError: on an unknown status.
    """
    # See #1492, #1031, #1111 why this line has been removed
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)

    # Record the raw job information on the VM row.
    vm.backendjobid = jobid
    vm.backendjobstatus = status
    vm.backendopcode = opcode
    vm.backendlogmsg = logmsg

    # Notifications of success change the operating state
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == 'success' and state_for_success is not None:
        vm.operstate = state_for_success

    # Update the NICs of the VM
    if status == "success" and nics is not None:
        _process_net_status(vm, etime, nics)

    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
    if opcode == 'OP_INSTANCE_CREATE' and status in ('canceled', 'error'):
        vm.operstate = 'ERROR'
        vm.backendtime = etime
    elif opcode == 'OP_INSTANCE_REMOVE':
        # Set the deleted flag explicitly, cater for admin-initiated removals
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
        # when no instance exists at the Ganeti backend.
        # See ticket #799 for all the details.
        #
        if status == 'success' or (status == 'error' and
                                   vm.operstate == 'ERROR'):
            # Release the VM's NICs (and their addresses) before marking it
            # deleted, then return its resources to the quota holder.
            _process_net_status(vm, etime, nics=[])
            vm.deleted = True
            vm.operstate = state_for_success
            vm.backendtime = etime
            # Issue and accept commission to Quotaholder
            quotas.issue_and_accept_commission(vm, delete=True)

    # Update backendtime only for jobs that have been successfully completed,
    # since only these jobs update the state of the VM. Else a "race condition"
    # may occur when a successful job (e.g. OP_INSTANCE_REMOVE) completes
    # before an error job and messages arrive in reversed order.
    if status == 'success':
        vm.backendtime = etime

    vm.save()
112

    
113

    
114
@transaction.commit_on_success
def process_net_status(vm, etime, nics):
    """Process a net status notification inside a database transaction.

    Thin transactional wrapper around _process_net_status.
    """
    _process_net_status(vm, etime, nics)
118

    
119

    
120
def _process_net_status(vm, etime, nics):
    """Process a net status notification from the backend

    Process an incoming message from the Ganeti backend,
    detailing the NIC configuration of a VM instance.

    Update the state of the VM in the DB accordingly.

    :param vm: the VirtualMachine whose NICs are reconciled.
    :param etime: event time; stored as vm.backendtime when NICs change.
    :param nics: NIC dicts as reported by the Ganeti hooks.
    """

    ganeti_nics = process_ganeti_nics(nics)
    # Short-circuit when the reported NICs already match the DB.
    if not nics_changed(vm.nics.order_by('index'), ganeti_nics):
        log.debug("NICs for VM %s have not changed", vm)
        return

    # Get X-Lock on backend before getting X-Lock on network IP pools, to
    # guarantee that no deadlock will occur with Backend allocator.
    Backend.objects.select_for_update().get(id=vm.backend_id)

    # Drop the stale NICs, returning their addresses to the pools.
    release_instance_nics(vm)

    for nic in ganeti_nics:
        ipv4 = nic.get('ipv4', '')
        net = nic['network']
        if ipv4:
            # Mark the address as used in the network's pool.
            net.reserve_address(ipv4)

        nic['dirty'] = False
        vm.nics.create(**nic)
        # Dummy save the network, because UI uses changed-since for VMs
        # and Networks in order to show the VM NICs
        net.save()

    vm.backendtime = etime
    vm.save()
154

    
155

    
156
def process_ganeti_nics(ganeti_nics):
    """Translate NIC dicts from the Ganeti hooks into DB-ready NIC dicts.

    Returns a list of dicts with the keys expected by VirtualMachine.nics:
    index, network (Network instance), mac, ipv4, ipv6, firewall_profile
    and state.
    """
    nics = []
    for index, gnic in enumerate(ganeti_nics):
        # Resolve the Ganeti network name back to the synnefo Network row.
        network_name = str(gnic.get('network', ''))
        network = Network.objects.get(pk=utils.id_from_network_name(network_name))

        mac = gnic.get('mac', '')
        ipv4 = gnic.get('ip', '')
        # Derive the IPv6 address from the MAC when the network has an
        # IPv6 subnet.
        ipv6 = mac2eui64(mac, network.subnet6) if network.subnet6 else ''

        # Translate the firewall tag back to a profile name; public
        # networks fall back to the configured default profile.
        firewall_profile = _reverse_tags.get(gnic.get('firewall', ''), '')
        if not firewall_profile and network.public:
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE

        nics.append({
            'index': index,
            'network': network,
            'mac': mac,
            'ipv4': ipv4,
            'ipv6': ipv6,
            'firewall_profile': firewall_profile,
            'state': 'ACTIVE'})
    return nics
190

    
191

    
192
def nics_changed(old_nics, new_nics):
    """Return True if NICs have changed in any way.

    old_nics are NIC model objects; new_nics are dicts as produced by
    process_ganeti_nics. A length difference or any field mismatch counts
    as a change.
    """
    if len(old_nics) != len(new_nics):
        return True
    fields = ["ipv4", "ipv6", "mac", "firewall_profile", "index", "network"]
    return any(getattr(old, field) != new[field]
               for old, new in zip(old_nics, new_nics)
               for field in fields)
202

    
203

    
204
def release_instance_nics(vm):
    """Delete all NICs of a VM, returning any IPv4 address to its network.

    Each affected Network row is saved afterwards (presumably so its
    updated timestamp changes for changed-since polling -- see the note in
    _process_net_status).
    """
    for nic in vm.nics.all():
        network = nic.network
        if nic.ipv4:
            network.release_address(nic.ipv4)
        nic.delete()
        network.save()
211

    
212

    
213
@transaction.commit_on_success
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
    """Process a network job notification from the backend.

    Record the job details on the BackendNetwork, update its operstate
    according to opcode and status, and recompute the state of the parent
    Network.

    :raises Network.InvalidBackendMsgError: on an unknown status.
    """
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    # Record the raw job information.
    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode
    back_network.backendlogmsg = logmsg

    network = back_network.network

    # Notifications of success change the operating state
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == 'success' and state_for_success is not None:
        back_network.operstate = state_for_success

    # A failed or canceled OP_NETWORK_ADD leaves the backend network in ERROR.
    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_ADD':
        back_network.operstate = 'ERROR'
        back_network.backendtime = etime

    if opcode == 'OP_NETWORK_REMOVE':
        # Mark deleted on success, and also on error when the backend
        # network is already in ERROR or the network is being destroyed
        # (cater for removals of networks that never existed backend-side).
        if (status == 'success' or
           (status == 'error' and (back_network.operstate == 'ERROR' or
                                   network.action == 'DESTROY'))):
            back_network.operstate = state_for_success
            back_network.deleted = True
            back_network.backendtime = etime

    # Update backendtime only for successfully completed jobs.
    if status == 'success':
        back_network.backendtime = etime
    back_network.save()
    # Propagate the change to the state of the parent Network as well.
    update_network_state(network)
247

    
248

    
249
def update_network_state(network):
    """Update the state of a Network based on BackendNetwork states.

    Update the state of a Network based on the operstate of the networks in
    the backends that network exists.

    The state of the network is:
    * ACTIVE: If it is 'ACTIVE' in at least one backend.
    * DELETED: If it is 'DELETED' in all backends that have been created.

    This function also releases the resources (MAC prefix or Bridge) and the
    quotas for the network.

    """
    if network.deleted:
        # Network has already been deleted. Just assert that state is also
        # DELETED
        if not network.state == "DELETED":
            network.state = "DELETED"
            network.save()
        return

    backend_states = [s.operstate for s in network.backend_networks.all()]
    if not backend_states and network.action != "DESTROY":
        # The network is not deployed to any backend (yet) and is not being
        # destroyed: treat it as ACTIVE. Return unconditionally here, so
        # the empty backend_states list is not mistaken below for "DELETED
        # in all backends" (which would wrongly delete the network).
        if network.state != "ACTIVE":
            network.state = "ACTIVE"
            network.save()
        return

    # Network is deleted when all BackendNetworks go to "DELETED" operstate
    # (all() is also vacuously true for an empty list, which only reaches
    # this point when action == "DESTROY").
    deleted = all(state == "DELETED" for state in backend_states)

    # Release the resources on the deletion of the Network
    if deleted:
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
                 network.id, network.link, network.mac_prefix)
        network.deleted = True
        network.state = "DELETED"
        # Return pooled resources (MAC prefix / bridge) to their pools.
        if network.mac_prefix:
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
                release_resource(res_type="mac_prefix",
                                 value=network.mac_prefix)
        if network.link:
            if network.FLAVORS[network.flavor]["link"] == "pool":
                release_resource(res_type="bridge", value=network.link)

        # Issue commission
        if network.userid:
            quotas.issue_and_accept_commission(network, delete=True)
        elif not network.public:
            log.warning("Network %s does not have an owner!", network.id)
    network.save()
302

    
303

    
304
@transaction.commit_on_success
def process_network_modify(back_network, etime, jobid, opcode, status,
                           add_reserved_ips, remove_reserved_ips):
    """Process an OP_NETWORK_SET_PARAMS notification from the backend.

    Record the job details on the BackendNetwork and apply externally
    reserved/released IPs to the network's address pool.

    :param add_reserved_ips: iterable of IPs to mark reserved, or None.
    :param remove_reserved_ips: iterable of IPs to return to the pool.
    :raises Network.InvalidBackendMsgError: on an unknown status.
    """
    assert (opcode == "OP_NETWORK_SET_PARAMS")
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    # Fixed: was 'back_network.opcode', which set a stray attribute instead
    # of the 'backendopcode' field used everywhere else (cf.
    # process_network_status).
    back_network.backendopcode = opcode

    if add_reserved_ips or remove_reserved_ips:
        net = back_network.network
        pool = net.get_pool()
        if add_reserved_ips:
            for ip in add_reserved_ips:
                # Mark as externally reserved (reserved outside synnefo).
                pool.reserve(ip, external=True)
        if remove_reserved_ips:
            for ip in remove_reserved_ips:
                pool.put(ip, external=True)
        pool.save()

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()
329

    
330

    
331
@transaction.commit_on_success
def process_create_progress(vm, etime, progress):
    """Record the image-copy progress of a building VM.

    :param progress: build percentage as reported by snf-image; values
        above 100 are clamped, negative values raise ValueError.
    """
    percentage = int(progress)

    # The percentage may exceed 100%, due to the way
    # snf-image:copy-progress tracks bytes read by image handling processes
    if percentage > 100:
        percentage = 100
    if percentage < 0:
        raise ValueError("Percentage cannot be negative")

    # FIXME: log a warning here, see #1033
    #   if last_update > percentage:
    #       raise ValueError("Build percentage should increase monotonically "
    #                        "(old = %d, new = %d)" % (last_update, percentage))

    # This assumes that no message of type 'ganeti-create-progress' is going to
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
    # the instance is STARTED.  What if the two messages are processed by two
    # separate dispatcher threads, and the 'ganeti-op-status' message for
    # successful creation gets processed before the 'ganeti-create-progress'
    # message? [vkoukis]
    #
    #if not vm.operstate == 'BUILD':
    #    raise VirtualMachine.IllegalState("VM is not in building state")

    vm.buildpercentage = percentage
    vm.backendtime = etime
    vm.save()
360

    
361

    
362
@transaction.commit_on_success
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
                               details=None):
    """
    Create virtual machine instance diagnostic entry.

    :param vm: VirtualMachine instance to create diagnostic for.
    :param message: Diagnostic message.
    :param source: Diagnostic source identifier (e.g. image-helper).
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
    :param etime: The time the message occured (if available).
    :param details: Additional details or debug information.
    """
    extra = {'source': source,
             'source_date': etime,
             'message': message,
             'details': details}
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level, **extra)
379

    
380

    
381
def create_instance(vm, public_nic, flavor, image):
    """Issue a Ganeti CreateInstance job for vm.

    `image` is a dictionary which should contain the keys:
        'backend_id', 'format' and 'metadata'

    metadata value should be a dictionary.
    """
    # Build the CreateInstance() arguments as a dictionary, starting from
    # deployment-specific defaults. This enables the administrator to
    # override deployment-specific arguments, such as the disk template to
    # use, name of os provider and hypervisor-specific parameters at will
    # (see Synnefo #785, #835).
    kw = vm.backend.get_create_params()
    kw['mode'] = 'create'
    kw['name'] = vm.backend_vm_id
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS

    kw['disk_template'] = flavor.disk_template
    # Flavor disk size is in GiB; Ganeti expects MiB.
    disk = {"size": flavor.disk * 1024}
    if flavor.disk_provider:
        disk['provider'] = flavor.disk_provider
        disk['origin'] = flavor.disk_origin
    kw['disks'] = [disk]

    kw['nics'] = [public_nic]
    if vm.backend.use_hotplug():
        kw['hotplug'] = True
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['os'] = settings.GANETI_OS_PROVIDER
    kw['ip_check'] = False
    kw['name_check'] = False

    # Do not specify a node explicitly; have Ganeti use an iallocator
    # instead.
    #kw['pnode'] = rapi.GetNodes()[0]

    kw['dry_run'] = settings.TEST

    kw['beparams'] = {'auto_balance': True,
                      'vcpus': flavor.cpu,
                      'memory': flavor.ram}

    kw['osparams'] = {'config_url': vm.config_url,
                      # Store image id and format to Ganeti
                      'img_id': image['backend_id'],
                      'img_format': image['format']}

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['hvparams'] = dict(serial_console=False)

    log.debug("Creating instance %s", utils.hide_pass(kw))
    with pooled_rapi_client(vm) as client:
        return client.CreateInstance(**kw)
437

    
438

    
439
def delete_instance(vm):
    """Issue a Ganeti job removing the instance that backs vm."""
    with pooled_rapi_client(vm) as rapi:
        return rapi.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)
442

    
443

    
444
def reboot_instance(vm, reboot_type):
    """Issue a Ganeti reboot job; reboot_type must be 'soft' or 'hard'."""
    assert reboot_type in ('soft', 'hard')
    with pooled_rapi_client(vm) as rapi:
        return rapi.RebootInstance(vm.backend_vm_id, reboot_type,
                                   dry_run=settings.TEST)
449

    
450

    
451
def startup_instance(vm):
    """Issue a Ganeti job starting the instance that backs vm."""
    with pooled_rapi_client(vm) as rapi:
        return rapi.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
454

    
455

    
456
def shutdown_instance(vm):
    """Issue a Ganeti job shutting down the instance that backs vm."""
    with pooled_rapi_client(vm) as rapi:
        return rapi.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)
459

    
460

    
461
def get_instance_console(vm):
    """Return VNC console info ({'kind', 'host', 'port'}) for a VM.

    RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
    which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
    useless (see #783). Until this is fixed on the Ganeti side, construct a
    console info reply directly.

    WARNING: This assumes that VNC runs on port network_port on the
    instance's primary node, and is probably hypervisor-specific.
    """
    log.debug("Getting console for vm %s", vm)

    with pooled_rapi_client(vm) as rapi:
        info = rapi.GetInstance(vm.backend_vm_id)

    if vm.backend.hypervisor == "kvm" and info['hvparams']['serial_console']:
        raise Exception("hv parameter serial_console cannot be true")

    return {'kind': 'vnc',
            'host': info['pnode'],
            'port': info['network_port']}
487

    
488

    
489
def get_instance_info(vm):
    """Fetch the Ganeti instance info for a VM via RAPI."""
    with pooled_rapi_client(vm) as rapi:
        return rapi.GetInstanceInfo(vm.backend_vm_id)
492

    
493

    
494
def create_network(network, backend, connect=True):
    """Create a network in a Ganeti backend.

    :returns: list of the submitted Ganeti job ids.
    """
    log.debug("Creating network %s in backend %s", network, backend)

    job_id = _create_network(network, backend)

    if not connect:
        return [job_id]
    # Connect the network to all nodegroups, after the create job finishes.
    return connect_network(network, backend, depends=[job_id])
505

    
506

    
507
def _create_network(network, backend):
    """Submit a Ganeti CreateNetwork job; return the job id.

    :raises Exception: if no BackendNetwork row exists for this pair.
    """

    # Conditional expression instead of the fragile 'cond and a or b' idiom
    # (which breaks when the middle operand is falsy).
    network_type = 'public' if network.public else 'private'

    tags = network.backend_tag
    if network.dhcp:
        tags.append('nfdhcpd')

    # Only public networks get Ganeti's conflicting-IP check.
    if network.public:
        conflicts_check = True
    else:
        conflicts_check = False

    try:
        bn = BackendNetwork.objects.get(network=network, backend=backend)
        mac_prefix = bn.mac_prefix
    except BackendNetwork.DoesNotExist:
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
                        " does not exist" % (network.id, backend.id))

    with pooled_rapi_client(backend) as client:
        return client.CreateNetwork(network_name=network.backend_id,
                                    network=network.subnet,
                                    network6=network.subnet6,
                                    gateway=network.gateway,
                                    gateway6=network.gateway6,
                                    network_type=network_type,
                                    mac_prefix=mac_prefix,
                                    conflicts_check=conflicts_check,
                                    tags=tags)
538

    
539

    
540
def connect_network(network, backend, depends=None, group=None):
    """Connect a network to nodegroups.

    :param depends: job ids the ConnectNetwork jobs should wait on.
    :param group: connect only this nodegroup; default is all groups.
    :returns: list of the submitted ConnectNetwork job ids.
    """
    log.debug("Connecting network %s to backend %s", network, backend)

    # Only public networks get Ganeti's conflicting-IP check.
    if network.public:
        conflicts_check = True
    else:
        conflicts_check = False

    # 'depends=None' default avoids the mutable-default-argument pitfall.
    if depends is None:
        depends = []
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        # Loop variable renamed so it no longer shadows the 'group' param.
        for grp in groups:
            job_id = client.ConnectNetwork(network.backend_id, grp,
                                           network.mode, network.link,
                                           conflicts_check,
                                           depends=depends)
            job_ids.append(job_id)
    return job_ids
560

    
561

    
562
def delete_network(network, backend, disconnect=True):
    """Remove a network from a backend, optionally disconnecting it first."""
    log.debug("Deleting network %s from backend %s", network, backend)

    if disconnect:
        # Make the remove job wait on the disconnect jobs.
        depends = disconnect_network(network, backend)
    else:
        depends = []
    _delete_network(network, backend, depends=depends)
569

    
570

    
571
def _delete_network(network, backend, depends=None):
    """Submit a Ganeti RemoveNetwork job, waiting on the `depends` job ids."""
    # 'depends=None' default avoids the mutable-default-argument pitfall.
    if depends is None:
        depends = []
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
    with pooled_rapi_client(backend) as client:
        return client.DeleteNetwork(network.backend_id, depends)
575

    
576

    
577
def disconnect_network(network, backend, group=None):
    """Disconnect a network from nodegroups.

    :param group: disconnect only this nodegroup; default is all groups.
    :returns: list of the submitted DisconnectNetwork job ids.
    """
    # Fixed log wording: this function disconnects *from* a backend.
    log.debug("Disconnecting network %s from backend %s", network, backend)

    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for group in groups:
            job_id = client.DisconnectNetwork(network.backend_id, group)
            job_ids.append(job_id)
    return job_ids
587

    
588

    
589
def connect_to_network(vm, network, address=None):
    """Connect a VM to a network by adding a NIC to its Ganeti instance.

    Deploys the network to the VM's backend first, when it is not ACTIVE
    there yet, and chains the NIC addition on those jobs.
    """
    backend = vm.backend
    # Take an X-Lock on the network row for the duration of the transaction.
    network = Network.objects.select_for_update().get(id=network.id)
    bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
                                                         network=network)
    if bnet.operstate != "ACTIVE":
        depend_jobs = create_network(network, backend, connect=True)
    else:
        depend_jobs = []

    depends = [[job, ["success", "error", "canceled"]] for job in depend_jobs]

    log.debug("Connecting vm %s to network %s(%s)", vm, network, address)

    nic = {'ip': address, 'network': network.backend_id}
    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id, nics=[('add',  nic)],
                                     hotplug=vm.backend.use_hotplug(),
                                     depends=depends,
                                     dry_run=settings.TEST)
609

    
610

    
611
def disconnect_from_network(vm, nic):
    """Remove a NIC (addressed by index) from the Ganeti instance of vm."""
    log.debug("Removing nic of VM %s, with index %s", vm, str(nic.index))

    ops = [('remove', nic.index, {})]
    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id, nics=ops,
                                     hotplug=vm.backend.use_hotplug(),
                                     dry_run=settings.TEST)
620

    
621

    
622
def set_firewall_profile(vm, profile):
    """Apply a firewall profile to a VM by retagging its Ganeti instance.

    :param profile: one of the keys of _firewall_tags
        ('ENABLED', 'DISABLED', 'PROTECTED').
    :raises ValueError: for an unknown profile name.
    """
    try:
        tag = _firewall_tags[profile]
    except KeyError:
        # Fixed typo in the error message ("Unsopported").
        raise ValueError("Unsupported Firewall Profile: %s" % profile)

    log.debug("Setting tag of VM %s to %s", vm, profile)

    with pooled_rapi_client(vm) as client:
        # Delete all firewall tags
        for t in _firewall_tags.values():
            client.DeleteInstanceTags(vm.backend_vm_id, [t],
                                      dry_run=settings.TEST)

        client.AddInstanceTags(vm.backend_vm_id, [tag], dry_run=settings.TEST)

        # XXX NOP ModifyInstance call to force process_net_status to run
        # on the dispatcher
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
        client.ModifyInstance(vm.backend_vm_id,
                              os_name=os_name)
643

    
644

    
645
def get_ganeti_instances(backend=None, bulk=False):
    """Return Ganeti instances from one online backend, or from all."""
    instances = []
    for bend in get_backends(backend):
        with pooled_rapi_client(bend) as client:
            # Each call returns a list; concatenate them all.
            instances.extend(client.GetInstances(bulk=bulk))

    return instances
652

    
653

    
654
def get_ganeti_nodes(backend=None, bulk=False):
    """Return Ganeti nodes from one online backend, or from all."""
    nodes = []
    for bend in get_backends(backend):
        with pooled_rapi_client(bend) as client:
            # Each call returns a list; concatenate them all.
            nodes.extend(client.GetNodes(bulk=bulk))

    return nodes
661

    
662

    
663
def get_ganeti_jobs(backend=None, bulk=False):
    """Return Ganeti jobs from one online backend, or from all."""
    jobs = []
    for bend in get_backends(backend):
        with pooled_rapi_client(bend) as client:
            # Each call returns a list; concatenate them all.
            jobs.extend(client.GetJobs(bulk=bulk))
    return jobs
669

    
670
##
671
##
672
##
673

    
674

    
675
def get_backends(backend=None):
    """Return the online backends to operate on.

    With a specific backend given, return [backend], or [] if it is
    offline. Without one, return all non-offline backends.
    """
    if backend:
        return [] if backend.offline else [backend]
    return Backend.objects.filter(offline=False)
681

    
682

    
683
def get_physical_resources(backend):
    """ Get the physical resources of a backend.

    Get the resources of a backend as reported by the backend (not the db).

    """
    attrs = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
    totals = dict((attr, 0) for attr in attrs)
    for node in get_ganeti_nodes(backend, bulk=True):
        # Filter out drained, offline and not vm_capable nodes since they will
        # not take part in the vm allocation process
        usable = node['vm_capable'] and not (node['drained'] or
                                             node['offline'])
        if usable and node['cnodes']:
            for attr in attrs:
                totals[attr] += int(node[attr])
    return totals
702

    
703

    
704
def update_resources(backend, resources=None):
    """ Update the state of the backend resources in db.

    When no resources dict is given, query the backend for its physical
    resources first.
    """

    if not resources:
        resources = get_physical_resources(backend)

    # Copy each reported resource value onto the backend row.
    for field in ('mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt',
                  'ctotal'):
        setattr(backend, field, resources[field])
    backend.updated = datetime.now()
    backend.save()
720

    
721

    
722
def get_memory_from_instances(backend):
    """ Get the memory that is used from instances.

    Get the used memory of a backend. Note: This is different for
    the real memory used, due to kvm's memory de-duplication.

    """
    with pooled_rapi_client(backend) as client:
        instances = client.GetInstances(bulk=True)
    return sum(inst['oper_ram'] for inst in instances)
735

    
736
##
737
## Synchronized operations for reconciliation
738
##
739

    
740

    
741
def create_network_synced(network, backend):
    """Create and connect a network, blocking on each Ganeti job.

    Returns the first non-success (status, error) result, or the result of
    the connect phase.
    """
    result = _create_network_synced(network, backend)
    if result[0] != 'success':
        return result
    return connect_network_synced(network, backend)
747

    
748

    
749
def _create_network_synced(network, backend):
    """Submit a network-create job and block until it terminates."""
    with pooled_rapi_client(backend) as client:
        job_id = _create_network(network, backend)
        return wait_for_job(client, job_id)
754

    
755

    
756
def connect_network_synced(network, backend):
    """Connect a network to every nodegroup, blocking on each job.

    Stops at and returns the first non-success (status, error) result;
    otherwise returns the result of the last job. With no nodegroups at
    all, returns ('success', None).
    """
    # Initialize so an empty GetGroups() no longer raises NameError on the
    # final 'return result'.
    result = ('success', None)
    with pooled_rapi_client(backend) as client:
        for group in client.GetGroups():
            job = client.ConnectNetwork(network.backend_id, group,
                                        network.mode, network.link)
            result = wait_for_job(client, job)
            if result[0] != 'success':
                return result

    return result
766

    
767

    
768
def wait_for_job(client, jobid):
    """Block until a Ganeti job reaches a terminal status.

    Polls via WaitForJobChange and returns (status, None) on success or
    (status, error) otherwise.
    """
    fields = ['status', 'opresult']
    result = client.WaitForJobChange(jobid, fields, None, None)
    status = result['job_info'][0]
    while status not in ('success', 'error', 'cancel'):
        result = client.WaitForJobChange(jobid, fields, [result], None)
        status = result['job_info'][0]

    if status == 'success':
        return (status, None)
    return (status, result['job_info'][1])