Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ cf2241c4

History | View | Annotate | Download (26.9 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import json
35
import multiprocessing
36
import itertools
37

    
38
from django.conf import settings
39
from django.db import transaction
40
from datetime import datetime
41

    
42
from synnefo.db.models import (Backend, VirtualMachine, Network,
43
                               BackendNetwork, BACKEND_STATUSES,
44
                               pooled_rapi_client, VirtualMachineDiagnostic)
45
from synnefo.logic import utils
46
from synnefo import quotas
47
from synnefo.api.util import release_resource
48
from synnefo.util.mac2eui64 import mac2eui64
49

    
50
from logging import getLogger
51
log = getLogger(__name__)
52

    
53

    
54
# Map each Synnefo firewall profile name to its Ganeti instance tag
# (the tag strings themselves come from deployment settings).
_firewall_tags = {
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}

# Reverse map: 4th ':'-separated field of a firewall tag -> profile name.
# NOTE(review): assumes every GANETI_FIREWALL_*_TAG setting has at least
# four ':'-separated fields -- confirm against the deployment settings.
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
60

    
61

    
62
@transaction.commit_on_success
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None):
    """Process a job progress notification from the backend

    Process an incoming message from the backend (currently Ganeti).
    Job notifications with a terminating status (success, error, or canceled),
    also update the operating state of the VM.

    :param vm: the VirtualMachine the job refers to.
    :param etime: the time the event occurred at the backend.
    :param jobid: the Ganeti job id.
    :param opcode: the Ganeti opcode (e.g. 'OP_INSTANCE_CREATE').
    :param status: the job status; must be one of BACKEND_STATUSES.
    :param logmsg: the raw job log message, stored on the VM for debugging.
    :param nics: optional NIC configuration reported by the backend.
    """
    # See #1492, #1031, #1111 why this line has been removed
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)

    # Record the latest backend job info on the VM (for debugging and
    # reconciliation).
    vm.backendjobid = jobid
    vm.backendjobstatus = status
    vm.backendopcode = opcode
    vm.backendlogmsg = logmsg

    # Notifications of success change the operating state
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == 'success' and state_for_success is not None:
        vm.operstate = state_for_success

    # Update the NICs of the VM
    if status == "success" and nics is not None:
        _process_net_status(vm, etime, nics)

    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
    if opcode == 'OP_INSTANCE_CREATE' and status in ('canceled', 'error'):
        vm.operstate = 'ERROR'
        vm.backendtime = etime
    elif opcode == 'OP_INSTANCE_REMOVE':
        # Set the deleted flag explicitly, cater for admin-initiated removals
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
        # when no instance exists at the Ganeti backend.
        # See ticket #799 for all the details.
        #
        if status == 'success' or (status == 'error' and
                                   vm.operstate == 'ERROR'):
            # Release the VM's NICs/addresses, then charge the deletion
            # to the Quotaholder.
            _process_net_status(vm, etime, nics=[])
            vm.deleted = True
            vm.operstate = state_for_success
            vm.backendtime = etime
            # Issue and accept commission to Quotaholder
            quotas.issue_and_accept_commission(vm, delete=True)

    # Update backendtime only for jobs that have been successfully completed,
    # since only these jobs update the state of the VM. Else a "race condition"
    # may occur when a successful job (e.g. OP_INSTANCE_REMOVE) completes
    # before an error job and messages arrive in reversed order.
    if status == 'success':
        vm.backendtime = etime

    vm.save()
117

    
118

    
119
@transaction.commit_on_success
def process_net_status(vm, etime, nics):
    """Wrap _process_net_status inside transaction.

    :param vm: the VirtualMachine whose NICs are updated.
    :param etime: the time the event occurred at the backend.
    :param nics: the NIC configuration reported by Ganeti.
    """
    _process_net_status(vm, etime, nics)
