Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ cb4eee84

History | View | Annotate | Download (25.9 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import json
35

    
36
from django.conf import settings
37
from django.db import transaction
38
from datetime import datetime
39

    
40
from synnefo.db.models import (Backend, VirtualMachine, Network,
41
                               BackendNetwork, BACKEND_STATUSES,
42
                               pooled_rapi_client, BridgePoolTable,
43
                               MacPrefixPoolTable, VirtualMachineDiagnostic)
44
from synnefo.logic import utils
45
from synnefo import quotas
46

    
47
from logging import getLogger
48
log = getLogger(__name__)
49

    
50

    
51
# Map of Synnefo firewall profile names to the corresponding Ganeti
# instance tags (tag values come from the deployment settings).
_firewall_tags = {
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}

# Reverse lookup: the fourth ':'-separated component of each tag maps back
# to its profile name.  Assumes every configured tag has at least four
# ':'-separated parts -- TODO confirm against the settings format.
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
57

    
58

    
59
@quotas.uses_commission
@transaction.commit_on_success
def process_op_status(serials, vm, etime, jobid, opcode, status, logmsg):
    """Process a job progress notification from the backend

    Process an incoming message from the backend (currently Ganeti).
    Job notifications with a terminating status (success, error, or
    canceled) also update the operating state of the VM.

    :param serials: list injected by @quotas.uses_commission; commission
        serials appended to it are handled by the decorator.
    :param vm: the VirtualMachine the job refers to.
    :param etime: event time reported by the backend.
    :param jobid: Ganeti job id.
    :param opcode: Ganeti opcode (e.g. 'OP_INSTANCE_CREATE').
    :param status: Ganeti job status; must be one of BACKEND_STATUSES.
    :param logmsg: free-form log message from the backend.
    :raises VirtualMachine.InvalidBackendMsgError: on unknown status.
    """
    # See #1492, #1031, #1111 why this line has been removed
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)

    # Record the raw job bookkeeping fields on the VM row.
    vm.backendjobid = jobid
    vm.backendjobstatus = status
    vm.backendopcode = opcode
    vm.backendlogmsg = logmsg

    # Notifications of success change the operating state
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == 'success' and state_for_success is not None:
        vm.operstate = state_for_success

    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
    if opcode == 'OP_INSTANCE_CREATE' and status in ('canceled', 'error'):
        vm.operstate = 'ERROR'
        vm.backendtime = etime
    elif opcode == 'OP_INSTANCE_REMOVE':
        # Set the deleted flag explicitly, cater for admin-initiated removals
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
        # when no instance exists at the Ganeti backend.
        # See ticket #799 for all the details.
        #
        if status == 'success' or (status == 'error' and
                                   vm.operstate == 'ERROR'):
            # Issue commission: return the VM's resources to the user's
            # quota, and mark the serial accepted immediately.
            serial = quotas.issue_vm_commission(vm.userid, vm.flavor,
                                                delete=True)
            serials.append(serial)
            vm.serial = serial
            serial.accepted = True
            serial.save()
            # Free the NIC addresses before deleting the NIC rows.
            release_instance_nics(vm)
            vm.nics.all().delete()
            vm.deleted = True
            vm.operstate = state_for_success
            vm.backendtime = etime

    # Update backendtime only for jobs that have been successfully completed,
    # since only these jobs update the state of the VM. Else a "race condition"
    # may occur when a successful job (e.g. OP_INSTANCE_REMOVE) completes
    # before an error job and messages arrive in reversed order.
    if status == 'success':
        vm.backendtime = etime

    vm.save()
