Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ 40d53b77

History | View | Annotate | Download (26.5 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import json
35

    
36
from django.conf import settings
37
from django.db import transaction
38
from datetime import datetime
39

    
40
from synnefo.db.models import (Backend, VirtualMachine, Network,
41
                               BackendNetwork, BACKEND_STATUSES,
42
                               pooled_rapi_client, VirtualMachineDiagnostic)
43
from synnefo.logic import utils
44
from synnefo import quotas
45
from synnefo.api.util import release_resource
46
from synnefo.util.mac2eui64 import mac2eui64
47

    
48
from logging import getLogger
49
log = getLogger(__name__)
50

    
51

    
52
_firewall_tags = {
53
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
54
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
55
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}
56

    
57
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
58

    
59

    
60
@transaction.commit_on_success
61
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None):
62
    """Process a job progress notification from the backend
63

64
    Process an incoming message from the backend (currently Ganeti).
65
    Job notifications with a terminating status (sucess, error, or canceled),
66
    also update the operating state of the VM.
67

68
    """
69
    # See #1492, #1031, #1111 why this line has been removed
70
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
71
    if status not in [x[0] for x in BACKEND_STATUSES]:
72
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)
73

    
74
    vm.backendjobid = jobid
75
    vm.backendjobstatus = status
76
    vm.backendopcode = opcode
77
    vm.backendlogmsg = logmsg
78

    
79
    # Notifications of success change the operating state
80
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode, None)
81
    if status == 'success' and state_for_success is not None:
82
        vm.operstate = state_for_success
83

    
84
    # Update the NICs of the VM
85
    if status == "success" and nics is not None:
86
        _process_net_status(vm, etime, nics)
87

    
88
    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
89
    if opcode == 'OP_INSTANCE_CREATE' and status in ('canceled', 'error'):
90
        vm.operstate = 'ERROR'
91
        vm.backendtime = etime
92
    elif opcode == 'OP_INSTANCE_REMOVE':
93
        # Set the deleted flag explicitly, cater for admin-initiated removals
94
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
95
        # when no instance exists at the Ganeti backend.
96
        # See ticket #799 for all the details.
97
        #
98
        if status == 'success' or (status == 'error' and
99
                                   vm.operstate == 'ERROR'):
100
            _process_net_status(vm, etime, nics=[])
101
            vm.deleted = True
102
            vm.operstate = state_for_success
103
            vm.backendtime = etime
104
            # Issue and accept commission to Quotaholder
105
            quotas.issue_and_accept_commission(vm, delete=True)
106

    
107
    # Update backendtime only for jobs that have been successfully completed,
108
    # since only these jobs update the state of the VM. Else a "race condition"
109
    # may occur when a successful job (e.g. OP_INSTANCE_REMOVE) completes
110
    # before an error job and messages arrive in reversed order.
111
    if status == 'success':
112
        vm.backendtime = etime
113

    
114
    vm.save()
115

    
116

    
117
@transaction.commit_on_success
118
def process_net_status(vm, etime, nics):
119
    """Wrap _process_net_status inside transaction."""
120
    _process_net_status(vm, etime, nics)
121

    
122

    
123
def _process_net_status(vm, etime, nics):
124
    """Process a net status notification from the backend
125

126
    Process an incoming message from the Ganeti backend,
127
    detailing the NIC configuration of a VM instance.
128

129
    Update the state of the VM in the DB accordingly.
130
    """
131

    
132
    ganeti_nics = process_ganeti_nics(nics)
133
    if not nics_changed(vm.nics.order_by('index'), ganeti_nics):
134
        log.debug("NICs for VM %s have not changed", vm)
135
        return
136

    
137
    # Get X-Lock on backend before getting X-Lock on network IP pools, to
138
    # guarantee that no deadlock will occur with Backend allocator.
139
    Backend.objects.select_for_update().get(id=vm.backend_id)
140

    
141
    release_instance_nics(vm)
142

    
143
    for nic in ganeti_nics:
144
        ipv4 = nic.get('ipv4', '')
145
        net = nic['network']
146
        if ipv4:
147
            net.reserve_address(ipv4)
148

    
149
        nic['dirty'] = False
150
        vm.nics.create(**nic)
151
        # Dummy save the network, because UI uses changed-since for VMs
152
        # and Networks in order to show the VM NICs