123

    
124

    
125
def _process_net_status(vm, etime, nics):
    """Process a net status notification from the backend

    Process an incoming message from the Ganeti backend,
    detailing the NIC configuration of a VM instance.

    Update the state of the VM in the DB accordingly.

    Must run inside a transaction (see process_net_status wrapper).
    """

    ganeti_nics = process_ganeti_nics(nics)
    # Skip the (expensive) rewrite when the reported NICs match the DB.
    if not nics_changed(vm.nics.order_by('index'), ganeti_nics):
        log.debug("NICs for VM %s have not changed", vm)
        return

    # Get X-Lock on backend before getting X-Lock on network IP pools, to
    # guarantee that no deadlock will occur with Backend allocator.
    Backend.objects.select_for_update().get(id=vm.backend_id)

    # Drop the old NICs, returning their IPv4 addresses to the pools.
    release_instance_nics(vm)

    for nic in ganeti_nics:
        ipv4 = nic.get('ipv4', '')
        net = nic['network']
        if ipv4:
            net.reserve_address(ipv4)

        nic['dirty'] = False
        vm.nics.create(**nic)
        # Dummy save the network, because UI uses changed-since for VMs
        # and Networks in order to show the VM NICs
        net.save()

    vm.backendtime = etime
    vm.save()
159

    
160

    
161
def process_ganeti_nics(ganeti_nics):
    """Process NIC dict from ganeti hooks.

    Translate the raw NIC dicts reported by Ganeti into dicts suitable
    for creating NetworkInterface rows, resolving each NIC's network to
    its DB object and its firewall tag to a profile name.
    """
    new_nics = []
    for index, ganeti_nic in enumerate(ganeti_nics):
        network_name = str(ganeti_nic.get('network', ''))
        net = Network.objects.get(pk=utils.id_from_network_name(network_name))

        # Pull the new NIC info out of the Ganeti dict.
        mac = ganeti_nic.get('mac', '')
        ipv4 = ganeti_nic.get('ip', '')
        # Derive the IPv6 address from the MAC (EUI-64) when the network
        # has an IPv6 subnet.
        ipv6 = mac2eui64(mac, net.subnet6) if net.subnet6 else ''

        firewall = ganeti_nic.get('firewall', '')
        firewall_profile = _reverse_tags.get(firewall, '')
        if not firewall_profile and net.public:
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE

        new_nics.append({
            'index': index,
            'network': net,
            'mac': mac,
            'ipv4': ipv4,
            'ipv6': ipv6,
            'firewall_profile': firewall_profile,
            'state': 'ACTIVE'})
    return new_nics
195

    
196

    
197
def nics_changed(old_nics, new_nics):
    """Return True if NICs have changed in any way.

    `old_nics` are DB objects (attribute access), `new_nics` are dicts
    produced by process_ganeti_nics (key access).
    """
    if len(old_nics) != len(new_nics):
        return True
    fields = ["ipv4", "ipv6", "mac", "firewall_profile", "index", "network"]
    # Any differing field on any positionally-paired NIC counts as a change.
    return any(getattr(old_nic, field) != new_nic[field]
               for old_nic, new_nic in zip(old_nics, new_nics)
               for field in fields)
207

    
208

    
209
def release_instance_nics(vm):
    """Delete every NIC of a VM, returning IPv4 addresses to their pools."""
    for nic in vm.nics.all():
        network = nic.network
        if nic.ipv4:
            network.release_address(nic.ipv4)
        nic.delete()
        # Save the network so changed-since queries pick up the NIC removal.
        network.save()