117

    
118

    
119
@transaction.commit_on_success
def process_net_status(vm, etime, nics):
    """Process a net status notification from the backend

    Process an incoming message from the Ganeti backend,
    detailing the NIC configuration of a VM instance.

    Update the state of the VM in the DB accordingly.

    :param vm: the VirtualMachine the notification refers to.
    :param etime: event time reported by the backend.
    :param nics: raw NIC dicts as sent by the Ganeti hooks.
    """

    ganeti_nics = process_ganeti_nics(nics)
    if not nics_changed(vm.nics.order_by('index'), ganeti_nics):
        log.debug("NICs for VM %s have not changed", vm)

    # Drop the existing NICs (returning their addresses to the pools)
    # before recreating them from the backend's view.
    release_instance_nics(vm)

    for nic in ganeti_nics:
        # Bug fix: 'net' was previously bound only inside the 'if ipv4'
        # branch, so the net.save() below raised NameError (or saved a
        # stale network) for NICs without an IPv4 address.
        net = nic['network']
        ipv4 = nic.get('ipv4', '')
        if ipv4:
            net.reserve_address(ipv4)

        nic['dirty'] = False
        vm.nics.create(**nic)
        # Dummy save the network, because UI uses changed-since for VMs
        # and Networks in order to show the VM NICs
        net.save()

    vm.backendtime = etime
    vm.save()
149

    
150

    
151
def process_ganeti_nics(ganeti_nics):
    """Process NIC dict from ganeti hooks.

    Convert each raw NIC dict into the keyword dict used to create
    Synnefo NIC rows, resolving the Network object from the Ganeti
    network name and the firewall profile from the firewall tag.
    """
    processed = []
    for index, raw in enumerate(ganeti_nics):
        name = str(raw.get('network', ''))
        net = Network.objects.get(pk=utils.id_from_network_name(name))

        # Translate the Ganeti firewall tag back to a profile name.
        firewall_profile = _reverse_tags.get(raw.get('firewall', ''), '')
        if not firewall_profile and net.public:
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE

        processed.append({
            'index': index,
            'network': net,
            'mac': raw.get('mac', ''),
            'ipv4': raw.get('ip', ''),
            'ipv6': raw.get('ipv6', ''),
            'firewall_profile': firewall_profile,
        })
    return processed
181

    
182

    
183
def nics_changed(old_nics, new_nics):
    """Return True if NICs have changed in any way."""
    if len(old_nics) != len(new_nics):
        return True
    # Compare each DB NIC (attribute access) against the corresponding
    # Ganeti-derived NIC dict, field by field.
    fields = ('ipv4', 'ipv6', 'mac', 'firewall_profile', 'index', 'network')
    for db_nic, ganeti_nic in zip(old_nics, new_nics):
        if any(getattr(db_nic, f) != ganeti_nic[f] for f in fields):
            return True
    return False
196

    
197

    
198
def release_instance_nics(vm):
    """Delete all NICs of a VM, returning any IPv4 address each NIC
    holds to its network's address pool."""
    for nic in vm.nics.all():
        network = nic.network
        if nic.ipv4:
            network.release_address(nic.ipv4)
        nic.delete()
        # Persist the network after releasing the address
        network.save()
205

    
206

    
207
@transaction.commit_on_success
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
    """Process a job notification for a (network, backend) pair.

    Update the BackendNetwork's job bookkeeping and operating state from
    a Ganeti job message, then recompute the aggregate state of the
    parent Network.

    :param back_network: the BackendNetwork the job refers to.
    :param etime: event time reported by the backend.
    :param jobid: Ganeti job id.
    :param opcode: Ganeti opcode (e.g. 'OP_NETWORK_CREATE').
    :param status: Ganeti job status; must be one of BACKEND_STATUSES.
    :param logmsg: free-form log message from the backend.
    :raises Network.InvalidBackendMsgError: on unknown status.
    """
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode
    back_network.backendlogmsg = logmsg

    # Notifications of success change the operating state
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == 'success' and state_for_success is not None:
        back_network.operstate = state_for_success

    # A failed or canceled OP_NETWORK_CREATE leaves this backend network
    # in ERROR.
    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_CREATE':
        utils.update_state(back_network, 'ERROR')
        back_network.backendtime = etime

    if opcode == 'OP_NETWORK_REMOVE':
        # Treat an 'error' removal of an already-ERRORed backend network
        # as success: the network may not exist at the Ganeti backend at
        # all (same rationale as OP_INSTANCE_REMOVE in process_op_status).
        if status == 'success' or (status == 'error' and
                                   back_network.operstate == 'ERROR'):
            back_network.operstate = state_for_success
            back_network.deleted = True
            back_network.backendtime = etime

    # Only successful jobs advance backendtime, to avoid out-of-order
    # message races.
    if status == 'success':
        back_network.backendtime = etime
    back_network.save()
    # Also you must update the state of the Network!!
    update_network_state(back_network.network)