153
        net.save()
154

    
155
    vm.backendtime = etime
156
    vm.save()
157

    
158

    
159
def process_ganeti_nics(ganeti_nics):
160
    """Process NIC dict from ganeti hooks."""
161
    new_nics = []
162
    for i, new_nic in enumerate(ganeti_nics):
163
        network = new_nic.get('network', '')
164
        n = str(network)
165
        pk = utils.id_from_network_name(n)
166

    
167
        net = Network.objects.get(pk=pk)
168

    
169
        # Get the new nic info
170
        mac = new_nic.get('mac', '')
171
        ipv4 = new_nic.get('ip', '')
172
        if net.subnet6:
173
            ipv6 = mac2eui64(mac, net.subnet6)
174
        else:
175
            ipv6 = ''
176

    
177
        firewall = new_nic.get('firewall', '')
178
        firewall_profile = _reverse_tags.get(firewall, '')
179
        if not firewall_profile and net.public:
180
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE
181

    
182
        nic = {
183
            'index': i,
184
            'network': net,
185
            'mac': mac,
186
            'ipv4': ipv4,
187
            'ipv6': ipv6,
188
            'firewall_profile': firewall_profile,
189
            'state': 'ACTIVE'}
190

    
191
        new_nics.append(nic)
192
    return new_nics
193

    
194

    
195
def nics_changed(old_nics, new_nics):
196
    """Return True if NICs have changed in any way."""
197
    if len(old_nics) != len(new_nics):
198
        return True
199
    fields = ["ipv4", "ipv6", "mac", "firewall_profile", "index", "network"]
200
    for old_nic, new_nic in zip(old_nics, new_nics):
201
        for field in fields:
202
            if getattr(old_nic, field) != new_nic[field]:
203
                return True
204
    return False
205

    
206

    
207
def release_instance_nics(vm):
208
    for nic in vm.nics.all():
209
        net = nic.network
210
        if nic.ipv4:
211
            net.release_address(nic.ipv4)
212
        nic.delete()
213
        net.save()
214

    
215

    
216
@transaction.commit_on_success
217
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
218
    if status not in [x[0] for x in BACKEND_STATUSES]:
219
        raise Network.InvalidBackendMsgError(opcode, status)
220

    
221
    back_network.backendjobid = jobid
222
    back_network.backendjobstatus = status
223
    back_network.backendopcode = opcode
224
    back_network.backendlogmsg = logmsg
225

    
226
    # Notifications of success change the operating state
227
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
228
    if status == 'success' and state_for_success is not None:
229
        back_network.operstate = state_for_success
230

    
231
    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_ADD':
232
        back_network.operstate = 'ERROR'
233
        back_network.backendtime = etime
234

    
235
    if opcode == 'OP_NETWORK_REMOVE':
236
        if status == 'success' or (status == 'error' and
237
                                   back_network.operstate == 'ERROR'):
238
            back_network.operstate = state_for_success
239
            back_network.deleted = True
240
            back_network.backendtime = etime
241

    
242
    if status == 'success':
243
        back_network.backendtime = etime
244
    back_network.save()
245
    # Also you must update the state of the Network!!
246
    update_network_state(back_network.network)
247

    
248

    
249
def update_network_state(network):
250
    """Update the state of a Network based on BackendNetwork states.
251

252
    Update the state of a Network based on the operstate of the networks in the
253
    backends that network exists.
254

255
    The state of the network is:
256
    * ACTIVE: If it is 'ACTIVE' in at least one backend.
257
    * DELETED: If it is is 'DELETED' in all backends that have been created.
258

259
    This function also releases the resources (MAC prefix or Bridge) and the
260
    quotas for the network.
261

262
    """
263
    if network.deleted:
264
        # Network has already been deleted. Just assert that state is also
265
        # DELETED
266
        if not network.state == "DELETED":
267
            network.state = "DELETED"
268
            network.save()
269
        return
270

    
271
    backend_states = [s.operstate for s in network.backend_networks.all()]
272
    if not backend_states and network.action != "DESTROY":
273
        if network.state != "ACTIVE":
274
            network.state = "ACTIVE"
275
            network.save()
276
            return
277

    
278
    # Network is deleted when all BackendNetworks go to "DELETED" operstate