216

    
217

    
218
@transaction.commit_on_success
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
    """Process a network job notification from the backend.

    Mirror of process_op_status for BackendNetwork objects: record job
    info, update the operstate on terminal statuses, and propagate the
    result to the parent Network.

    :param back_network: the BackendNetwork the job refers to.
    :param etime: the time the event occurred at the backend.
    :param jobid: the Ganeti job id.
    :param opcode: the Ganeti opcode (e.g. 'OP_NETWORK_ADD').
    :param status: the job status; must be one of BACKEND_STATUSES.
    :param logmsg: the raw job log message, stored for debugging.
    """
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode
    back_network.backendlogmsg = logmsg

    # Notifications of success change the operating state
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == 'success' and state_for_success is not None:
        back_network.operstate = state_for_success

    # A failed/canceled creation leaves the backend network in ERROR.
    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_ADD':
        back_network.operstate = 'ERROR'
        back_network.backendtime = etime

    if opcode == 'OP_NETWORK_REMOVE':
        # Removal also succeeds logically when it errors on a network
        # already in ERROR (no network exists at the Ganeti backend).
        if status == 'success' or (status == 'error' and
                                   back_network.operstate == 'ERROR'):
            back_network.operstate = state_for_success
            back_network.deleted = True
            back_network.backendtime = etime

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()
    # Also you must update the state of the Network!!
    update_network_state(back_network.network)
249

    
250

    
251
def update_network_state(network):
    """Update the state of a Network based on BackendNetwork states.

    Update the state of a Network based on the operstate of the networks in the
    backends that network exists.

    The state of the network is:
    * ACTIVE: If it is 'ACTIVE' in at least one backend.
    * DELETED: If it is 'DELETED' in all backends that have been created.

    This function also releases the resources (MAC prefix or Bridge) and the
    quotas for the network.

    """
    if network.deleted:
        # Network has already been deleted. Just assert that state is also
        # DELETED
        if not network.state == "DELETED":
            network.state = "DELETED"
            network.save()
        return

    backend_states = [s.operstate for s in network.backend_networks.all()]
    if not backend_states and network.action != "DESTROY":
        # No backend networks exist and the network is not being destroyed:
        # it must be (or stay) ACTIVE. Return unconditionally -- the original
        # returned only when the state actually changed, so an already-ACTIVE
        # network with no backend networks fell through to the all-"DELETED"
        # check below (vacuously true for an empty list) and was wrongly
        # marked DELETED.
        if network.state != "ACTIVE":
            network.state = "ACTIVE"
            network.save()
        return

    # Network is deleted when all BackendNetworks go to "DELETED" operstate.
    # all() is vacuously True for an empty list, which is only reached here
    # when action == "DESTROY" (same semantics as the original reduce over
    # "DELETED" -- and portable to Python 3, where 'reduce' is not a builtin).
    deleted = all(state == "DELETED" for state in backend_states)

    # Release the resources on the deletion of the Network
    if deleted:
        # Fixed argument order: the original passed mac_prefix for the
        # "link" placeholder and link for "mac_prefix".
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
                 network.id, network.link, network.mac_prefix)
        network.deleted = True
        network.state = "DELETED"
        if network.mac_prefix:
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
                release_resource(res_type="mac_prefix",
                                 value=network.mac_prefix)
        if network.link:
            if network.FLAVORS[network.flavor]["link"] == "pool":
                release_resource(res_type="bridge", value=network.link)

        # Issue commission to the Quotaholder for the deletion.
        if network.userid:
            quotas.issue_and_accept_commission(network, delete=True)
        elif not network.public:
            log.warning("Network %s does not have an owner!", network.id)
    network.save()
