snf-cyclades-app/synnefo/logic/backend.py @ 2c022086

# Copyright 2011-2013 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
from django.conf import settings
from django.db import transaction
from datetime import datetime

from synnefo.db.models import (Backend, VirtualMachine, Network,
                               FloatingIP,
                               BackendNetwork, BACKEND_STATUSES,
                               pooled_rapi_client, VirtualMachineDiagnostic,
                               Flavor)
from synnefo.logic import utils
from synnefo import quotas
from synnefo.api.util import release_resource
from synnefo.util.mac2eui64 import mac2eui64

from logging import getLogger
log = getLogger(__name__)


_firewall_tags = {
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}
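
# The firewall tags configured in settings are assumed to be of the form
# 'prefix:x:y:<profile>' (e.g. 'synnefo:network:0:protected'); the reverse
# map below keys on the fourth colon-separated component.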
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())


def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
    """Handle quotas for an updated VirtualMachine.

    Update quotas for the updated VirtualMachine based on the job that ran on
    the Ganeti backend. If a commission has already been issued for this job,
    then this commission is just accepted or rejected based on the job status.
    Otherwise, a new commission for the given change is issued in force and
    auto-accept mode. In this case, previous commissions are rejected, since
    they reflect a previous state of the VM.

    """
    if job_status not in ["success", "error", "canceled"]:
        return

    # Check whether successful completion of the job will trigger any
    # quotable change in the VM state.
    action = utils.get_action_from_opcode(job_opcode, job_fields)
    commission_info = quotas.get_commission_info(vm, action=action,
                                                 action_fields=job_fields)

    if vm.task_job_id == job_id and vm.serial is not None:
        # A commission for this change has already been issued. Just
        # accept or reject it.
        serial = vm.serial
        if job_status == "success":
            quotas.accept_serial(serial)
        elif job_status in ["error", "canceled"]:
            log.debug("Job %s failed. Rejecting related serial %s", job_id,
                      serial)
            quotas.reject_serial(serial)
        vm.serial = None
    elif job_status == "success" and commission_info is not None:
        log.debug("Expected job was %s. Processing job %s. Commission for"
                  " this job: %s", vm.task_job_id, job_id, commission_info)
        # A commission for this change has not been issued, or the issued
        # commission was unaware of the current change. Reject all previous
        # commissions and create a new one in forced mode!
        previous_serial = vm.serial
        if previous_serial and not previous_serial.resolved:
            quotas.resolve_vm_commission(previous_serial)
        serial = quotas.issue_commission(user=vm.userid,
                                         source=quotas.DEFAULT_SOURCE,
                                         provisions=commission_info,
                                         force=True,
                                         auto_accept=True)
        # Clear the VM's serial. The expected job may arrive later; however,
        # the correlated serial must not be accepted, since it reflects a
        # previous VM state.
        vm.serial = None

    return vm


@transaction.commit_on_success
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None,
                      beparams=None):
    """Process a job progress notification from the backend.

    Process an incoming message from the backend (currently Ganeti).
    Job notifications with a terminating status (success, error, or canceled)
    also update the operating state of the VM.

    """
    # See #1492, #1031, #1111 for why this line has been removed
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)

    vm.backendjobid = jobid
    vm.backendjobstatus = status
    vm.backendopcode = opcode
    vm.backendlogmsg = logmsg

    if status in ["queued", "waiting", "running"]:
        vm.save()
        return

    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode)
    # Notifications of success change the operating state
    if status == "success":
        if state_for_success is not None:
            vm.operstate = state_for_success
        if nics is not None:
            # Update the NICs of the VM
            _process_net_status(vm, etime, nics)
        if beparams:
            # Change the flavor of the VM
            _process_resize(vm, beparams)
        # Update backendtime only for jobs that have been successfully
        # completed, since only these jobs update the state of the VM. Else a
        # "race condition" may occur when a successful job (e.g.
        # OP_INSTANCE_REMOVE) completes before an error job and messages
        # arrive in reversed order.
        vm.backendtime = etime

    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
    if opcode == 'OP_INSTANCE_CREATE' and status in ('canceled', 'error'):
        vm.operstate = 'ERROR'
        vm.backendtime = etime
    elif opcode == 'OP_INSTANCE_REMOVE':
        # Set the deleted flag explicitly, to cater for admin-initiated
        # removals. Special case: OP_INSTANCE_REMOVE fails for machines in
        # ERROR, when no instance exists at the Ganeti backend.
        # See ticket #799 for all the details.
        if status == 'success' or (status == 'error' and
                                   vm.operstate == 'ERROR'):
            # VM has been deleted. Release the instance IPs
            release_instance_ips(vm, [])
            # ... and delete the related NICs (must be performed after the
            # release!)
            vm.nics.all().delete()
            vm.deleted = True
            vm.operstate = state_for_success
            vm.backendtime = etime
            status = "success"

    if status in ["success", "error", "canceled"]:
        # Job is finalized: handle quotas/commissioning
        job_fields = {"nics": nics, "beparams": beparams}
        vm = handle_vm_quotas(vm, job_id=jobid, job_opcode=opcode,
                              job_status=status, job_fields=job_fields)
        # ... and clear the task fields
        if vm.task_job_id == jobid:
            vm.task = None
            vm.task_job_id = None

    vm.save()


def _process_resize(vm, beparams):
    """Change the flavor of a VirtualMachine based on new beparams."""
    old_flavor = vm.flavor
    vcpus = beparams.get("vcpus", old_flavor.cpu)
    ram = beparams.get("maxmem", old_flavor.ram)
    if vcpus == old_flavor.cpu and ram == old_flavor.ram:
        return
    try:
        new_flavor = Flavor.objects.get(cpu=vcpus, ram=ram,
                                        disk=old_flavor.disk,
                                        disk_template=old_flavor.disk_template)
    except Flavor.DoesNotExist:
        raise Exception("Cannot find flavor for VM")
    vm.flavor = new_flavor
    vm.save()


@transaction.commit_on_success
def process_net_status(vm, etime, nics):
    """Wrap _process_net_status inside a transaction."""
    _process_net_status(vm, etime, nics)


def _process_net_status(vm, etime, nics):
    """Process a net status notification from the backend.

    Process an incoming message from the Ganeti backend,
    detailing the NIC configuration of a VM instance.

    Update the state of the VM in the DB accordingly.
    """

    ganeti_nics = process_ganeti_nics(nics)
    if not nics_changed(vm.nics.order_by('index'), ganeti_nics):
        log.debug("NICs for VM %s have not changed", vm)
        return

    # Get an X-Lock on the backend before getting an X-Lock on network IP
    # pools, to guarantee that no deadlock will occur with the Backend
    # allocator.
    Backend.objects.select_for_update().get(id=vm.backend_id)

    # NICs have changed. Release the instance IPs
    release_instance_ips(vm, ganeti_nics)
    # ... and delete the related NICs (must be performed after the release!)
    vm.nics.all().delete()
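
    # Rebuild the VM's NICs from the Ganeti view, re-reserving each IPv4
    # address in the owning network's address pool.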
    for nic in ganeti_nics:
        ipv4 = nic.get('ipv4', '')
        net = nic['network']
        if ipv4:
            net.reserve_address(ipv4)

        nic['dirty'] = False
        vm.nics.create(**nic)
        # Dummy save the network, because the UI uses changed-since for VMs
        # and Networks in order to show the VM NICs
        net.save()

    vm.backendtime = etime
    vm.save()


def process_ganeti_nics(ganeti_nics):
    """Process a NIC dict from the Ganeti hooks."""
    new_nics = []
    for i, new_nic in enumerate(ganeti_nics):
        network = new_nic.get('network', '')
        n = str(network)
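        # The backend network name is assumed to encode the Network's
        # database id, which id_from_network_name() extracts.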
        pk = utils.id_from_network_name(n)

        net = Network.objects.get(pk=pk)

        # Get the new NIC info
        mac = new_nic.get('mac', '')
        ipv4 = new_nic.get('ip', '')
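        # Derive the NIC's IPv6 address from its MAC address via the EUI-64
        # scheme (RFC 4291), when the network carries an IPv6 subnet.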
        if net.subnet6:
            ipv6 = mac2eui64(mac, net.subnet6)
        else:
            ipv6 = ''

        firewall = new_nic.get('firewall', '')
        firewall_profile = _reverse_tags.get(firewall, '')
        if not firewall_profile and net.public:
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE

        nic = {
            'index': i,
            'network': net,
            'mac': mac,
            'ipv4': ipv4,
            'ipv6': ipv6,
            'firewall_profile': firewall_profile,
            'state': 'ACTIVE'}

        new_nics.append(nic)
    return new_nics


def nics_changed(old_nics, new_nics):
    """Return True if the NICs have changed in any way."""
    if len(old_nics) != len(new_nics):
        return True
    fields = ["ipv4", "ipv6", "mac", "firewall_profile", "index", "network"]
    for old_nic, new_nic in zip(old_nics, new_nics):
        for field in fields:
            if getattr(old_nic, field) != new_nic[field]:
                return True
    return False


def release_instance_ips(vm, ganeti_nics):
    old_addresses = set(vm.nics.values_list("network", "ipv4"))
    new_addresses = set(map(lambda nic: (nic["network"], nic["ipv4"]),
                            ganeti_nics))
    to_release = old_addresses - new_addresses
    for (network_id, ipv4) in to_release:
        if ipv4:
            net = Network.objects.get(id=network_id)
            # Important: take an exclusive lock on the pool before checking
            # whether there is a floating IP with this IPv4 address. Otherwise
            # there is a race condition, where you may release a floating IP
            # that has been created after searching the floating IPs and
            # before you get the pool exclusively.
            pool = net.get_pool()
            try:
                floating_ip = net.floating_ips.select_for_update()\
                                              .get(ipv4=ipv4, machine=vm,
                                                   deleted=False)
                floating_ip.machine = None
                floating_ip.save()
            except FloatingIP.DoesNotExist:
                net.release_address(ipv4)
                pool.save()


@transaction.commit_on_success
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode
    back_network.backendlogmsg = logmsg

    # Notifications of success change the operating state
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == 'success' and state_for_success is not None:
        back_network.operstate = state_for_success

    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_ADD':
        back_network.operstate = 'ERROR'
        back_network.backendtime = etime

    if opcode == 'OP_NETWORK_REMOVE':
        if status == 'success' or (status == 'error' and
                                   back_network.operstate == 'ERROR'):
            back_network.operstate = state_for_success
            back_network.deleted = True
            back_network.backendtime = etime

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()
    # The state of the parent Network must also be updated
    update_network_state(back_network.network)


def update_network_state(network):
    """Update the state of a Network based on its BackendNetwork states.

    Update the state of a Network based on the operstate of the network in
    each backend in which it exists.

    The state of the network is:
    * ACTIVE: if it is 'ACTIVE' in at least one backend.
    * DELETED: if it is 'DELETED' in all backends in which it has been
      created.

    This function also releases the resources (MAC prefix or bridge) and the
    quotas for the network.

    """
    if network.deleted:
        # Network has already been deleted. Just assert that its state is
        # also DELETED
        if not network.state == "DELETED":
            network.state = "DELETED"
            network.save()
        return

    backend_states = [s.operstate for s in network.backend_networks.all()]
    if not backend_states and network.action != "DESTROY":
        if network.state != "ACTIVE":
            network.state = "ACTIVE"
            network.save()
        return

    # Network is deleted when all BackendNetworks go to "DELETED" operstate
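    # (the fold below evaluates to "DELETED" only if every element of
    # backend_states equals "DELETED", and to False otherwise)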
    deleted = reduce(lambda x, y: x == y and "DELETED", backend_states,
                     "DELETED")

    # Release the resources on the deletion of the Network
    if deleted:
        log.info("Network %r deleted. Releasing mac_prefix %r, link %r",
                 network.id, network.mac_prefix, network.link)
        network.deleted = True
        network.state = "DELETED"
        if network.mac_prefix:
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
                release_resource(res_type="mac_prefix",
                                 value=network.mac_prefix)
        if network.link:
            if network.FLAVORS[network.flavor]["link"] == "pool":
                release_resource(res_type="bridge", value=network.link)

        # Issue commission
        if network.userid:
            quotas.issue_and_accept_commission(network, delete=True)
        elif not network.public:
            log.warning("Network %s does not have an owner!", network.id)
    network.save()


@transaction.commit_on_success
def process_network_modify(back_network, etime, jobid, opcode, status,
                           add_reserved_ips, remove_reserved_ips):
    assert (opcode == "OP_NETWORK_SET_PARAMS")
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode

    if add_reserved_ips or remove_reserved_ips:
        net = back_network.network
        pool = net.get_pool()
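        # external=True is assumed to mark addresses as externally reserved,
        # i.e. managed outside the pool, so they are never handed out to
        # instances.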
        if add_reserved_ips:
            for ip in add_reserved_ips:
                pool.reserve(ip, external=True)
        if remove_reserved_ips:
            for ip in remove_reserved_ips:
                pool.put(ip, external=True)
        pool.save()

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()


@transaction.commit_on_success
def process_create_progress(vm, etime, progress):

    percentage = int(progress)

    # The percentage may exceed 100%, due to the way
    # snf-image:copy-progress tracks bytes read by image handling processes
    percentage = 100 if percentage > 100 else percentage
    if percentage < 0:
        raise ValueError("Percentage cannot be negative")

    # FIXME: log a warning here, see #1033
#   if last_update > percentage:
#       raise ValueError("Build percentage should increase monotonically " \
#                        "(old = %d, new = %d)" % (last_update, percentage))

    # This assumes that no message of type 'ganeti-create-progress' is going
    # to arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance
    # and the instance is STARTED. What if the two messages are processed by
    # two separate dispatcher threads, and the 'ganeti-op-status' message for
    # successful creation gets processed before the 'ganeti-create-progress'
    # message? [vkoukis]
    #
    #if not vm.operstate == 'BUILD':
    #    raise VirtualMachine.IllegalState("VM is not in building state")

    vm.buildpercentage = percentage
    vm.backendtime = etime
    vm.save()


def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
                               details=None):
    """
    Create a virtual machine instance diagnostic entry.

    :param vm: VirtualMachine instance to create the diagnostic for.
    :param message: Diagnostic message.
    :param source: Diagnostic source identifier (e.g. image-helper).
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
    :param etime: The time the message occurred (if available).
    :param details: Additional details or debug information.
    """
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level, source=source,
                                                   source_date=etime,
                                                   message=message,
                                                   details=details)


def create_instance(vm, nics, flavor, image):
    """`image` is a dictionary which should contain the keys:
            'backend_id', 'format' and 'metadata'

        metadata value should be a dictionary.
    """

    # Handle arguments to CreateInstance() as a dictionary,
    # initialized based on a deployment-specific value.
    # This enables the administrator to override deployment-specific
    # arguments, such as the disk template to use, the name of the os
    # provider, and hypervisor-specific parameters, at will (see Synnefo
    # #785, #835).
    #
    kw = vm.backend.get_create_params()
    kw['mode'] = 'create'
    kw['name'] = vm.backend_vm_id
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS

    kw['disk_template'] = flavor.disk_template
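    # flavor.disk is assumed to be expressed in GiB, while Ganeti expects
    # disk sizes in MiB, hence the conversion below.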
    kw['disks'] = [{"size": flavor.disk * 1024}]
    provider = flavor.disk_provider
    if provider:
        kw['disks'][0]['provider'] = provider
        kw['disks'][0]['origin'] = flavor.disk_origin

    kw['nics'] = nics
    if vm.backend.use_hotplug():
        kw['hotplug'] = True
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['os'] = settings.GANETI_OS_PROVIDER
    kw['ip_check'] = False
    kw['name_check'] = False

    # Do not specify a node explicitly; have
    # Ganeti use an iallocator instead
    #kw['pnode'] = rapi.GetNodes()[0]

    kw['dry_run'] = settings.TEST

    kw['beparams'] = {
        'auto_balance': True,
        'vcpus': flavor.cpu,
        'memory': flavor.ram}

    kw['osparams'] = {
        'config_url': vm.config_url,
        # Store the image id and format in Ganeti
        'img_id': image['backend_id'],
        'img_format': image['format']}

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['hvparams'] = dict(serial_console=False)

    log.debug("Creating instance %s", utils.hide_pass(kw))
    with pooled_rapi_client(vm) as client:
        return client.CreateInstance(**kw)


def delete_instance(vm):
    with pooled_rapi_client(vm) as client:
        return client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)


def reboot_instance(vm, reboot_type):
    assert reboot_type in ('soft', 'hard')
    with pooled_rapi_client(vm) as client:
        return client.RebootInstance(vm.backend_vm_id, reboot_type,
                                     dry_run=settings.TEST)


def startup_instance(vm):
    with pooled_rapi_client(vm) as client:
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)


def shutdown_instance(vm):
    with pooled_rapi_client(vm) as client:
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)


def resize_instance(vm, vcpus, memory):
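    # Keeping minmem equal to maxmem pins the instance to a fixed memory
    # size (assuming no memory ballooning is desired).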
    beparams = {"vcpus": int(vcpus),
                "minmem": int(memory),
                "maxmem": int(memory)}
    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id, beparams=beparams)


def get_instance_console(vm):
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
    # useless (see #783).
    #
    # Until this is fixed on the Ganeti side, construct a console info reply
    # directly.
    #
    # WARNING: This assumes that VNC runs on port network_port on
    #          the instance's primary node, and is probably
    #          hypervisor-specific.
    #
    log.debug("Getting console for vm %s", vm)

    console = {}
    console['kind'] = 'vnc'

    with pooled_rapi_client(vm) as client:
        i = client.GetInstance(vm.backend_vm_id)

    if vm.backend.hypervisor == "kvm" and i['hvparams']['serial_console']:
        raise Exception("hv parameter serial_console cannot be true")
    console['host'] = i['pnode']
    console['port'] = i['network_port']

    return console


def get_instance_info(vm):
    with pooled_rapi_client(vm) as client:
        return client.GetInstanceInfo(vm.backend_vm_id)


def create_network(network, backend, connect=True):
    """Create a network in a Ganeti backend"""
    log.debug("Creating network %s in backend %s", network, backend)

    job_id = _create_network(network, backend)

    if connect:
        job_ids = connect_network(network, backend, depends=[job_id])
        return job_ids
    else:
        return [job_id]


def _create_network(network, backend):
    """Create a network."""

    network_type = 'public' if network.public else 'private'

    tags = network.backend_tag
    if network.dhcp:
        tags.append('nfdhcpd')
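
    # conflicts_check is assumed to make Ganeti check the network's range
    # for addresses that are already in use (e.g. node IPs); it is enabled
    # only for public networks.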
    if network.public:
        conflicts_check = True
    else:
        conflicts_check = False

    try:
        bn = BackendNetwork.objects.get(network=network, backend=backend)
        mac_prefix = bn.mac_prefix
    except BackendNetwork.DoesNotExist:
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
                        " does not exist" % (network.id, backend.id))

    with pooled_rapi_client(backend) as client:
        return client.CreateNetwork(network_name=network.backend_id,
                                    network=network.subnet,
                                    network6=network.subnet6,
                                    gateway=network.gateway,
                                    gateway6=network.gateway6,
                                    network_type=network_type,
                                    mac_prefix=mac_prefix,
                                    conflicts_check=conflicts_check,
                                    tags=tags)


def connect_network(network, backend, depends=[], group=None):
    """Connect a network to nodegroups."""
    log.debug("Connecting network %s to backend %s", network, backend)

    if network.public:
        conflicts_check = True
    else:
        conflicts_check = False
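
    # Each dependency below is a [job_id, [allowed final statuses]] pair,
    # which is the job-dependency format the Ganeti RAPI is assumed to
    # expect.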
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for group in groups:
            job_id = client.ConnectNetwork(network.backend_id, group,
                                           network.mode, network.link,
                                           conflicts_check,
                                           depends=depends)
            job_ids.append(job_id)
    return job_ids


def delete_network(network, backend, disconnect=True):
    log.debug("Deleting network %s from backend %s", network, backend)

    depends = []
    if disconnect:
        depends = disconnect_network(network, backend)
    _delete_network(network, backend, depends=depends)


def _delete_network(network, backend, depends=[]):
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
    with pooled_rapi_client(backend) as client:
        return client.DeleteNetwork(network.backend_id, depends)


def disconnect_network(network, backend, group=None):
    log.debug("Disconnecting network %s from backend %s", network, backend)

    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for group in groups:
            job_id = client.DisconnectNetwork(network.backend_id, group)
            job_ids.append(job_id)
    return job_ids


def connect_to_network(vm, network, address=None):
    backend = vm.backend
    network = Network.objects.select_for_update().get(id=network.id)
    bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
                                                         network=network)
    depend_jobs = []
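    # If the network is not yet active on this backend, create and connect
    # it first; the NIC modification below then depends on those jobs.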
    if bnet.operstate != "ACTIVE":
        depend_jobs = create_network(network, backend, connect=True)

    depends = [[job, ["success", "error", "canceled"]] for job in depend_jobs]

    nic = {'ip': address, 'network': network.backend_id}

    log.debug("Connecting vm %s to network %s(%s)", vm, network, address)

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id, nics=[('add', nic)],
                                     hotplug=vm.backend.use_hotplug(),
                                     depends=depends,
                                     dry_run=settings.TEST)


def disconnect_from_network(vm, nic):
    op = [('remove', nic.index, {})]

    log.debug("Removing NIC of VM %s, with index %s", vm, str(nic.index))

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id, nics=op,
                                     hotplug=vm.backend.use_hotplug(),
                                     dry_run=settings.TEST)


def set_firewall_profile(vm, profile):
    try:
        tag = _firewall_tags[profile]
    except KeyError:
        raise ValueError("Unsupported Firewall Profile: %s" % profile)

    log.debug("Setting tag of VM %s to %s", vm, profile)

    with pooled_rapi_client(vm) as client:
        # Delete all firewall tags
        for t in _firewall_tags.values():
            client.DeleteInstanceTags(vm.backend_vm_id, [t],
                                      dry_run=settings.TEST)

        client.AddInstanceTags(vm.backend_vm_id, [tag], dry_run=settings.TEST)

        # XXX: NOP ModifyInstance call to force process_net_status to run
        # on the dispatcher
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
        client.ModifyInstance(vm.backend_vm_id,
                              os_name=os_name)
    return None


def get_instances(backend, bulk=True):
    with pooled_rapi_client(backend) as c:
        return c.GetInstances(bulk=bulk)


def get_nodes(backend, bulk=True):
    with pooled_rapi_client(backend) as c:
        return c.GetNodes(bulk=bulk)


def get_jobs(backend):
    with pooled_rapi_client(backend) as c:
        return c.GetJobs()


def get_physical_resources(backend):
    """Get the physical resources of a backend.

    Get the resources of a backend as reported by the backend (not the db).

    """
    nodes = get_nodes(backend, bulk=True)
    attr = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
    res = {}
    for a in attr:
        res[a] = 0
    for n in nodes:
        # Filter out drained, offline and non-vm_capable nodes, since they
        # will not take part in the vm allocation process
        can_host_vms = n['vm_capable'] and not (n['drained'] or n['offline'])
        if can_host_vms and n['cnodes']:
            for a in attr:
                res[a] += int(n[a])
    return res


def update_resources(backend, resources=None):
    """Update the state of the backend resources in the db.

    """

    if not resources:
        resources = get_physical_resources(backend)

    backend.mfree = resources['mfree']
    backend.mtotal = resources['mtotal']
    backend.dfree = resources['dfree']
    backend.dtotal = resources['dtotal']
    backend.pinst_cnt = resources['pinst_cnt']
    backend.ctotal = resources['ctotal']
    backend.updated = datetime.now()
    backend.save()


def get_memory_from_instances(backend):
    """Get the memory that is used by instances.

    Get the used memory of a backend. Note: This is different from the real
    memory used, due to KVM's memory de-duplication.

    """
    with pooled_rapi_client(backend) as client:
        instances = client.GetInstances(bulk=True)
    mem = 0
    for i in instances:
        mem += i['oper_ram']
    return mem


##
## Synchronized operations for reconciliation
##


def create_network_synced(network, backend):
    result = _create_network_synced(network, backend)
    if result[0] != 'success':
        return result
    result = connect_network_synced(network, backend)
    return result


def _create_network_synced(network, backend):
    with pooled_rapi_client(backend) as client:
        job = _create_network(network, backend)
        result = wait_for_job(client, job)
    return result


def connect_network_synced(network, backend):
    with pooled_rapi_client(backend) as client:
        for group in client.GetGroups():
            job = client.ConnectNetwork(network.backend_id, group,
                                        network.mode, network.link)
            result = wait_for_job(client, job)
            if result[0] != 'success':
                return result

    return result


def wait_for_job(client, jobid):
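    # WaitForJobChange blocks until the watched job fields change, so this
    # loop long-polls the job until it reaches a final status.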
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
    status = result['job_info'][0]
    while status not in ['success', 'error', 'canceled']:
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
                                         [result], None)
        status = result['job_info'][0]

    if status == 'success':
        return (status, None)
    else:
        error = result['job_info'][1]
        return (status, error)