238

    
239

    
240
@quotas.uses_commission
@transaction.commit_on_success
def update_network_state(serials, network):
    """Recompute a Network's aggregate state from its BackendNetworks.

    The network takes the common operstate of all its backend networks,
    or 'PENDING' when they disagree (or when there are none).  On the
    transition to 'DELETED', release the network's bridge/mac-prefix
    resources back to their pools and issue a delete commission.

    :param serials: list injected by @quotas.uses_commission; commission
        serials appended to it are handled by the decorator.
    :param network: the Network to update.
    """
    old_state = network.state

    backend_states = [s.operstate for s in network.backend_networks.all()]
    if not backend_states:
        network.state = 'PENDING'
        network.save()
        return

    all_equal = len(set(backend_states)) <= 1
    network.state = all_equal and backend_states[0] or 'PENDING'

    # Release the resources on the deletion of the Network
    if old_state != 'DELETED' and network.state == 'DELETED':
        # Bug fix: the log arguments were passed as (mac_prefix, link),
        # swapped relative to the "link ... mac_prefix ..." message text.
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
                 network.id, network.link, network.mac_prefix)
        network.deleted = True
        if network.mac_prefix and network.type == 'PRIVATE_MAC_FILTERED':
            mac_pool = MacPrefixPoolTable.get_pool()
            mac_pool.put(network.mac_prefix)
            mac_pool.save()

        if network.link and network.type == 'PRIVATE_VLAN':
            bridge_pool = BridgePoolTable.get_pool()
            bridge_pool.put(network.link)
            bridge_pool.save()

        # Issue commission to return the network's quota to the user
        serial = quotas.issue_network_commission(network.userid, delete=True)
        serials.append(serial)
        network.serial = serial
        serial.accepted = True
        serial.save()

    network.save()
277

    
278

    
279
@transaction.commit_on_success
def process_network_modify(back_network, etime, jobid, opcode, status,
                           add_reserved_ips, remove_reserved_ips):
    """Process an OP_NETWORK_SET_PARAMS job notification.

    Update job bookkeeping on the BackendNetwork and apply externally
    reserved/released IPs to the network's address pool.

    :param add_reserved_ips: iterable of IPs to reserve externally.
    :param remove_reserved_ips: iterable of IPs to release externally.
    :raises Network.InvalidBackendMsgError: on unknown status.
    """
    assert (opcode == "OP_NETWORK_SET_PARAMS")
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    # Bug fix: write to 'backendopcode' -- the field every other handler
    # in this module updates -- instead of a stray 'opcode' attribute.
    back_network.backendopcode = opcode

    if add_reserved_ips or remove_reserved_ips:
        net = back_network.network
        pool = net.get_pool()
        if add_reserved_ips:
            for ip in add_reserved_ips:
                pool.reserve(ip, external=True)
        if remove_reserved_ips:
            for ip in remove_reserved_ips:
                pool.put(ip, external=True)
        pool.save()

    # Only successful jobs advance backendtime (see process_op_status)
    if status == 'success':
        back_network.backendtime = etime
    back_network.save()
304

    
305

    
306
@transaction.commit_on_success
def process_create_progress(vm, etime, progress):
    """Record an image-copy progress notification on the VM.

    The reported percentage may exceed 100%, due to the way
    snf-image:copy-progress tracks bytes read by image handling
    processes, so it is clamped; negative values are rejected.
    """
    percentage = min(int(progress), 100)
    if percentage < 0:
        raise ValueError("Percentage cannot be negative")

    # FIXME: log a warning here, see #1033
    # (a build percentage lower than the last update should arguably be
    #  reported, since it ought to increase monotonically)

    # This assumes that no message of type 'ganeti-create-progress' is going to
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
    # the instance is STARTED.  What if the two messages are processed by two
    # separate dispatcher threads, and the 'ganeti-op-status' message for
    # successful creation gets processed before the 'ganeti-create-progress'
    # message? [vkoukis]
    #
    #if not vm.operstate == 'BUILD':
    #    raise VirtualMachine.IllegalState("VM is not in building state")

    vm.buildpercentage = percentage
    vm.backendtime = etime
    vm.save()