304

    
305

    
306
@transaction.commit_on_success
def process_network_modify(back_network, etime, jobid, opcode, status,
                           add_reserved_ips, remove_reserved_ips):
    """Process a Ganeti OP_NETWORK_SET_PARAMS notification.

    Record the job info on the BackendNetwork and apply external IP
    reservations/releases to the network's address pool.

    :param add_reserved_ips: iterable of IPs to mark externally reserved.
    :param remove_reserved_ips: iterable of IPs to return to the pool.
    """
    assert (opcode == "OP_NETWORK_SET_PARAMS")
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    # Use the 'backendopcode' attribute like process_op_status and
    # process_network_status do; the original assigned to 'opcode', which
    # is not the attribute the rest of this module reads/persists.
    back_network.backendopcode = opcode

    if add_reserved_ips or remove_reserved_ips:
        net = back_network.network
        pool = net.get_pool()
        if add_reserved_ips:
            for ip in add_reserved_ips:
                pool.reserve(ip, external=True)
        if remove_reserved_ips:
            for ip in remove_reserved_ips:
                pool.put(ip, external=True)
        pool.save()

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()
331

    
332

    
333
@transaction.commit_on_success
def process_create_progress(vm, etime, progress):
    """Record an image-copy progress notification on a building VM."""
    percentage = int(progress)

    # The percentage may exceed 100%, due to the way
    # snf-image:copy-progress tracks bytes read by image handling processes
    percentage = min(percentage, 100)
    if percentage < 0:
        raise ValueError("Percentage cannot be negative")

    # FIXME: log a warning here, see #1033
#   if last_update > percentage:
#       raise ValueError("Build percentage should increase monotonically " \
#                        "(old = %d, new = %d)" % (last_update, percentage))

    # This assumes that no message of type 'ganeti-create-progress' is going to
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
    # the instance is STARTED.  What if the two messages are processed by two
    # separate dispatcher threads, and the 'ganeti-op-status' message for
    # successful creation gets processed before the 'ganeti-create-progress'
    # message? [vkoukis]
    #
    #if not vm.operstate == 'BUILD':
    #    raise VirtualMachine.IllegalState("VM is not in building state")

    vm.buildpercentage = percentage
    vm.backendtime = etime
    vm.save()
362

    
363

    
364
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
                               details=None):
    """
    Create virtual machine instance diagnostic entry.

    :param vm: VirtualMachine instance to create diagnostic for.
    :param message: Diagnostic message.
    :param source: Diagnostic source identifier (e.g. image-helper).
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
    :param etime: The time the message occurred (if available).
    :param details: Additional details or debug information.
    """
    VirtualMachineDiagnostic.objects.create_for_vm(
        vm, level, source=source, source_date=etime,
        message=message, details=details)
380

    
381

    
382
def create_instance(vm, public_nic, flavor, image):
    """Submit an OP_INSTANCE_CREATE job for a VM and return the job id.

    `image` is a dictionary which should contain the keys:
            'backend_id', 'format' and 'metadata'

        metadata value should be a dictionary.

    :param vm: the VirtualMachine to create on its backend.
    :param public_nic: NIC definition dict for the instance's public NIC.
    :param flavor: Flavor providing cpu/ram/disk and disk template info.
    """

    # Handle arguments to CreateInstance() as a dictionary,
    # initialize it based on a deployment-specific value.
    # This enables the administrator to override deployment-specific
    # arguments, such as the disk template to use, name of os provider
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
    #
    kw = vm.backend.get_create_params()
    kw['mode'] = 'create'
    kw['name'] = vm.backend_vm_id
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS

    # Disk size is stored in GiB in the flavor; Ganeti expects MiB.
    kw['disk_template'] = flavor.disk_template
    kw['disks'] = [{"size": flavor.disk * 1024}]
    provider = flavor.disk_provider
    if provider:
        kw['disks'][0]['provider'] = provider
        kw['disks'][0]['origin'] = flavor.disk_origin

    kw['nics'] = [public_nic]
    if vm.backend.use_hotplug():
        kw['hotplug'] = True
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['os'] = settings.GANETI_OS_PROVIDER
    kw['ip_check'] = False
    kw['name_check'] = False

    # Do not specify a node explicitly, have
    # Ganeti use an iallocator instead
    #kw['pnode'] = rapi.GetNodes()[0]

    kw['dry_run'] = settings.TEST

    kw['beparams'] = {
        'auto_balance': True,
        'vcpus': flavor.cpu,
        'memory': flavor.ram}

    kw['osparams'] = {
        'config_url': vm.config_url,
        # Store image id and format to Ganeti
        'img_id': image['backend_id'],
        'img_format': image['format']}

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['hvparams'] = dict(serial_console=False)

    # hide_pass strips sensitive values before they reach the logs.
    log.debug("Creating instance %s", utils.hide_pass(kw))
    with pooled_rapi_client(vm) as client:
        return client.CreateInstance(**kw)
