snf-cyclades-app/synnefo/logic/backend.py @ 10de1102

# Copyright 2011 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

import json

from django.conf import settings
from django.db import transaction
from datetime import datetime

from synnefo.db.models import (Backend, VirtualMachine, Network,
                               BackendNetwork, BACKEND_STATUSES,
                               pooled_rapi_client, BridgePoolTable,
                               MacPrefixPoolTable, VirtualMachineDiagnostic)
from synnefo.logic import utils
from synnefo import quotas

from logging import getLogger
log = getLogger(__name__)


_firewall_tags = {
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}

_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())

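# Note on _reverse_tags above: assuming the firewall tags configured in
# settings follow the usual four-part form (e.g. 'synnefo:network:0:protected';
# the exact values are deployment-specific), the mapping takes the last
# colon-separated component of each tag back to its profile name, so a tag
# suffix such as 'protected' resolves to the 'PROTECTED' profile.
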
@quotas.uses_commission
@transaction.commit_on_success
def process_op_status(serials, vm, etime, jobid, opcode, status, logmsg):
    """Process a job progress notification from the backend

    Process an incoming message from the backend (currently Ganeti).
    Job notifications with a terminating status (success, error, or canceled)
    also update the operating state of the VM.

    """
    # See #1492, #1031, #1111 why this line has been removed
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)

    vm.backendjobid = jobid
    vm.backendjobstatus = status
    vm.backendopcode = opcode
    vm.backendlogmsg = logmsg

    # Notifications of success change the operating state
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == 'success' and state_for_success is not None:
        vm.operstate = state_for_success

    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
    if opcode == 'OP_INSTANCE_CREATE' and status in ('canceled', 'error'):
        vm.operstate = 'ERROR'
        vm.backendtime = etime
    elif opcode == 'OP_INSTANCE_REMOVE':
        # Set the deleted flag explicitly, to cater for admin-initiated
        # removals.
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
        # when no instance exists at the Ganeti backend.
        # See ticket #799 for all the details.
        #
        if status == 'success' or (status == 'error' and
                                   vm.operstate == 'ERROR'):
            # Issue commission
            serial = quotas.issue_vm_commission(vm.userid, vm.flavor,
                                                delete=True)
            serials.append(serial)
            vm.serial = serial
            serial.accepted = True
            serial.save()
            release_instance_nics(vm)
            vm.nics.all().delete()
            vm.deleted = True
            vm.operstate = state_for_success
            vm.backendtime = etime

    # Update backendtime only for jobs that have been successfully completed,
    # since only these jobs update the state of the VM. Otherwise a "race
    # condition" may occur when a successful job (e.g. OP_INSTANCE_REMOVE)
    # completes before an error job and messages arrive in reversed order.
    if status == 'success':
        vm.backendtime = etime

    vm.save()


@transaction.commit_on_success
def process_net_status(vm, etime, nics):
    """Process a net status notification from the backend

    Process an incoming message from the Ganeti backend,
    detailing the NIC configuration of a VM instance.

    Update the state of the VM in the DB accordingly.
    """

    ganeti_nics = process_ganeti_nics(nics)
    if not nics_changed(vm.nics.order_by('index'), ganeti_nics):
        log.debug("NICs for VM %s have not changed", vm)
        return

    release_instance_nics(vm)

    for nic in ganeti_nics:
        ipv4 = nic.get('ipv4', '')
        net = nic['network']
        if ipv4:
            net.reserve_address(ipv4)

        nic['dirty'] = False
        vm.nics.create(**nic)
        # Dummy save the network, because the UI uses changed-since for VMs
        # and Networks in order to show the VM NICs
        net.save()

    vm.backendtime = etime
    vm.save()


def process_ganeti_nics(ganeti_nics):
    """Process NIC dict from ganeti hooks."""
    new_nics = []
    for i, new_nic in enumerate(ganeti_nics):
        network = new_nic.get('network', '')
        n = str(network)
        pk = utils.id_from_network_name(n)

        net = Network.objects.get(pk=pk)

        # Get the new nic info
        mac = new_nic.get('mac', '')
        ipv4 = new_nic.get('ip', '')
        ipv6 = new_nic.get('ipv6', '')

        firewall = new_nic.get('firewall', '')
        firewall_profile = _reverse_tags.get(firewall, '')
        if not firewall_profile and net.public:
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE

        nic = {
               'index': i,
               'network': net,
               'mac': mac,
               'ipv4': ipv4,
               'ipv6': ipv6,
               'firewall_profile': firewall_profile}

        new_nics.append(nic)
    return new_nics

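# Note on process_ganeti_nics() above, for illustration only: a hook
# notification typically carries NIC entries such as (names and addresses
# below are made up, not taken from a real message)
#   {'network': 'snf-net-42', 'mac': 'aa:00:00:15:de:ad',
#    'ip': '10.0.0.2', 'firewall': ''}
# The function resolves the Network object from the network name and returns
# dicts keyed by 'index', 'network', 'mac', 'ipv4', 'ipv6' and
# 'firewall_profile', ready to be passed to vm.nics.create(**nic).
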
def nics_changed(old_nics, new_nics):
    """Return True if NICs have changed in any way."""
    if len(old_nics) != len(new_nics):
        return True
    for old_nic, new_nic in zip(old_nics, new_nics):
        if not (old_nic.ipv4 == new_nic['ipv4'] and
                old_nic.ipv6 == new_nic['ipv6'] and
                old_nic.mac == new_nic['mac'] and
                old_nic.firewall_profile == new_nic['firewall_profile'] and
                old_nic.index == new_nic['index'] and
                old_nic.network == new_nic['network']):
            return True
    return False


def release_instance_nics(vm):
    for nic in vm.nics.all():
        net = nic.network
        if nic.ipv4:
            net.release_address(nic.ipv4)
        nic.delete()
        net.save()


@transaction.commit_on_success
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode
    back_network.backendlogmsg = logmsg

    # Notifications of success change the operating state
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == 'success' and state_for_success is not None:
        back_network.operstate = state_for_success

    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_CREATE':
        utils.update_state(back_network, 'ERROR')
        back_network.backendtime = etime

    if opcode == 'OP_NETWORK_REMOVE':
        if status == 'success' or (status == 'error' and
                                   back_network.operstate == 'ERROR'):
            back_network.operstate = state_for_success
            back_network.deleted = True
            back_network.backendtime = etime

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()
    # The state of the Network itself must also be updated
    update_network_state(back_network.network)


@quotas.uses_commission
def update_network_state(serials, network):
    old_state = network.state

    backend_states = [s.operstate for s in network.backend_networks.all()]
    if not backend_states:
        network.state = 'PENDING'
        network.save()
        return

    all_equal = len(set(backend_states)) <= 1
    network.state = all_equal and backend_states[0] or 'PENDING'

    # Release the resources on the deletion of the Network
    if old_state != 'DELETED' and network.state == 'DELETED':
        log.info("Network %r deleted. Releasing mac_prefix %r link %r",
                 network.id, network.mac_prefix, network.link)
        network.deleted = True
        if network.mac_prefix and network.type == 'PRIVATE_MAC_FILTERED':
            mac_pool = MacPrefixPoolTable.get_pool()
            mac_pool.put(network.mac_prefix)
            mac_pool.save()

        if network.link and network.type == 'PRIVATE_VLAN':
            bridge_pool = BridgePoolTable.get_pool()
            bridge_pool.put(network.link)
            bridge_pool.save()

        # Issue commission
        serial = quotas.issue_network_commission(network.userid, delete=True)
        serials.append(serial)
        network.serial = serial
        serial.accepted = True
        serial.save()

    network.save()


@transaction.commit_on_success
def process_network_modify(back_network, etime, jobid, opcode, status,
                           add_reserved_ips, remove_reserved_ips):
    assert (opcode == "OP_NETWORK_SET_PARAMS")
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode

    if add_reserved_ips or remove_reserved_ips:
        net = back_network.network
        pool = net.get_pool()
        if add_reserved_ips:
            for ip in add_reserved_ips:
                pool.reserve(ip, external=True)
        if remove_reserved_ips:
            for ip in remove_reserved_ips:
                pool.put(ip, external=True)
        pool.save()

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()


@transaction.commit_on_success
def process_create_progress(vm, etime, progress):

    percentage = int(progress)

    # The percentage may exceed 100%, due to the way
    # snf-image:copy-progress tracks bytes read by image handling processes
    percentage = 100 if percentage > 100 else percentage
    if percentage < 0:
        raise ValueError("Percentage cannot be negative")

    # FIXME: log a warning here, see #1033
    #   if last_update > percentage:
    #       raise ValueError("Build percentage should increase monotonically "
    #                        "(old = %d, new = %d)" % (last_update, percentage))

    # This assumes that no message of type 'ganeti-create-progress' is going to
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
    # the instance is STARTED.  What if the two messages are processed by two
    # separate dispatcher threads, and the 'ganeti-op-status' message for
    # successful creation gets processed before the 'ganeti-create-progress'
    # message? [vkoukis]
    #
    #if not vm.operstate == 'BUILD':
    #    raise VirtualMachine.IllegalState("VM is not in building state")

    vm.buildpercentage = percentage
    vm.backendtime = etime
    vm.save()


def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
                               details=None):
    """
    Create a virtual machine instance diagnostic entry.

    :param vm: VirtualMachine instance to create the diagnostic for.
    :param message: Diagnostic message.
    :param source: Diagnostic source identifier (e.g. image-helper).
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
    :param etime: The time the message occurred (if available).
    :param details: Additional details or debug information.
    """
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level, source=source,
            source_date=etime, message=message, details=details)

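# Example use of create_instance_diagnostic() above (illustrative values):
#   create_instance_diagnostic(vm, "Image customization finished",
#                              source="image-helper", level="INFO",
#                              etime=datetime.now())
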

    
def create_instance(vm, public_nic, flavor, image, password, personality):
    """`image` is a dictionary which should contain the keys
       'backend_id', 'format' and 'metadata'.

       The 'metadata' value should itself be a dictionary.
    """
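    # Illustrative shape of the expected `image` argument (the values below
    # are made up and deployment-specific):
    #   image = {'backend_id': 'debian_base-7.0-x86_64',
    #            'format': 'diskdump',
    #            'metadata': {'OS': 'debian', 'users': 'root'}}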

    
    if settings.IGNORE_FLAVOR_DISK_SIZES:
        if image['backend_id'].find("windows") >= 0:
            sz = 14000
        else:
            sz = 4000
    else:
        sz = flavor.disk * 1024

    # Handle arguments to CreateInstance() as a dictionary,
    # initialize it based on a deployment-specific value.
    # This enables the administrator to override deployment-specific
    # arguments, such as the disk template to use, name of os provider
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
    #
    kw = settings.GANETI_CREATEINSTANCE_KWARGS
    kw['mode'] = 'create'
    kw['name'] = vm.backend_vm_id
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS

    # Identify whether the provider parameter should be set in the disk
    # options. The current implementation supports providers only for the
    # ext disk template. To select a specific provider for an ext template,
    # the template name should be formatted as `ext_<provider_name>`.
    provider = None
    disk_template = flavor.disk_template
    if flavor.disk_template.startswith("ext"):
        disk_template, provider = flavor.disk_template.split("_", 1)

    kw['disk_template'] = disk_template
    kw['disks'] = [{"size": sz}]
    if provider:
        kw['disks'][0]['provider'] = provider

        if provider == 'vlmc':
            kw['disks'][0]['origin'] = image['backend_id']

    kw['nics'] = [public_nic]
    if settings.GANETI_USE_HOTPLUG:
        kw['hotplug'] = True
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['os'] = settings.GANETI_OS_PROVIDER
    kw['ip_check'] = False
    kw['name_check'] = False
    # Do not specify a node explicitly; have
    # Ganeti use an iallocator instead
    #
    #kw['pnode'] = rapi.GetNodes()[0]
    kw['dry_run'] = settings.TEST

    kw['beparams'] = {
        'auto_balance': True,
        'vcpus': flavor.cpu,
        'memory': flavor.ram}

    kw['osparams'] = {
        'img_id': image['backend_id'],
        'img_passwd': password,
        'img_format': image['format']}
    if personality:
        kw['osparams']['img_personality'] = json.dumps(personality)

    if provider == 'vlmc':
        kw['osparams']['img_id'] = 'null'

    kw['osparams']['img_properties'] = json.dumps(image['metadata'])

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['hvparams'] = dict(serial_console=False)
    log.debug("Creating instance %s", utils.hide_pass(kw))
    with pooled_rapi_client(vm) as client:
        return client.CreateInstance(**kw)


def delete_instance(vm):
    with pooled_rapi_client(vm) as client:
        return client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)


def reboot_instance(vm, reboot_type):
    assert reboot_type in ('soft', 'hard')
    with pooled_rapi_client(vm) as client:
        return client.RebootInstance(vm.backend_vm_id, reboot_type,
                                     dry_run=settings.TEST)


def startup_instance(vm):
    with pooled_rapi_client(vm) as client:
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)


def shutdown_instance(vm):
    with pooled_rapi_client(vm) as client:
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)


def get_instance_console(vm):
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
    # useless (see #783).
    #
    # Until this is fixed on the Ganeti side, construct a console info reply
    # directly.
    #
    # WARNING: This assumes that VNC runs on port network_port on
    #          the instance's primary node, and is probably
    #          hypervisor-specific.
    #
    log.debug("Getting console for vm %s", vm)

    console = {}
    console['kind'] = 'vnc'

    with pooled_rapi_client(vm) as client:
        i = client.GetInstance(vm.backend_vm_id)

    if i['hvparams']['serial_console']:
        raise Exception("hv parameter serial_console cannot be true")
    console['host'] = i['pnode']
    console['port'] = i['network_port']

    return console

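# The dict returned by get_instance_console() above has the form (host and
# port below are made-up examples):
#   {'kind': 'vnc', 'host': 'node1.example.org', 'port': 11000}
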

    
def get_instance_info(vm):
    with pooled_rapi_client(vm) as client:
        return client.GetInstanceInfo(vm.backend_vm_id)


def create_network(network, backends=None, connect=True):
    """Create and connect a network."""
    if not backends:
        backends = Backend.objects.exclude(offline=True)

    log.debug("Creating network %s in backends %s", network, backends)

    for backend in backends:
        create_jobID = _create_network(network, backend)
        if connect:
            connect_network(network, backend, create_jobID)


def _create_network(network, backend):
    """Create a network."""

    network_type = network.public and 'public' or 'private'

    tags = network.backend_tag
    if network.dhcp:
        tags.append('nfdhcpd')

    if network.public:
        conflicts_check = True
    else:
        conflicts_check = False

    try:
        bn = BackendNetwork.objects.get(network=network, backend=backend)
        mac_prefix = bn.mac_prefix
    except BackendNetwork.DoesNotExist:
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
                        " does not exist" % (network.id, backend.id))

    with pooled_rapi_client(backend) as client:
        return client.CreateNetwork(network_name=network.backend_id,
                                    network=network.subnet,
                                    network6=network.subnet6,
                                    gateway=network.gateway,
                                    gateway6=network.gateway6,
                                    network_type=network_type,
                                    mac_prefix=mac_prefix,
                                    conflicts_check=conflicts_check,
                                    tags=tags)


def connect_network(network, backend, depend_job=None, group=None):
    """Connect a network to nodegroups."""
    log.debug("Connecting network %s to backend %s", network, backend)

    mode = "routed" if "ROUTED" in network.type else "bridged"

    if network.public:
        conflicts_check = True
    else:
        conflicts_check = False

    depend_jobs = [depend_job] if depend_job else []
    with pooled_rapi_client(backend) as client:
        if group:
            client.ConnectNetwork(network.backend_id, group, mode,
                                  network.link, conflicts_check, depend_jobs)
        else:
            for group in client.GetGroups():
                client.ConnectNetwork(network.backend_id, group, mode,
                                      network.link, conflicts_check,
                                      depend_jobs)


def delete_network(network, backends=None, disconnect=True):
    if not backends:
        backends = Backend.objects.exclude(offline=True)

    log.debug("Deleting network %s from backends %s", network, backends)

    for backend in backends:
        disconnect_jobIDs = []
        if disconnect:
            disconnect_jobIDs = disconnect_network(network, backend)
        _delete_network(network, backend, disconnect_jobIDs)


def _delete_network(network, backend, depend_jobs=[]):
    with pooled_rapi_client(backend) as client:
        return client.DeleteNetwork(network.backend_id, depend_jobs)


def disconnect_network(network, backend, group=None):
    log.debug("Disconnecting network %s from backend %s", network, backend)

    with pooled_rapi_client(backend) as client:
        if group:
            return [client.DisconnectNetwork(network.backend_id, group)]
        else:
            jobs = []
            for group in client.GetGroups():
                job = client.DisconnectNetwork(network.backend_id, group)
                jobs.append(job)
            return jobs


def connect_to_network(vm, network, address=None):
    nic = {'ip': address, 'network': network.backend_id}

    log.debug("Connecting vm %s to network %s(%s)", vm, network, address)

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id, nics=[('add', nic)],
                                     hotplug=settings.GANETI_USE_HOTPLUG,
                                     dry_run=settings.TEST)


def disconnect_from_network(vm, nic):
    op = [('remove', nic.index, {})]

    log.debug("Removing nic of VM %s, with index %s", vm, str(nic.index))

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id, nics=op,
                                     hotplug=settings.GANETI_USE_HOTPLUG,
                                     dry_run=settings.TEST)


def set_firewall_profile(vm, profile):
    try:
        tag = _firewall_tags[profile]
    except KeyError:
        raise ValueError("Unsupported Firewall Profile: %s" % profile)

    log.debug("Setting tag of VM %s to %s", vm, profile)

    with pooled_rapi_client(vm) as client:
        # Delete all firewall tags
        for t in _firewall_tags.values():
            client.DeleteInstanceTags(vm.backend_vm_id, [t],
                                      dry_run=settings.TEST)

        client.AddInstanceTags(vm.backend_vm_id, [tag], dry_run=settings.TEST)

        # XXX NOP ModifyInstance call to force process_net_status to run
        # on the dispatcher
        client.ModifyInstance(vm.backend_vm_id,
                         os_name=settings.GANETI_CREATEINSTANCE_KWARGS['os'])


def get_ganeti_instances(backend=None, bulk=False):
    instances = []
    for backend in get_backends(backend):
        with pooled_rapi_client(backend) as client:
            instances.append(client.GetInstances(bulk=bulk))

    return reduce(list.__add__, instances, [])


def get_ganeti_nodes(backend=None, bulk=False):
    nodes = []
    for backend in get_backends(backend):
        with pooled_rapi_client(backend) as client:
            nodes.append(client.GetNodes(bulk=bulk))

    return reduce(list.__add__, nodes, [])


def get_ganeti_jobs(backend=None, bulk=False):
    jobs = []
    for backend in get_backends(backend):
        with pooled_rapi_client(backend) as client:
            jobs.append(client.GetJobs(bulk=bulk))
    return reduce(list.__add__, jobs, [])

##
##
##


def get_backends(backend=None):
    if backend:
        return [backend]
    return Backend.objects.filter(offline=False)


def get_physical_resources(backend):
    """ Get the physical resources of a backend.

    Get the resources of a backend as reported by the backend (not the db).

    """
    nodes = get_ganeti_nodes(backend, bulk=True)
    attr = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
    res = {}
    for a in attr:
        res[a] = 0
    for n in nodes:
        # Filter out drained, offline and non-vm_capable nodes, since they
        # will not take part in the VM allocation process
        if n['vm_capable'] and not n['drained'] and not n['offline']\
           and n['cnodes']:
            for a in attr:
                res[a] += int(n[a])
    return res
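
# get_physical_resources() above returns the per-node fields summed over all
# usable nodes, e.g. (numbers are made up):
#   {'mfree': 51200, 'mtotal': 65536, 'dfree': 800000, 'dtotal': 1000000,
#    'pinst_cnt': 17, 'ctotal': 64}
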

    
def update_resources(backend, resources=None):
    """ Update the state of the backend resources in db.

    """

    if not resources:
        resources = get_physical_resources(backend)

    backend.mfree = resources['mfree']
    backend.mtotal = resources['mtotal']
    backend.dfree = resources['dfree']
    backend.dtotal = resources['dtotal']
    backend.pinst_cnt = resources['pinst_cnt']
    backend.ctotal = resources['ctotal']
    backend.updated = datetime.now()
    backend.save()


def get_memory_from_instances(backend):
    """ Get the memory that is used by instances.

    Get the used memory of a backend. Note: This may differ from the real
    memory used, due to kvm's memory de-duplication.

    """
    with pooled_rapi_client(backend) as client:
        instances = client.GetInstances(bulk=True)
    mem = 0
    for i in instances:
        mem += i['oper_ram']
    return mem

##
## Synchronized operations for reconciliation
##


def create_network_synced(network, backend):
    result = _create_network_synced(network, backend)
    if result[0] != 'success':
        return result
    result = connect_network_synced(network, backend)
    return result


def _create_network_synced(network, backend):
    with pooled_rapi_client(backend) as client:
        job = _create_network(network, backend)
        result = wait_for_job(client, job)
    return result


def connect_network_synced(network, backend):
    if network.type in ('PUBLIC_ROUTED', 'CUSTOM_ROUTED'):
        mode = 'routed'
    else:
        mode = 'bridged'
    with pooled_rapi_client(backend) as client:
        for group in client.GetGroups():
            job = client.ConnectNetwork(network.backend_id, group, mode,
                                        network.link)
            result = wait_for_job(client, job)
            if result[0] != 'success':
                return result

    return result


def wait_for_job(client, jobid):
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
    status = result['job_info'][0]
    while status not in ['success', 'error', 'cancel']:
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
                                         [result], None)
        status = result['job_info'][0]

    if status == 'success':
        return (status, None)
    else:
        error = result['job_info'][1]
        return (status, error)
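
# Example use of wait_for_job() (illustrative): synchronously create a network
# on one backend and block until the Ganeti job reaches a final state:
#   with pooled_rapi_client(backend) as client:
#       status, error = wait_for_job(client, _create_network(network, backend))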