335

    
336

    
337
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
    details=None):
    """Create virtual machine instance diagnostic entry.

    :param vm: VirtualMachine instance to create diagnostic for.
    :param message: Diagnostic message.
    :param source: Diagnostic source identifier (e.g. image-helper).
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
    :param etime: The time the message occured (if available).
    :param details: Additional details or debug information.
    """
    VirtualMachineDiagnostic.objects.create_for_vm(
        vm, level, source=source, source_date=etime,
        message=message, details=details)
351

    
352

    
353
def create_instance(vm, public_nic, flavor, image, password, personality):
    """Submit an instance-creation job for `vm` to its Ganeti backend.

    `image` is a dictionary which should contain the keys:
        'backend_id', 'format' and 'metadata'

    metadata value should be a dictionary.

    Returns the Ganeti job id from CreateInstance().
    """

    if settings.IGNORE_FLAVOR_DISK_SIZES:
        # Fixed sizes used when the flavor's disk size is ignored
        if image['backend_id'].find("windows") >= 0:
            sz = 14000
        else:
            sz = 4000
    else:
        sz = flavor.disk * 1024

    # Handle arguments to CreateInstance() as a dictionary,
    # initialize it based on a deployment-specific value.
    # This enables the administrator to override deployment-specific
    # arguments, such as the disk template to use, name of os provider
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
    #
    # Bug fix: take a shallow copy.  The original code mutated the shared
    # settings.GANETI_CREATEINSTANCE_KWARGS dict in place, leaking
    # per-instance values (name, disks, osparams, ...) across calls.
    kw = dict(settings.GANETI_CREATEINSTANCE_KWARGS)
    kw['mode'] = 'create'
    kw['name'] = vm.backend_vm_id
    # 'os' is defined in settings.GANETI_CREATEINSTANCE_KWARGS

    # Identify if provider parameter should be set in disk options.
    # Current implementation support providers only fo ext template.
    # To select specific provider for an ext template, template name
    # should be formated as `ext_<provider_name>`.
    provider = None
    disk_template = flavor.disk_template
    if flavor.disk_template.startswith("ext"):
        disk_template, provider = flavor.disk_template.split("_", 1)

    kw['disk_template'] = disk_template
    kw['disks'] = [{"size": sz}]
    if provider:
        kw['disks'][0]['provider'] = provider

        if provider == 'vlmc':
            # presumably the vlmc provider sources the disk directly from
            # the image -- TODO confirm
            kw['disks'][0]['origin'] = image['backend_id']

    kw['nics'] = [public_nic]
    if settings.GANETI_USE_HOTPLUG:
        kw['hotplug'] = True
    # kw['os'] = settings.GANETI_OS_PROVIDER (in settings kwargs)
    kw['ip_check'] = False
    kw['name_check'] = False
    # Do not specify a node explicitly: have Ganeti use an iallocator
    # kw['pnode'] = rapi.GetNodes()[0]
    kw['dry_run'] = settings.TEST

    kw['beparams'] = {
        'auto_balance': True,
        'vcpus': flavor.cpu,
        'memory': flavor.ram}

    kw['osparams'] = {
        'img_id': image['backend_id'],
        'img_passwd': password,
        'img_format': image['format']}
    if personality:
        kw['osparams']['img_personality'] = json.dumps(personality)

    # Simplified from `provider != None and provider == 'vlmc'`
    if provider == 'vlmc':
        kw['osparams']['img_id'] = 'null'

    kw['osparams']['img_properties'] = json.dumps(image['metadata'])

    # kw['hvparams'] = dict(serial_console=False) (in settings kwargs)
    log.debug("Creating instance %s", utils.hide_pass(kw))
    with pooled_rapi_client(vm) as client:
        return client.CreateInstance(**kw)
431

    
432

    
433
def delete_instance(vm):
    """Ask the Ganeti backend to remove the instance backing `vm`."""
    dry_run = settings.TEST
    with pooled_rapi_client(vm) as client:
        return client.DeleteInstance(vm.backend_vm_id, dry_run=dry_run)