438

    
439

    
440
def delete_instance(vm):
    """Submit an instance-removal job for a VM; return the job id."""
    with pooled_rapi_client(vm) as rapi:
        return rapi.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)
443

    
444

    
445
def reboot_instance(vm, reboot_type):
    """Reboot a VM; `reboot_type` must be 'soft' or 'hard'."""
    assert reboot_type in ('soft', 'hard')
    with pooled_rapi_client(vm) as rapi:
        return rapi.RebootInstance(vm.backend_vm_id, reboot_type,
                                   dry_run=settings.TEST)
450

    
451

    
452
def startup_instance(vm):
    """Start a stopped VM through the Ganeti RAPI; return the job id."""
    with pooled_rapi_client(vm) as rapi:
        return rapi.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
455

    
456

    
457
def shutdown_instance(vm):
    """Shut down a running VM through the Ganeti RAPI; return the job id."""
    with pooled_rapi_client(vm) as rapi:
        return rapi.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)
460

    
461

    
462
def get_instance_console(vm):
    """Return VNC console info for a VM as a dict {kind, host, port}.

    :raises Exception: when the hypervisor is KVM and serial_console is
        enabled, which is incompatible with exposing the VNC console.
    """
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
    # useless (see #783).
    #
    # Until this is fixed on the Ganeti side, construct a console info reply
    # directly.
    #
    # WARNING: This assumes that VNC runs on port network_port on
    #          the instance's primary node, and is probably
    #          hypervisor-specific.
    #
    log.debug("Getting console for vm %s", vm)

    console = {}
    console['kind'] = 'vnc'

    with pooled_rapi_client(vm) as client:
        i = client.GetInstance(vm.backend_vm_id)

    if vm.backend.hypervisor == "kvm" and i['hvparams']['serial_console']:
        raise Exception("hv parameter serial_console cannot be true")
    console['host'] = i['pnode']
    console['port'] = i['network_port']

    return console
488

    
489

    
490
def get_instance_info(vm):
    """Fetch the Ganeti instance description of a VM."""
    with pooled_rapi_client(vm) as rapi:
        return rapi.GetInstanceInfo(vm.backend_vm_id)
493

    
494

    
495
def create_network(network, backend, connect=True):
    """Create a network in a Ganeti backend

    :param connect: also connect the network to all nodegroups.
    :returns: the list of submitted job ids.
    """
    log.debug("Creating network %s in backend %s", network, backend)

    job_id = _create_network(network, backend)

    if not connect:
        return [job_id]
    # Make the connect jobs depend on the creation job.
    return connect_network(network, backend, depends=[job_id])
506

    
507

    
508
def _create_network(network, backend):
    """Create a network.

    Submit an OP_NETWORK_ADD job for `network` to `backend` and return
    the Ganeti job id.
    """

    # Python 2 spelling of: 'public' if network.public else 'private'
    network_type = network.public and 'public' or 'private'

    # NOTE(review): append mutates the list returned by backend_tag --
    # assumes it returns a fresh list per call; confirm in the model.
    tags = network.backend_tag
    if network.dhcp:
        tags.append('nfdhcpd')

    # Only public networks get Ganeti's IP conflict checking.
    if network.public:
        conflicts_check = True
    else:
        conflicts_check = False

    try:
        bn = BackendNetwork.objects.get(network=network, backend=backend)
        mac_prefix = bn.mac_prefix
    except BackendNetwork.DoesNotExist:
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
                        " does not exist" % (network.id, backend.id))

    with pooled_rapi_client(backend) as client:
        return client.CreateNetwork(network_name=network.backend_id,
                                    network=network.subnet,
                                    network6=network.subnet6,
                                    gateway=network.gateway,
                                    gateway6=network.gateway6,
                                    network_type=network_type,
                                    mac_prefix=mac_prefix,
                                    conflicts_check=conflicts_check,
                                    tags=tags)