279
    deleted = reduce(lambda x, y: x == y and "DELETED", backend_states,
280
                     "DELETED")
281

    
282
    # Release the resources on the deletion of the Network
283
    if deleted:
284
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
285
                 network.id, network.mac_prefix, network.link)
286
        network.deleted = True
287
        network.state = "DELETED"
288
        if network.mac_prefix:
289
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
290
                release_resource(res_type="mac_prefix",
291
                                 value=network.mac_prefix)
292
        if network.link:
293
            if network.FLAVORS[network.flavor]["link"] == "pool":
294
                release_resource(res_type="bridge", value=network.link)
295

    
296
        # Issue commission
297
        if network.userid:
298
            quotas.issue_and_accept_commission(network, delete=True)
299
        elif not network.public:
300
            log.warning("Network %s does not have an owner!", network.id)
301
    network.save()
302

    
303

    
304
@transaction.commit_on_success
305
def process_network_modify(back_network, etime, jobid, opcode, status,
306
                           add_reserved_ips, remove_reserved_ips):
307
    assert (opcode == "OP_NETWORK_SET_PARAMS")
308
    if status not in [x[0] for x in BACKEND_STATUSES]:
309
        raise Network.InvalidBackendMsgError(opcode, status)
310

    
311
    back_network.backendjobid = jobid
312
    back_network.backendjobstatus = status
313
    back_network.opcode = opcode
314

    
315
    if add_reserved_ips or remove_reserved_ips:
316
        net = back_network.network
317
        pool = net.get_pool()
318
        if add_reserved_ips:
319
            for ip in add_reserved_ips:
320
                pool.reserve(ip, external=True)
321
        if remove_reserved_ips:
322
            for ip in remove_reserved_ips:
323
                pool.put(ip, external=True)
324
        pool.save()
325

    
326
    if status == 'success':
327
        back_network.backendtime = etime
328
    back_network.save()
329

    
330

    
331
@transaction.commit_on_success
332
def process_create_progress(vm, etime, progress):
333

    
334
    percentage = int(progress)
335

    
336
    # The percentage may exceed 100%, due to the way
337
    # snf-image:copy-progress tracks bytes read by image handling processes
338
    percentage = 100 if percentage > 100 else percentage
339
    if percentage < 0:
340
        raise ValueError("Percentage cannot be negative")
341

    
342
    # FIXME: log a warning here, see #1033
343
#   if last_update > percentage:
344
#       raise ValueError("Build percentage should increase monotonically " \
345
#                        "(old = %d, new = %d)" % (last_update, percentage))
346

    
347
    # This assumes that no message of type 'ganeti-create-progress' is going to
348
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
349
    # the instance is STARTED.  What if the two messages are processed by two
350
    # separate dispatcher threads, and the 'ganeti-op-status' message for
351
    # successful creation gets processed before the 'ganeti-create-progress'
352
    # message? [vkoukis]
353
    #
354
    #if not vm.operstate == 'BUILD':
355
    #    raise VirtualMachine.IllegalState("VM is not in building state")
356

    
357
    vm.buildpercentage = percentage
358
    vm.backendtime = etime
359
    vm.save()
360

    
361

    
362
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
363
                               details=None):
364
    """
365
    Create virtual machine instance diagnostic entry.
366

367
    :param vm: VirtualMachine instance to create diagnostic for.
368
    :param message: Diagnostic message.
369
    :param source: Diagnostic source identifier (e.g. image-helper).
370
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
371
    :param etime: The time the message occured (if available).
372
    :param details: Additional details or debug information.
373
    """
374
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level, source=source,
375
                                                   source_date=etime,
376
                                                   message=message,
377
                                                   details=details)
378

    
379

    
380
def create_instance(vm, public_nic, flavor, image):
381
    """`image` is a dictionary which should contain the keys:
382
            'backend_id', 'format' and 'metadata'
383

384
        metadata value should be a dictionary.
385
    """
386

    
387
    # Handle arguments to CreateInstance() as a dictionary,
388
    # initialize it based on a deployment-specific value.
389
    # This enables the administrator to override deployment-specific
390
    # arguments, such as the disk template to use, name of os provider
391
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
392
    #
393
    kw = vm.backend.get_create_params()
394
    kw['mode'] = 'create'
395
    kw['name'] = vm.backend_vm_id