436

    
437

    
438
def reboot_instance(vm, reboot_type):
    """Reboot `vm`; `reboot_type` must be 'soft' or 'hard'."""
    assert reboot_type in ('soft', 'hard')
    dry_run = settings.TEST
    with pooled_rapi_client(vm) as client:
        return client.RebootInstance(vm.backend_vm_id, reboot_type,
                                     dry_run=dry_run)
443

    
444

    
445
def startup_instance(vm):
    """Ask the Ganeti backend to start the instance backing `vm`."""
    dry_run = settings.TEST
    with pooled_rapi_client(vm) as client:
        return client.StartupInstance(vm.backend_vm_id, dry_run=dry_run)
448

    
449

    
450
def shutdown_instance(vm):
    """Ask the Ganeti backend to stop the instance backing `vm`."""
    dry_run = settings.TEST
    with pooled_rapi_client(vm) as client:
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=dry_run)
453

    
454

    
455
def get_instance_console(vm):
    """Return VNC console connection info for `vm`.

    RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
    which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and
    pretty useless (see #783).  Until this is fixed on the Ganeti side,
    construct a console info reply directly.

    WARNING: This assumes that VNC runs on port network_port on
             the instance's primary node, and is probably
             hypervisor-specific.
    """
    log.debug("Getting console for vm %s", vm)

    with pooled_rapi_client(vm) as client:
        info = client.GetInstance(vm.backend_vm_id)

    if info['hvparams']['serial_console']:
        raise Exception("hv parameter serial_console cannot be true")

    return {'kind': 'vnc',
            'host': info['pnode'],
            'port': info['network_port']}
481

    
482

    
483
def get_instance_info(vm):
    """Fetch the Ganeti instance info for `vm` from its backend."""
    with pooled_rapi_client(vm) as client:
        return client.GetInstanceInfo(vm.backend_vm_id)
486

    
487

    
488
def create_network(network, backends=None, connect=True):
    """Create and connect a network.

    When `backends` is not given, all online backends are used; when
    `connect` is set, the connect jobs depend on the creation job.
    """
    if not backends:
        backends = Backend.objects.exclude(offline=True)

    log.debug("Creating network %s in backends %s", network, backends)

    for backend in backends:
        job_id = _create_network(network, backend)
        if connect:
            connect_network(network, backend, job_id)
499

    
500

    
501
def _create_network(network, backend):
    """Create a network on a single backend; return the Ganeti job id.

    :raises Exception: if no BackendNetwork row exists for the pair.
    """

    network_type = 'public' if network.public else 'private'

    tags = network.backend_tag
    if network.dhcp:
        tags.append('nfdhcpd')

    # Conflict checking only applies to public networks
    conflicts_check = bool(network.public)

    try:
        bn = BackendNetwork.objects.get(network=network, backend=backend)
        mac_prefix = bn.mac_prefix
    except BackendNetwork.DoesNotExist:
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
                        " does not exist" % (network.id, backend.id))

    with pooled_rapi_client(backend) as client:
        return client.CreateNetwork(network_name=network.backend_id,
                                    network=network.subnet,
                                    network6=network.subnet6,
                                    gateway=network.gateway,
                                    gateway6=network.gateway6,
                                    network_type=network_type,
                                    mac_prefix=mac_prefix,
                                    conflicts_check=conflicts_check,
                                    tags=tags)
532

    
533

    
534
def connect_network(network, backend, depend_job=None, group=None):
    """Connect a network to nodegroups.

    Connect to the given `group`, or to every nodegroup of the backend
    when `group` is None.  `depend_job` makes the connect jobs depend on
    a previously submitted job (e.g. the network creation).
    """
    log.debug("Connecting network %s to backend %s", network, backend)

    mode = "routed" if "ROUTED" in network.type else "bridged"
    conflicts_check = bool(network.public)
    depend_jobs = [depend_job] if depend_job else []

    with pooled_rapi_client(backend) as client:
        groups = [group] if group else client.GetGroups()
        for g in groups:
            client.ConnectNetwork(network.backend_id, g, mode,
                                  network.link, conflicts_check,
                                  depend_jobs)