539

    
540

    
541
def connect_network(network, backend, depends=None, group=None):
    """Connect a network to one or all nodegroups of a backend.

    :param depends: optional list of job ids the connect jobs depend on.
    :param group: connect only this nodegroup; default is all groups.
    :returns: the list of submitted job ids.
    """
    log.debug("Connecting network %s to backend %s", network, backend)

    if network.public:
        conflicts_check = True
    else:
        conflicts_check = False

    # Use None as the sentinel: a mutable default ('depends=[]') is shared
    # between calls and is a classic Python pitfall.
    if depends is None:
        depends = []
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for group in groups:
            job_id = client.ConnectNetwork(network.backend_id, group,
                                           network.mode, network.link,
                                           conflicts_check,
                                           depends=depends)
            job_ids.append(job_id)
    return job_ids
561

    
562

    
563
def delete_network(network, backend, disconnect=True):
    """Remove a network from a backend, optionally disconnecting it first.

    :param disconnect: disconnect from all nodegroups before removal and
        make the removal job depend on the disconnect jobs.
    """
    log.debug("Deleting network %s from backend %s", network, backend)

    depends = disconnect_network(network, backend) if disconnect else []
    _delete_network(network, backend, depends=depends)
570

    
571

    
572
def _delete_network(network, backend, depends=None):
    """Submit an OP_NETWORK_REMOVE job, optionally depending on other jobs.

    :param depends: optional list of job ids to wait for first.
    """
    # Use None as the sentinel instead of the shared mutable default '[]'.
    if depends is None:
        depends = []
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
    with pooled_rapi_client(backend) as client:
        return client.DeleteNetwork(network.backend_id, depends)
576

    
577

    
578
def disconnect_network(network, backend, group=None):
    """Disconnect a network from one or all nodegroups of a backend.

    :param group: disconnect only this nodegroup; default is all groups.
    :returns: the list of submitted job ids.
    """
    # Fixed log wording: the network is disconnected *from* the backend
    # (the original said "to backend").
    log.debug("Disconnecting network %s from backend %s", network, backend)

    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for group in groups:
            job_id = client.DisconnectNetwork(network.backend_id, group)
            job_ids.append(job_id)
    return job_ids
588

    
589

    
590
def connect_to_network(vm, network, address=None):
    """Add a NIC connecting a VM to a network.

    Creates/connects the network on the VM's backend first if it is not
    yet ACTIVE there, and makes the NIC-add job depend on those jobs.

    :param address: optional IPv4 address for the new NIC.
    :returns: the Ganeti ModifyInstance job id.
    """
    backend = vm.backend
    # Lock the network row to serialize concurrent connect operations.
    network = Network.objects.select_for_update().get(id=network.id)
    bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
                                                         network=network)
    depend_jobs = []
    if bnet.operstate != "ACTIVE":
        depend_jobs = create_network(network, backend, connect=True)

    depends = [[job, ["success", "error", "canceled"]] for job in depend_jobs]

    nic = {'ip': address, 'network': network.backend_id}

    log.debug("Connecting vm %s to network %s(%s)", vm, network, address)

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id, nics=[('add',  nic)],
                                     hotplug=vm.backend.use_hotplug(),
                                     depends=depends,
                                     dry_run=settings.TEST)