396
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
397

    
398
    kw['disk_template'] = flavor.disk_template
399
    kw['disks'] = [{"size": flavor.disk * 1024}]
400
    provider = flavor.disk_provider
401
    if provider:
402
        kw['disks'][0]['provider'] = provider
403
        kw['disks'][0]['origin'] = flavor.disk_origin
404

    
405
    kw['nics'] = [public_nic]
406
    if vm.backend.use_hotplug():
407
        kw['hotplug'] = True
408
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
409
    # kw['os'] = settings.GANETI_OS_PROVIDER
410
    kw['ip_check'] = False
411
    kw['name_check'] = False
412

    
413
    # Do not specific a node explicitly, have
414
    # Ganeti use an iallocator instead
415
    #kw['pnode'] = rapi.GetNodes()[0]
416

    
417
    kw['dry_run'] = settings.TEST
418

    
419
    kw['beparams'] = {
420
        'auto_balance': True,
421
        'vcpus': flavor.cpu,
422
        'memory': flavor.ram}
423

    
424
    kw['osparams'] = {
425
        'config_url': vm.config_url,
426
        # Store image id and format to Ganeti
427
        'img_id': image['backend_id'],
428
        'img_format': image['format']}
429

    
430
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
431
    # kw['hvparams'] = dict(serial_console=False)
432

    
433
    log.debug("Creating instance %s", utils.hide_pass(kw))
434
    with pooled_rapi_client(vm) as client:
435
        return client.CreateInstance(**kw)
436

    
437

    
438
def delete_instance(vm):
439
    with pooled_rapi_client(vm) as client:
440
        return client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)
441

    
442

    
443
def reboot_instance(vm, reboot_type):
444
    assert reboot_type in ('soft', 'hard')
445
    with pooled_rapi_client(vm) as client:
446
        return client.RebootInstance(vm.backend_vm_id, reboot_type,
447
                                     dry_run=settings.TEST)
448

    
449

    
450
def startup_instance(vm):
451
    with pooled_rapi_client(vm) as client:
452
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
453

    
454

    
455
def shutdown_instance(vm):
456
    with pooled_rapi_client(vm) as client:
457
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)
458

    
459

    
460
def get_instance_console(vm):
461
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
462
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
463
    # useless (see #783).
464
    #
465
    # Until this is fixed on the Ganeti side, construct a console info reply
466
    # directly.
467
    #
468
    # WARNING: This assumes that VNC runs on port network_port on
469
    #          the instance's primary node, and is probably
470
    #          hypervisor-specific.
471
    #
472
    log.debug("Getting console for vm %s", vm)
473

    
474
    console = {}
475
    console['kind'] = 'vnc'
476

    
477
    with pooled_rapi_client(vm) as client:
478
        i = client.GetInstance(vm.backend_vm_id)
479

    
480
    if vm.backend.hypervisor == "kvm" and i['hvparams']['serial_console']:
481
        raise Exception("hv parameter serial_console cannot be true")
482
    console['host'] = i['pnode']
483
    console['port'] = i['network_port']
484

    
485
    return console
486

    
487

    
488
def get_instance_info(vm):
489
    with pooled_rapi_client(vm) as client:
490
        return client.GetInstanceInfo(vm.backend_vm_id)
491

    
492

    
493
def create_network(network, backend, connect=True):
494
    """Create a network in a Ganeti backend"""
495
    log.debug("Creating network %s in backend %s", network, backend)
496

    
497
    job_id = _create_network(network, backend)
498

    
499
    if connect:
500
        job_ids = connect_network(network, backend, depends=[job_id])
501
        return job_ids
502
    else:
503
        return [job_id]
504

    
505

    
506
def _create_network(network, backend):
507
    """Create a network."""
508

    
509
    network_type = network.public and 'public' or 'private'
510

    
511
    tags = network.backend_tag
512
    if network.dhcp:
513
        tags.append('nfdhcpd')
514

    
515
    if network.public:
516
        conflicts_check = True
517
    else:
518
        conflicts_check = False
519

    
520
    try:
521
        bn = BackendNetwork.objects.get(network=network, backend=backend)
522
        mac_prefix = bn.mac_prefix
523
    except BackendNetwork.DoesNotExist:
524
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
525
                        " does not exist" % (network.id, backend.id))
526

    
527
    with pooled_rapi_client(backend) as client:
528
        return client.CreateNetwork(network_name=network.backend_id,
529
                                    network=network.subnet,
530
                                    network6=network.subnet6,
531
                                    gateway=network.gateway,
532
                                    gateway6=network.gateway6,
533
                                    network_type=network_type,
534
                                    mac_prefix=mac_prefix,
535
                                    conflicts_check=conflicts_check,
536
                                    tags=tags)
537

    
538

    
539
def connect_network(network, backend, depends=[], group=None):
540
    """Connect a network to nodegroups."""
541
    log.debug("Connecting network %s to backend %s", network, backend)
542

    
543
    if network.public:
544
        conflicts_check = True
545
    else:
546
        conflicts_check = False
547

    
548
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
549
    with pooled_rapi_client(backend) as client:
550
        groups = [group] if group is not None else client.GetGroups()
551
        job_ids = []
552
        for group in groups:
553
            job_id = client.ConnectNetwork(network.backend_id, group,
554
                                           network.mode, network.link,
555
                                           conflicts_check,
556
                                           depends=depends)
557
            job_ids.append(job_id)
558
    return job_ids
559

    
560

    
561
def delete_network(network, backend, disconnect=True):
562
    log.debug("Deleting network %s from backend %s", network, backend)
563

    
564
    depends = []
565
    if disconnect:
566
        depends = disconnect_network(network, backend)
567
    _delete_network(network, backend, depends=depends)
568

    
569

    
570
def _delete_network(network, backend, depends=[]):
571
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
572
    with pooled_rapi_client(backend) as client:
573
        return client.DeleteNetwork(network.backend_id, depends)
574

    
575

    
576
def disconnect_network(network, backend, group=None):
577
    log.debug("Disconnecting network %s to backend %s", network, backend)
578

    
579
    with pooled_rapi_client(backend) as client:
580
        groups = [group] if group is not None else client.GetGroups()
581
        job_ids = []
582
        for group in groups:
583
            job_id = client.DisconnectNetwork(network.backend_id, group)
584
            job_ids.append(job_id)
585
    return job_ids
586

    
587

    
588
def connect_to_network(vm, network, address=None):
589
    backend = vm.backend
590
    network = Network.objects.select_for_update().get(id=network.id)
591
    bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
592
                                                         network=network)
593
    depend_jobs = []
594
    if bnet.operstate != "ACTIVE":
595
        depend_jobs = create_network(network, backend, connect=True)
596

    
597
    depends = [[job, ["success", "error", "canceled"]] for job in depend_jobs]
598

    
599
    nic = {'ip': address, 'network': network.backend_id}
600

    
601
    log.debug("Connecting vm %s to network %s(%s)", vm, network, address)
602

    
603
    with pooled_rapi_client(vm) as client:
604
        return client.ModifyInstance(vm.backend_vm_id, nics=[('add',  nic)],
605
                                     hotplug=vm.backend.use_hotplug(),
606
                                     depends=depends,
607
                                     dry_run=settings.TEST)
608

    
609

    
610
def disconnect_from_network(vm, nic):
611
    op = [('remove', nic.index, {})]
612

    
613
    log.debug("Removing nic of VM %s, with index %s", vm, str(nic.index))
614

    
615
    with pooled_rapi_client(vm) as client:
616
        return client.ModifyInstance(vm.backend_vm_id, nics=op,
617
                                     hotplug=vm.backend.use_hotplug(),
618
                                     dry_run=settings.TEST)
619

    
620

    
621
def set_firewall_profile(vm, profile):
622
    try:
623
        tag = _firewall_tags[profile]
624
    except KeyError:
625
        raise ValueError("Unsopported Firewall Profile: %s" % profile)
626

    
627
    log.debug("Setting tag of VM %s to %s", vm, profile)
628

    
629
    with pooled_rapi_client(vm) as client:
630
        # Delete all firewall tags
631
        for t in _firewall_tags.values():
632
            client.DeleteInstanceTags(vm.backend_vm_id, [t],
633
                                      dry_run=settings.TEST)
634

    
635
        client.AddInstanceTags(vm.backend_vm_id, [tag], dry_run=settings.TEST)
636

    
637
        # XXX NOP ModifyInstance call to force process_net_status to run
638
        # on the dispatcher