555

    
556

    
557
def delete_network(network, backends=None, disconnect=True):
    """Delete a network from the given backends (all online backends by
    default), optionally disconnecting it from nodegroups first."""
    if not backends:
        backends = Backend.objects.exclude(offline=True)

    log.debug("Deleting network %s from backends %s", network, backends)

    for backend in backends:
        # The delete job depends on the disconnect jobs, if any
        depends = disconnect_network(network, backend) if disconnect else []
        _delete_network(network, backend, depends)
568

    
569

    
570
def _delete_network(network, backend, depend_jobs=None):
    """Submit a network removal job, depending on `depend_jobs`.

    Bug fix: the previous signature used a mutable default argument
    (`depend_jobs=[]`), which is shared across calls; `None` is used as
    the sentinel instead.  Callers passing a list are unaffected.
    """
    if depend_jobs is None:
        depend_jobs = []
    with pooled_rapi_client(backend) as client:
        return client.DeleteNetwork(network.backend_id, depend_jobs)
573

    
574

    
575
def disconnect_network(network, backend, group=None):
    """Disconnect a network from nodegroups; return the list of job ids.

    Disconnect from the given `group`, or from every nodegroup of the
    backend when `group` is None.
    """
    log.debug("Disconnecting network %s to backend %s", network, backend)

    with pooled_rapi_client(backend) as client:
        groups = [group] if group else client.GetGroups()
        return [client.DisconnectNetwork(network.backend_id, g)
                for g in groups]
587

    
588

    
589
def connect_to_network(vm, network, address=None):
    """Add a NIC on `network` (optionally with a fixed IP) to `vm`."""
    log.debug("Connecting vm %s to network %s(%s)", vm, network, address)

    new_nic = {'ip': address, 'network': network.backend_id}
    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id,
                                     nics=[('add', new_nic)],
                                     hotplug=settings.GANETI_USE_HOTPLUG,
                                     dry_run=settings.TEST)
598

    
599

    
600
def disconnect_from_network(vm, nic):
    """Remove the NIC at `nic.index` from `vm`."""
    log.debug("Removing nic of VM %s, with index %s", vm, str(nic.index))

    modification = [('remove', nic.index, {})]
    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id, nics=modification,
                                     hotplug=settings.GANETI_USE_HOTPLUG,
                                     dry_run=settings.TEST)
609

    
610

    
611
def set_firewall_profile(vm, profile):
    """Apply a firewall profile to `vm` by (re)setting its Ganeti tags.

    :param profile: one of the keys of _firewall_tags.
    :raises ValueError: if `profile` is not a known profile.
    """
    try:
        tag = _firewall_tags[profile]
    except KeyError:
        # Bug fix: corrected typo "Unsopported" in the error message
        raise ValueError("Unsupported Firewall Profile: %s" % profile)

    log.debug("Setting tag of VM %s to %s", vm, profile)

    with pooled_rapi_client(vm) as client:
        # Delete all firewall tags before adding the requested one
        for t in _firewall_tags.values():
            client.DeleteInstanceTags(vm.backend_vm_id, [t],
                                      dry_run=settings.TEST)

        client.AddInstanceTags(vm.backend_vm_id, [tag], dry_run=settings.TEST)

        # XXX NOP ModifyInstance call to force process_net_status to run
        # on the dispatcher
        client.ModifyInstance(vm.backend_vm_id,
                         os_name=settings.GANETI_CREATEINSTANCE_KWARGS['os'])
631

    
632

    
633
def get_ganeti_instances(backend=None, bulk=False):
    """Return the instances of one backend, or of all online backends,
    as a single flat list."""
    instances = []
    for b in get_backends(backend):
        with pooled_rapi_client(b) as client:
            instances.extend(client.GetInstances(bulk=bulk))
    return instances
640

    
641

    
642
def get_ganeti_nodes(backend=None, bulk=False):
    """Return the nodes of one backend, or of all online backends, as a
    single flat list."""
    nodes = []
    for b in get_backends(backend):
        with pooled_rapi_client(b) as client:
            nodes.extend(client.GetNodes(bulk=bulk))
    return nodes