610

    
611

    
612
def disconnect_from_network(vm, nic):
    """Remove a NIC (addressed by its index) from a VM.

    :returns: the Ganeti ModifyInstance job id.
    """
    log.debug("Removing nic of VM %s, with index %s", vm, str(nic.index))

    with pooled_rapi_client(vm) as rapi:
        return rapi.ModifyInstance(vm.backend_vm_id,
                                   nics=[('remove', nic.index, {})],
                                   hotplug=vm.backend.use_hotplug(),
                                   dry_run=settings.TEST)
621

    
622

    
623
def set_firewall_profile(vm, profile):
    """Apply a firewall profile to a VM via Ganeti instance tags.

    :param profile: one of the _firewall_tags keys
        ('ENABLED', 'DISABLED', 'PROTECTED').
    :raises ValueError: on an unknown profile.
    """
    try:
        tag = _firewall_tags[profile]
    except KeyError:
        # Fixed misspelling in the error message ("Unsopported").
        raise ValueError("Unsupported Firewall Profile: %s" % profile)

    log.debug("Setting tag of VM %s to %s", vm, profile)

    with pooled_rapi_client(vm) as client:
        # Delete all firewall tags
        for t in _firewall_tags.values():
            client.DeleteInstanceTags(vm.backend_vm_id, [t],
                                      dry_run=settings.TEST)

        client.AddInstanceTags(vm.backend_vm_id, [tag], dry_run=settings.TEST)

        # XXX NOP ModifyInstance call to force process_net_status to run
        # on the dispatcher
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
        client.ModifyInstance(vm.backend_vm_id,
                              os_name=os_name)
644

    
645

    
646
def get_instances(backend, bulk, queue):
    """Fetch a backend's instances and put them on `queue`.

    Worker entry point for get_ganeti_instances' per-backend processes.
    """
    with pooled_rapi_client(backend) as rapi:
        instances = rapi.GetInstances(bulk=bulk)
    queue.put(instances)
650

    
651

    
652
def get_ganeti_instances(backend=None, bulk=False):
    """Return the instances of all (or one) non-offline backends.

    Each backend is queried in a separate process; results are funneled
    through a multiprocessing.Queue.
    """
    instances = []
    backends = get_backends(backend)
    queue = multiprocessing.Queue()
    processes = []
    for backend in backends:
        p = multiprocessing.Process(target=get_instances,
                                    args=(backend, bulk, queue))
        processes.append(p)
        p.start()
    # Drain the queue *before* joining: a child that has queued a large
    # result blocks until its items are consumed, so joining first can
    # deadlock (see "Joining processes that use queues" in the
    # multiprocessing docs). The original joined first, and also abused
    # list comprehensions for side effects.
    for _ in processes:
        instances.extend(queue.get())
    for p in processes:
        p.join()
    return instances
665

    
666

    
667
def get_ganeti_nodes(backend=None, bulk=False):
    """Return the nodes of all (or one) non-offline backends.

    :param bulk: if True, ask Ganeti for detailed (bulk) node info.
    """
    nodes = []
    for backend in get_backends(backend):
        with pooled_rapi_client(backend) as client:
            nodes.append(client.GetNodes(bulk=bulk))

    # Flatten with itertools.chain: 'reduce(list.__add__, ...)' depends on
    # the Python 2 'reduce' builtin (removed in Python 3) and concatenates
    # quadratically.
    return list(itertools.chain.from_iterable(nodes))
674

    
675

    
676
def get_ganeti_jobs(backend=None, bulk=False):
    """Return the Ganeti jobs of all (or one) non-offline backends."""
    jobs = []
    for backend in get_backends(backend):
        with pooled_rapi_client(backend) as client:
            jobs.append(client.GetJobs(bulk=bulk))
    # Flatten with itertools.chain instead of the Python-2-only
    # 'reduce(list.__add__, ...)' (linear instead of quadratic, too).
    return list(itertools.chain.from_iterable(jobs))