639
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
640
        client.ModifyInstance(vm.backend_vm_id,
641
                              os_name=os_name)
642

    
643

    
644
def get_ganeti_instances(backend=None, bulk=False):
645
    instances = []
646
    for backend in get_backends(backend):
647
        with pooled_rapi_client(backend) as client:
648
            instances.append(client.GetInstances(bulk=bulk))
649

    
650
    return reduce(list.__add__, instances, [])
651

    
652

    
653
def get_ganeti_nodes(backend=None, bulk=False):
654
    nodes = []
655
    for backend in get_backends(backend):
656
        with pooled_rapi_client(backend) as client:
657
            nodes.append(client.GetNodes(bulk=bulk))
658

    
659
    return reduce(list.__add__, nodes, [])
660

    
661

    
662
def get_ganeti_jobs(backend=None, bulk=False):
663
    jobs = []
664
    for backend in get_backends(backend):
665
        with pooled_rapi_client(backend) as client:
666
            jobs.append(client.GetJobs(bulk=bulk))
667
    return reduce(list.__add__, jobs, [])
668

    
669
##
670
##
671
##
672

    
673

    
674
def get_backends(backend=None):
675
    if backend:
676
        if backend.offline:
677
            return []
678
        return [backend]
679
    return Backend.objects.filter(offline=False)
680

    
681

    
682
def get_physical_resources(backend):
683
    """ Get the physical resources of a backend.
684

685
    Get the resources of a backend as reported by the backend (not the db).
686

687
    """
688
    nodes = get_ganeti_nodes(backend, bulk=True)
689
    attr = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
690
    res = {}
691
    for a in attr:
692
        res[a] = 0
693
    for n in nodes:
694
        # Filter out drained, offline and not vm_capable nodes since they will
695
        # not take part in the vm allocation process
696
        can_host_vms = n['vm_capable'] and not (n['drained'] or n['offline'])
697
        if can_host_vms and n['cnodes']:
698
            for a in attr:
699
                res[a] += int(n[a])
700
    return res
701

    
702

    
703
def update_resources(backend, resources=None):
704
    """ Update the state of the backend resources in db.
705

706
    """
707

    
708
    if not resources:
709
        resources = get_physical_resources(backend)
710

    
711
    backend.mfree = resources['mfree']
712
    backend.mtotal = resources['mtotal']
713
    backend.dfree = resources['dfree']
714
    backend.dtotal = resources['dtotal']
715
    backend.pinst_cnt = resources['pinst_cnt']
716
    backend.ctotal = resources['ctotal']
717
    backend.updated = datetime.now()
718
    backend.save()
719

    
720

    
721
def get_memory_from_instances(backend):
722
    """ Get the memory that is used from instances.
723

724
    Get the used memory of a backend. Note: This is different for
725
    the real memory used, due to kvm's memory de-duplication.
726

727
    """
728
    with pooled_rapi_client(backend) as client:
729
        instances = client.GetInstances(bulk=True)
730
    mem = 0
731
    for i in instances:
732
        mem += i['oper_ram']
733
    return mem
734

    
735
##
736
## Synchronized operations for reconciliation
737
##
738

    
739

    
740
def create_network_synced(network, backend):
741
    result = _create_network_synced(network, backend)
742
    if result[0] != 'success':
743
        return result
744
    result = connect_network_synced(network, backend)
745
    return result
746

    
747

    
748
def _create_network_synced(network, backend):
749
    with pooled_rapi_client(backend) as client:
750
        job = _create_network(network, backend)
751
        result = wait_for_job(client, job)
752
    return result
753

    
754

    
755
def connect_network_synced(network, backend):
756
    with pooled_rapi_client(backend) as client:
757
        for group in client.GetGroups():
758
            job = client.ConnectNetwork(network.backend_id, group,
759
                                        network.mode, network.link)
760
            result = wait_for_job(client, job)
761
            if result[0] != 'success':
762
                return result
763

    
764
    return result
765

    
766

    
767
def wait_for_job(client, jobid):
768
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
769
    status = result['job_info'][0]
770
    while status not in ['success', 'error', 'cancel']:
771
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
772
                                         [result], None)
773
        status = result['job_info'][0]
774

    
775
    if status == 'success':
776
        return (status, None)
777
    else:
778
        error = result['job_info'][1]
779
        return (status, error)