649

    
650

    
651
def get_ganeti_jobs(backend=None, bulk=False):
    """Return the jobs of one backend, or of all online backends, as a
    single flat list."""
    jobs = []
    for b in get_backends(backend):
        with pooled_rapi_client(b) as client:
            jobs.extend(client.GetJobs(bulk=bulk))
    return jobs
657

    
658
##
659
##
660
##
661

    
662

    
663
def get_backends(backend=None):
    """Return a list with just `backend` when one is given, otherwise
    all non-offline backends."""
    return [backend] if backend else Backend.objects.filter(offline=False)
667

    
668

    
669
def get_physical_resources(backend):
    """ Get the physical resources of a backend.

    Get the resources of a backend as reported by the backend (not the db).
    Drained, offline and non-vm_capable nodes are filtered out, since
    they do not take part in the vm allocation process.
    """
    attrs = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
    totals = dict((a, 0) for a in attrs)
    for node in get_ganeti_nodes(backend, bulk=True):
        usable = (node['vm_capable'] and not node['drained']
                  and not node['offline'] and node['cnodes'])
        if usable:
            for a in attrs:
                totals[a] += int(node[a])
    return totals
688

    
689

    
690
def update_resources(backend, resources=None):
    """ Update the state of the backend resources in db.

    Fetch the physical resources from the backend when `resources` is
    not supplied, then copy them onto the Backend row and stamp the
    update time.
    """

    if not resources:
        resources = get_physical_resources(backend)

    for attr in ('mfree', 'mtotal', 'dfree', 'dtotal',
                 'pinst_cnt', 'ctotal'):
        setattr(backend, attr, resources[attr])
    backend.updated = datetime.now()
    backend.save()
706

    
707

    
708
def get_memory_from_instances(backend):
    """ Get the memory that is used from instances.

    Get the used memory of a backend. Note: This is different for
    the real memory used, due to kvm's memory de-duplication.

    """
    with pooled_rapi_client(backend) as client:
        instances = client.GetInstances(bulk=True)
    return sum(i['oper_ram'] for i in instances)
721

    
722
##
723
## Synchronized operations for reconciliation
724
##
725

    
726

    
727
def create_network_synced(network, backend):
    """Create and connect a network on a backend, waiting on each job.

    Returns the (status, error) tuple of the first failing step, or of
    the final connect step on success.
    """
    result = _create_network_synced(network, backend)
    if result[0] != 'success':
        return result
    return connect_network_synced(network, backend)
733

    
734

    
735
def _create_network_synced(network, backend):
    """Create a network on a backend and block until the job terminates;
    return the (status, error) tuple from wait_for_job()."""
    with pooled_rapi_client(backend) as client:
        job_id = _create_network(network, backend)
        return wait_for_job(client, job_id)
740

    
741

    
742
def connect_network_synced(network, backend):
    """Connect a network to every nodegroup of a backend, blocking on
    each job and stopping at the first failure.

    Returns the (status, error) tuple of the last job waited on.
    """
    if network.type in ('PUBLIC_ROUTED', 'CUSTOM_ROUTED'):
        mode = 'routed'
    else:
        mode = 'bridged'
    with pooled_rapi_client(backend) as client:
        for group in client.GetGroups():
            job_id = client.ConnectNetwork(network.backend_id, group, mode,
                                           network.link)
            result = wait_for_job(client, job_id)
            if result[0] != 'success':
                return result

    # NOTE(review): 'result' is unbound if the backend has no nodegroups
    # -- TODO confirm that cannot happen
    return result
756

    
757

    
758
def wait_for_job(client, jobid):
    """Block until the Ganeti job reaches a terminal state.

    Returns (status, None) for 'success', otherwise (status, opresult)
    for 'error'/'cancel'.
    """
    terminal = ('success', 'error', 'cancel')
    result = client.WaitForJobChange(jobid, ['status', 'opresult'],
                                     None, None)
    while result['job_info'][0] not in terminal:
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
                                         [result], None)

    status = result['job_info'][0]
    if status == 'success':
        return (status, None)
    return (status, result['job_info'][1])
        return (status, error)