682

    
683
##
684
##
685
##
686

    
687

    
688
def get_backends(backend=None):
    """Return the usable backends.

    With an explicit `backend`, return it as a one-element list unless it
    is offline (then an empty list); otherwise return every non-offline
    Backend from the DB.
    """
    if backend:
        return [] if backend.offline else [backend]
    return Backend.objects.filter(offline=False)
694

    
695

    
696
def get_physical_resources(backend):
    """ Get the physical resources of a backend.

    Get the resources of a backend as reported by the backend (not the db).

    """
    nodes = get_ganeti_nodes(backend, bulk=True)
    attr = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
    res = dict((a, 0) for a in attr)
    for node in nodes:
        # Filter out drained, offline and not vm_capable nodes since they will
        # not take part in the vm allocation process
        usable = node['vm_capable'] and not (node['drained'] or
                                             node['offline'])
        if usable and node['cnodes']:
            for a in attr:
                res[a] += int(node[a])
    return res
715

    
716

    
717
def update_resources(backend, resources=None):
    """ Update the state of the backend resources in db.

    :param resources: resource dict as returned by get_physical_resources;
        fetched from the backend when falsy.
    """

    if not resources:
        resources = get_physical_resources(backend)

    # Copy each reported counter onto the corresponding model field.
    for field in ('mfree', 'mtotal', 'dfree', 'dtotal',
                  'pinst_cnt', 'ctotal'):
        setattr(backend, field, resources[field])
    backend.updated = datetime.now()
    backend.save()
733

    
734

    
735
def get_memory_from_instances(backend):
    """ Get the memory that is used from instances.

    Get the used memory of a backend. Note: This is different for
    the real memory used, due to kvm's memory de-duplication.

    """
    with pooled_rapi_client(backend) as client:
        instances = client.GetInstances(bulk=True)
    # Total the operational RAM Ganeti reports per instance.
    return sum(instance['oper_ram'] for instance in instances)
748

    
749
##
750
## Synchronized operations for reconciliation
751
##
752

    
753

    
754
def create_network_synced(network, backend):
    """Create and connect a network, waiting on each Ganeti job.

    :returns: a (status, error) tuple from the first failing step, or
        from the final connect step on success.
    """
    result = _create_network_synced(network, backend)
    if result[0] != 'success':
        return result
    return connect_network_synced(network, backend)
760

    
761

    
762
def _create_network_synced(network, backend):
    """Submit the network-create job and block until it finishes."""
    with pooled_rapi_client(backend) as client:
        job_id = _create_network(network, backend)
        return wait_for_job(client, job_id)
767

    
768

    
769
def connect_network_synced(network, backend):
    """Connect a network to every nodegroup, waiting on each job.

    Stops at the first non-successful job and returns its (status, error)
    tuple.
    """
    # Initialize the result so the return value is defined even when the
    # backend reports no nodegroups (the original raised
    # UnboundLocalError in that case).
    result = ('success', None)
    with pooled_rapi_client(backend) as client:
        for group in client.GetGroups():
            job = client.ConnectNetwork(network.backend_id, group,
                                        network.mode, network.link)
            result = wait_for_job(client, job)
            if result[0] != 'success':
                return result

    return result
779

    
780

    
781
def wait_for_job(client, jobid):
    """Block until a Ganeti job reaches a final status.

    :returns: a (status, error) tuple; error is None on success,
        otherwise the job's 'opresult'.
    """
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
    status = result['job_info'][0]
    # Terminal Ganeti job statuses are 'success', 'error' and 'canceled'
    # (as listed in BACKEND_STATUSES and used throughout this module).
    # The original tested for the non-existent 'cancel', so a canceled
    # job would make this loop spin forever.
    while status not in ['success', 'error', 'canceled']:
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
                                         [result], None)
        status = result['job_info'][0]

    if status == 'success':
        return (status, None)
    else:
        error = result['job_info'][1]
        return (status, error)