Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ 41a7fae7

History | View | Annotate | Download (29.8 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
from django.conf import settings
34
from django.db import transaction
35
from datetime import datetime
36

    
37
from synnefo.db.models import (Backend, VirtualMachine, Network,
38
                               BackendNetwork, BACKEND_STATUSES,
39
                               pooled_rapi_client, VirtualMachineDiagnostic,
40
                               Flavor)
41
from synnefo.logic import utils
42
from synnefo import quotas
43
from synnefo.api.util import release_resource
44
from synnefo.util.mac2eui64 import mac2eui64
45

    
46
from logging import getLogger
47
log = getLogger(__name__)
48

    
49

    
50
# Mapping from firewall profile names to the Ganeti instance tags that
# represent them (tags are configured per deployment in Django settings).
_firewall_tags = {
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}

# Reverse mapping: the fourth ':'-separated component of each tag back to
# its profile name. Assumes each GANETI_FIREWALL_*_TAG setting has at least
# four ':'-separated fields -- TODO confirm against deployment settings.
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
56

    
57

    
58
def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
    """Handle quotas for updated VirtualMachine.

    Update quotas for the updated VirtualMachine based on the job that run on
    the Ganeti backend. If a commission has been already issued for this job,
    then this commission is just accepted or rejected based on the job status.
    Otherwise, a new commission for the given change is issued, that is also in
    force and auto-accept mode. In this case, previous commissions are
    rejected, since they reflect a previous state of the VM.

    :param vm: the VirtualMachine the job ran for.
    :param job_id: id of the Ganeti job.
    :param job_opcode: opcode of the Ganeti job.
    :param job_status: final status of the Ganeti job.
    :param job_fields: dict of job parameters (e.g. nics, beparams).
    :returns: the vm (with its serial possibly cleared), or None when
        job_status is not a final status.
    """
    if job_status not in ["success", "error", "canceled"]:
        return

    # Check successful completion of a job will trigger any quotable change in
    # the VM state.
    action = utils.get_action_from_opcode(job_opcode, job_fields)
    commission_info = quotas.get_commission_info(vm, action=action,
                                                 action_fields=job_fields)

    if vm.task_job_id == job_id and vm.serial is not None:
        # Commission for this change has already been issued. So just
        # accept/reject it
        serial = vm.serial
        if job_status == "success":
            quotas.accept_serial(serial)
        elif job_status in ["error", "canceled"]:
            log.debug("Job %s failed. Rejecting related serial %s", job_id,
                      serial)
            quotas.reject_serial(serial)
        vm.serial = None
    elif job_status == "success" and commission_info is not None:
        log.debug("Expected job was %s. Processing job %s. Commission for"
                  " this job: %s", vm.task_job_id, job_id, commission_info)
        # Commission for this change has not been issued, or the issued
        # commission was unaware of the current change. Reject all previous
        # commissions and create a new one in forced mode!
        previous_serial = vm.serial
        if previous_serial and not previous_serial.resolved:
            quotas.resolve_vm_commission(previous_serial)
        # The new commission is forced and auto-accepted, so its serial does
        # not need to be tracked on the VM afterwards.
        serial = quotas.issue_commission(user=vm.userid,
                                         source=quotas.DEFAULT_SOURCE,
                                         provisions=commission_info,
                                         force=True,
                                         auto_accept=True)
        # Clear VM's serial. Expected job may arrive later. However correlated
        # serial must not be accepted, since it reflects a previous VM state
        vm.serial = None

    return vm
108

    
109

    
110
@transaction.commit_on_success
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None,
                      beparams=None):
    """Process a job progress notification from the backend

    Process an incoming message from the backend (currently Ganeti).
    Job notifications with a terminating status (success, error, or canceled),
    also update the operating state of the VM.

    :param vm: the VirtualMachine the job refers to.
    :param etime: the time the event occurred.
    :param jobid: id of the Ganeti job.
    :param opcode: Ganeti opcode of the job.
    :param status: Ganeti status of the job.
    :param logmsg: log message reported by the backend.
    :param nics: optional NIC configuration reported with the job.
    :param beparams: optional backend parameters reported with the job.
    :raises VirtualMachine.InvalidBackendMsgError: if status is unknown.
    """
    # See #1492, #1031, #1111 why this line has been removed
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)

    # Mirror the job's progress on the VM row.
    vm.backendjobid = jobid
    vm.backendjobstatus = status
    vm.backendopcode = opcode
    vm.backendlogmsg = logmsg

    # Non-final statuses only record progress; they never change operstate.
    if status in ["queued", "waiting", "running"]:
        vm.save()
        return

    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode)
    # Notifications of success change the operating state
    if status == "success":
        if state_for_success is not None:
            vm.operstate = state_for_success
        if nics is not None:
            # Update the NICs of the VM
            _process_net_status(vm, etime, nics)
        if beparams:
            # Change the flavor of the VM
            _process_resize(vm, beparams)
        # Update backendtime only for jobs that have been successfully
        # completed, since only these jobs update the state of the VM. Else a
        # "race condition" may occur when a successful job (e.g.
        # OP_INSTANCE_REMOVE) completes before an error job and messages arrive
        # in reversed order.
        vm.backendtime = etime

    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
    if opcode == 'OP_INSTANCE_CREATE' and status in ('canceled', 'error'):
        vm.operstate = 'ERROR'
        vm.backendtime = etime
    elif opcode == 'OP_INSTANCE_REMOVE':
        # Set the deleted flag explicitly, cater for admin-initiated removals
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
        # when no instance exists at the Ganeti backend.
        # See ticket #799 for all the details.
        if status == 'success' or (status == 'error' and
                                   vm.operstate == 'ERROR'):
            # Treat the removal as successful: drop all NICs, mark deleted.
            _process_net_status(vm, etime, nics=[])
            vm.deleted = True
            vm.operstate = state_for_success
            vm.backendtime = etime
            status = "success"

    if status in ["success", "error", "canceled"]:
        # Job is finalized: Handle quotas/commissioning
        job_fields = {"nics": nics, "beparams": beparams}
        vm = handle_vm_quotas(vm, job_id=jobid, job_opcode=opcode,
                              job_status=status, job_fields=job_fields)
        # and clear task fields
        if vm.task_job_id == jobid:
            vm.task = None
            vm.task_job_id = None

    vm.save()
180

    
181

    
182
def _process_resize(vm, beparams):
    """Update the VM's flavor to match new Ganeti backend parameters.

    Looks up a Flavor with the reported vcpus/maxmem and the old flavor's
    disk configuration; no-op when neither CPU nor RAM changed.
    """
    current = vm.flavor
    new_vcpus = beparams.get("vcpus", current.cpu)
    new_ram = beparams.get("maxmem", current.ram)
    unchanged = (new_vcpus == current.cpu) and (new_ram == current.ram)
    if unchanged:
        return
    try:
        vm.flavor = Flavor.objects.get(cpu=new_vcpus, ram=new_ram,
                                       disk=current.disk,
                                       disk_template=current.disk_template)
    except Flavor.DoesNotExist:
        raise Exception("Can not find flavor for VM")
    vm.save()
197

    
198

    
199
@transaction.commit_on_success
def process_net_status(vm, etime, nics):
    """Thin transactional wrapper around _process_net_status()."""
    _process_net_status(vm, etime, nics)
203

    
204

    
205
def _process_net_status(vm, etime, nics):
    """Process a net status notification from the backend

    Process an incoming message from the Ganeti backend,
    detailing the NIC configuration of a VM instance.

    Update the state of the VM in the DB accordingly.

    :param vm: the VirtualMachine whose NICs are updated.
    :param etime: the time the event occurred.
    :param nics: raw NIC configuration as reported by the Ganeti hooks.
    """

    ganeti_nics = process_ganeti_nics(nics)
    # Skip the (expensive) release/recreate cycle below when nothing changed.
    if not nics_changed(vm.nics.order_by('index'), ganeti_nics):
        log.debug("NICs for VM %s have not changed", vm)
        return

    # Get X-Lock on backend before getting X-Lock on network IP pools, to
    # guarantee that no deadlock will occur with Backend allocator.
    Backend.objects.select_for_update().get(id=vm.backend_id)

    # Drop every current NIC (returning IPv4 addresses to the pools) before
    # recreating them from the reported configuration.
    release_instance_nics(vm)

    for nic in ganeti_nics:
        ipv4 = nic.get('ipv4', '')
        net = nic['network']
        if ipv4:
            net.reserve_address(ipv4)

        nic['dirty'] = False
        vm.nics.create(**nic)
        # Dummy save the network, because UI uses changed-since for VMs
        # and Networks in order to show the VM NICs
        net.save()

    vm.backendtime = etime
    vm.save()
239

    
240

    
241
def process_ganeti_nics(ganeti_nics):
    """Convert raw NIC dicts from the Ganeti hooks into DB-ready dicts.

    Each entry is resolved against the Network table; the result carries
    the fields needed to (re)create a NIC row: index, network, mac, ipv4,
    ipv6, firewall_profile and state.
    """
    processed = []
    for index, raw_nic in enumerate(ganeti_nics):
        backend_name = str(raw_nic.get('network', ''))
        network = Network.objects.get(pk=utils.id_from_network_name(
            backend_name))

        mac = raw_nic.get('mac', '')
        ipv4 = raw_nic.get('ip', '')
        # Derive the IPv6 address from the MAC only when the network has an
        # IPv6 subnet.
        ipv6 = mac2eui64(mac, network.subnet6) if network.subnet6 else ''

        firewall_profile = _reverse_tags.get(raw_nic.get('firewall', ''), '')
        if not firewall_profile and network.public:
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE

        processed.append({
            'index': index,
            'network': network,
            'mac': mac,
            'ipv4': ipv4,
            'ipv6': ipv6,
            'firewall_profile': firewall_profile,
            'state': 'ACTIVE'})
    return processed
275

    
276

    
277
def nics_changed(old_nics, new_nics):
    """Return True if NICs have changed in any way."""
    if len(old_nics) != len(new_nics):
        return True
    relevant = ("ipv4", "ipv6", "mac", "firewall_profile", "index", "network")
    return any(getattr(db_nic, attr) != ganeti_nic[attr]
               for db_nic, ganeti_nic in zip(old_nics, new_nics)
               for attr in relevant)
287

    
288

    
289
def release_instance_nics(vm):
    """Delete all NICs of a VM, returning their IPv4 addresses to the pool.

    Each NIC's network is saved afterwards so that its changed-since
    timestamp is refreshed.
    """
    for nic in vm.nics.all():
        network = nic.network
        address = nic.ipv4
        if address:
            network.release_address(address)
        nic.delete()
        network.save()
296

    
297

    
298
@transaction.commit_on_success
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
    """Process a job notification for a network on a specific backend.

    Mirror the job's progress on the BackendNetwork row, update its
    operstate for final statuses, and finally recompute the state of the
    parent Network.

    :raises Network.InvalidBackendMsgError: if status is unknown.
    """
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode
    back_network.backendlogmsg = logmsg

    # Notifications of success change the operating state
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == 'success' and state_for_success is not None:
        back_network.operstate = state_for_success

    # A failed OP_NETWORK_ADD means the network never materialized on the
    # backend.
    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_ADD':
        back_network.operstate = 'ERROR'
        back_network.backendtime = etime

    if opcode == 'OP_NETWORK_REMOVE':
        # A failed removal of a BackendNetwork already in ERROR is treated as
        # success: the network does not exist on the backend anyway.
        if status == 'success' or (status == 'error' and
                                   back_network.operstate == 'ERROR'):
            back_network.operstate = state_for_success
            back_network.deleted = True
            back_network.backendtime = etime

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()
    # Also you must update the state of the Network!!
    update_network_state(back_network.network)
329

    
330

    
331
def update_network_state(network):
    """Update the state of a Network based on BackendNetwork states.

    Update the state of a Network based on the operstate of the networks in
    the backends that network exists.

    The state of the network is:
    * ACTIVE: If it is 'ACTIVE' in at least one backend.
    * DELETED: If it is 'DELETED' in all backends that have been created.

    This function also releases the resources (MAC prefix or Bridge) and the
    quotas for the network.

    """
    if network.deleted:
        # Network has already been deleted. Just assert that state is also
        # DELETED
        if not network.state == "DELETED":
            network.state = "DELETED"
            network.save()
        return

    backend_states = [s.operstate for s in network.backend_networks.all()]
    if not backend_states and network.action != "DESTROY":
        # No backend networks and no pending destroy: the network counts as
        # ACTIVE. Return unconditionally -- the return used to be nested
        # inside the inner "if", so a network that was *already* ACTIVE fell
        # through to the reduce below, where the empty backend_states list
        # evaluates to "DELETED" and the network got wrongly released.
        if network.state != "ACTIVE":
            network.state = "ACTIVE"
            network.save()
        return

    # Network is deleted when all BackendNetworks go to "DELETED" operstate
    deleted = reduce(lambda x, y: x == y and "DELETED", backend_states,
                     "DELETED")

    # Release the resources on the deletion of the Network
    if deleted:
        # Arguments were previously swapped w.r.t. the message: the "link"
        # slot received mac_prefix and vice versa.
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
                 network.id, network.link, network.mac_prefix)
        network.deleted = True
        network.state = "DELETED"
        if network.mac_prefix:
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
                release_resource(res_type="mac_prefix",
                                 value=network.mac_prefix)
        if network.link:
            if network.FLAVORS[network.flavor]["link"] == "pool":
                release_resource(res_type="bridge", value=network.link)

        # Issue commission
        if network.userid:
            quotas.issue_and_accept_commission(network, delete=True)
        elif not network.public:
            log.warning("Network %s does not have an owner!", network.id)
    network.save()
384

    
385

    
386
@transaction.commit_on_success
def process_network_modify(back_network, etime, jobid, opcode, status,
                           add_reserved_ips, remove_reserved_ips):
    """Process an OP_NETWORK_SET_PARAMS job notification.

    Record the job's progress on the BackendNetwork and apply external
    reservations/releases of IP addresses to the network's address pool.

    :param add_reserved_ips: iterable of IPs to mark externally reserved.
    :param remove_reserved_ips: iterable of IPs to return to the pool.
    :raises Network.InvalidBackendMsgError: if status is unknown.
    """
    assert (opcode == "OP_NETWORK_SET_PARAMS")
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    # NOTE(review): other handlers set "backendopcode" here; confirm that
    # assigning "opcode" is intended and not a typo.
    back_network.opcode = opcode

    if add_reserved_ips or remove_reserved_ips:
        net = back_network.network
        pool = net.get_pool()
        if add_reserved_ips:
            for ip in add_reserved_ips:
                pool.reserve(ip, external=True)
        if remove_reserved_ips:
            for ip in remove_reserved_ips:
                pool.put(ip, external=True)
        pool.save()

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()
411

    
412

    
413
@transaction.commit_on_success
def process_create_progress(vm, etime, progress):
    """Record a build-progress notification for a VM being created.

    :param progress: reported percentage; clamped to 100, negative values
        raise ValueError.
    """
    percentage = int(progress)

    # snf-image:copy-progress counts bytes read by the image handling
    # processes, so the reported value may exceed 100%; clamp it.
    percentage = min(percentage, 100)
    if percentage < 0:
        raise ValueError("Percentage cannot be negative")

    # FIXME: log a warning here, see #1033
    # (a decreasing percentage is currently accepted silently)

    # This assumes that no message of type 'ganeti-create-progress' is going
    # to arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance
    # and the instance is STARTED.  What if the two messages are processed by
    # two separate dispatcher threads, and the 'ganeti-op-status' message for
    # successful creation gets processed before the 'ganeti-create-progress'
    # message? [vkoukis]
    #
    #if not vm.operstate == 'BUILD':
    #    raise VirtualMachine.IllegalState("VM is not in building state")

    vm.buildpercentage = percentage
    vm.backendtime = etime
    vm.save()
442

    
443

    
444
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
                               details=None):
    """Record a diagnostic entry for a virtual machine.

    :param vm: VirtualMachine instance to create diagnostic for.
    :param message: Diagnostic message.
    :param source: Diagnostic source identifier (e.g. image-helper).
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
    :param etime: The time the message occured (if available).
    :param details: Additional details or debug information.
    """
    VirtualMachineDiagnostic.objects.create_for_vm(
        vm, level, source=source, source_date=etime, message=message,
        details=details)
460

    
461

    
462
def create_instance(vm, public_nic, flavor, image):
    """Submit an OP_INSTANCE_CREATE job to the VM's Ganeti backend.

    `image` is a dictionary which should contain the keys:
            'backend_id', 'format' and 'metadata'

        metadata value should be a dictionary.

    :param vm: the VirtualMachine to build.
    :param public_nic: NIC definition for the instance's public interface.
    :param flavor: Flavor providing disk/CPU/RAM parameters.
    :param image: image dictionary as described above.
    :returns: the Ganeti job id.
    """

    # Handle arguments to CreateInstance() as a dictionary,
    # initialize it based on a deployment-specific value.
    # This enables the administrator to override deployment-specific
    # arguments, such as the disk template to use, name of os provider
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
    #
    kw = vm.backend.get_create_params()
    kw['mode'] = 'create'
    kw['name'] = vm.backend_vm_id
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS

    kw['disk_template'] = flavor.disk_template
    # flavor.disk is multiplied by 1024 -- presumably GiB -> MiB, Ganeti's
    # unit. TODO confirm against the Flavor model.
    kw['disks'] = [{"size": flavor.disk * 1024}]
    provider = flavor.disk_provider
    if provider:
        kw['disks'][0]['provider'] = provider
        kw['disks'][0]['origin'] = flavor.disk_origin

    kw['nics'] = [public_nic]
    if vm.backend.use_hotplug():
        kw['hotplug'] = True
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['os'] = settings.GANETI_OS_PROVIDER
    kw['ip_check'] = False
    kw['name_check'] = False

    # Do not specific a node explicitly, have
    # Ganeti use an iallocator instead
    #kw['pnode'] = rapi.GetNodes()[0]

    kw['dry_run'] = settings.TEST

    kw['beparams'] = {
        'auto_balance': True,
        'vcpus': flavor.cpu,
        'memory': flavor.ram}

    kw['osparams'] = {
        'config_url': vm.config_url,
        # Store image id and format to Ganeti
        'img_id': image['backend_id'],
        'img_format': image['format']}

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['hvparams'] = dict(serial_console=False)

    # Hide sensitive fields (e.g. passwords) from the log.
    log.debug("Creating instance %s", utils.hide_pass(kw))
    with pooled_rapi_client(vm) as client:
        return client.CreateInstance(**kw)
518

    
519

    
520
def delete_instance(vm):
    """Issue an instance-removal job for the VM and return its job id."""
    with pooled_rapi_client(vm) as rapi:
        return rapi.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)
523

    
524

    
525
def reboot_instance(vm, reboot_type):
    """Issue a reboot job for the VM and return its job id.

    :param vm: VirtualMachine to reboot.
    :param reboot_type: either 'soft' or 'hard'.
    :raises ValueError: if reboot_type is not a supported value.
    """
    # Validate explicitly instead of using `assert`, which is silently
    # stripped when Python runs with -O.
    if reboot_type not in ('soft', 'hard'):
        raise ValueError("Unsupported reboot type: %s" % reboot_type)
    with pooled_rapi_client(vm) as client:
        return client.RebootInstance(vm.backend_vm_id, reboot_type,
                                     dry_run=settings.TEST)
530

    
531

    
532
def startup_instance(vm):
    """Issue a startup job for the VM and return its job id."""
    with pooled_rapi_client(vm) as rapi:
        return rapi.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
535

    
536

    
537
def shutdown_instance(vm):
    """Issue a shutdown job for the VM and return its job id."""
    with pooled_rapi_client(vm) as rapi:
        return rapi.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)
540

    
541

    
542
def resize_instance(vm, vcpus, memory):
    """Modify a VM's backend parameters to the given CPU/RAM values."""
    memory = int(memory)
    new_beparams = {"vcpus": int(vcpus),
                    "minmem": memory,
                    "maxmem": memory}
    with pooled_rapi_client(vm) as rapi:
        return rapi.ModifyInstance(vm.backend_vm_id, beparams=new_beparams)
548

    
549

    
550
def get_instance_console(vm):
    """Return VNC console connection info for a VM.

    :returns: dict with keys 'kind', 'host' and 'port'.
    :raises Exception: if the hypervisor is kvm and the instance has
        serial_console enabled.
    """
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
    # useless (see #783).
    #
    # Until this is fixed on the Ganeti side, construct a console info reply
    # directly.
    #
    # WARNING: This assumes that VNC runs on port network_port on
    #          the instance's primary node, and is probably
    #          hypervisor-specific.
    #
    log.debug("Getting console for vm %s", vm)

    console = {}
    console['kind'] = 'vnc'

    with pooled_rapi_client(vm) as client:
        i = client.GetInstance(vm.backend_vm_id)

    if vm.backend.hypervisor == "kvm" and i['hvparams']['serial_console']:
        raise Exception("hv parameter serial_console cannot be true")
    console['host'] = i['pnode']
    console['port'] = i['network_port']

    return console
576

    
577

    
578
def get_instance_info(vm):
    """Fetch the VM's instance info from its Ganeti backend."""
    with pooled_rapi_client(vm) as rapi:
        return rapi.GetInstanceInfo(vm.backend_vm_id)
581

    
582

    
583
def create_network(network, backend, connect=True):
    """Create a network in a Ganeti backend"""
    log.debug("Creating network %s in backend %s", network, backend)

    creation_job = _create_network(network, backend)

    if not connect:
        return [creation_job]
    # Chain the connect jobs on the creation job.
    return connect_network(network, backend, depends=[creation_job])
594

    
595

    
596
def _create_network(network, backend):
    """Submit a network-creation job to the backend and return its job id."""
    if network.public:
        network_type = 'public'
        conflicts_check = True
    else:
        network_type = 'private'
        conflicts_check = False

    tags = network.backend_tag
    if network.dhcp:
        tags.append('nfdhcpd')

    try:
        backend_network = BackendNetwork.objects.get(network=network,
                                                     backend=backend)
    except BackendNetwork.DoesNotExist:
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
                        " does not exist" % (network.id, backend.id))
    mac_prefix = backend_network.mac_prefix

    with pooled_rapi_client(backend) as client:
        return client.CreateNetwork(network_name=network.backend_id,
                                    network=network.subnet,
                                    network6=network.subnet6,
                                    gateway=network.gateway,
                                    gateway6=network.gateway6,
                                    network_type=network_type,
                                    mac_prefix=mac_prefix,
                                    conflicts_check=conflicts_check,
                                    tags=tags)
627

    
628

    
629
def connect_network(network, backend, depends=None, group=None):
    """Connect a network to nodegroups.

    :param network: the Network to connect.
    :param backend: the Backend whose nodegroups to connect it to.
    :param depends: optional list of job ids the connect jobs depend on.
    :param group: connect only this nodegroup; default is all groups.
    :returns: list of Ganeti job ids, one per nodegroup.
    """
    log.debug("Connecting network %s to backend %s", network, backend)

    # Use None as the default instead of a mutable [] that would be shared
    # between calls.
    if depends is None:
        depends = []

    if network.public:
        conflicts_check = True
    else:
        conflicts_check = False

    depends = [[job, ["success", "error", "canceled"]] for job in depends]
    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for group in groups:
            job_id = client.ConnectNetwork(network.backend_id, group,
                                           network.mode, network.link,
                                           conflicts_check,
                                           depends=depends)
            job_ids.append(job_id)
    return job_ids
649

    
650

    
651
def delete_network(network, backend, disconnect=True):
    """Remove a network from a backend, optionally disconnecting it first."""
    log.debug("Deleting network %s from backend %s", network, backend)

    depends = disconnect_network(network, backend) if disconnect else []
    _delete_network(network, backend, depends=depends)
658

    
659

    
660
def _delete_network(network, backend, depends=None):
    """Submit a network-removal job, depending on the given job ids.

    :returns: the Ganeti job id of the removal job.
    """
    # Avoid a mutable default argument shared across calls.
    if depends is None:
        depends = []
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
    with pooled_rapi_client(backend) as client:
        return client.DeleteNetwork(network.backend_id, depends)
664

    
665

    
666
def disconnect_network(network, backend, group=None):
    """Disconnect a network from nodegroups of a backend.

    :param group: disconnect only this nodegroup; default is all groups.
    :returns: list of Ganeti job ids, one per nodegroup.
    """
    # Log message previously read "to backend", which was misleading for a
    # disconnect operation.
    log.debug("Disconnecting network %s from backend %s", network, backend)

    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for group in groups:
            job_id = client.DisconnectNetwork(network.backend_id, group)
            job_ids.append(job_id)
    return job_ids
676

    
677

    
678
def connect_to_network(vm, network, address=None):
    """Add a NIC on `network` to `vm`, creating the network on the VM's
    backend first if it is not active there.

    :param vm: the VirtualMachine to connect.
    :param network: the Network to connect to.
    :param address: optional IPv4 address for the new NIC.
    :returns: the Ganeti job id of the ModifyInstance job.
    """
    backend = vm.backend
    # Lock the network row to serialize concurrent connects.
    network = Network.objects.select_for_update().get(id=network.id)
    bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
                                                         network=network)
    depend_jobs = []
    if bnet.operstate != "ACTIVE":
        # The network is not active on this backend yet: create/connect it
        # and make the NIC addition depend on those jobs.
        depend_jobs = create_network(network, backend, connect=True)

    depends = [[job, ["success", "error", "canceled"]] for job in depend_jobs]

    nic = {'ip': address, 'network': network.backend_id}

    log.debug("Connecting vm %s to network %s(%s)", vm, network, address)

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id, nics=[('add',  nic)],
                                     hotplug=vm.backend.use_hotplug(),
                                     depends=depends,
                                     dry_run=settings.TEST)
698

    
699

    
700
def disconnect_from_network(vm, nic):
    """Remove the given NIC (by index) from the VM."""
    log.debug("Removing nic of VM %s, with index %s", vm, str(nic.index))

    modification = [('remove', nic.index, {})]
    with pooled_rapi_client(vm) as rapi:
        return rapi.ModifyInstance(vm.backend_vm_id, nics=modification,
                                   hotplug=vm.backend.use_hotplug(),
                                   dry_run=settings.TEST)
709

    
710

    
711
def set_firewall_profile(vm, profile):
    """Apply a firewall profile to a VM via Ganeti instance tags.

    :param vm: the VirtualMachine to tag.
    :param profile: one of the keys of _firewall_tags ('ENABLED',
        'DISABLED', 'PROTECTED').
    :raises ValueError: if profile is not a known firewall profile.
    """
    try:
        tag = _firewall_tags[profile]
    except KeyError:
        # Fixed typo in the error message ("Unsopported").
        raise ValueError("Unsupported Firewall Profile: %s" % profile)

    log.debug("Setting tag of VM %s to %s", vm, profile)

    with pooled_rapi_client(vm) as client:
        # Delete all firewall tags
        for t in _firewall_tags.values():
            client.DeleteInstanceTags(vm.backend_vm_id, [t],
                                      dry_run=settings.TEST)

        client.AddInstanceTags(vm.backend_vm_id, [tag], dry_run=settings.TEST)

        # XXX NOP ModifyInstance call to force process_net_status to run
        # on the dispatcher
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
        client.ModifyInstance(vm.backend_vm_id,
                              os_name=os_name)
    return None
733

    
734

    
735
def get_instances(backend, bulk=True):
    """List the instances of a Ganeti backend."""
    with pooled_rapi_client(backend) as client:
        return client.GetInstances(bulk=bulk)
738

    
739

    
740
def get_nodes(backend, bulk=True):
    """List the nodes of a Ganeti backend."""
    with pooled_rapi_client(backend) as client:
        return client.GetNodes(bulk=bulk)
743

    
744

    
745
def get_jobs(backend):
    """List the jobs of a Ganeti backend."""
    with pooled_rapi_client(backend) as client:
        return client.GetJobs()
748

    
749

    
750
def get_physical_resources(backend):
751
    """ Get the physical resources of a backend.
752

753
    Get the resources of a backend as reported by the backend (not the db).
754

755
    """
756
    nodes = get_nodes(backend, bulk=True)
757
    attr = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
758
    res = {}
759
    for a in attr:
760
        res[a] = 0
761
    for n in nodes:
762
        # Filter out drained, offline and not vm_capable nodes since they will
763
        # not take part in the vm allocation process
764
        can_host_vms = n['vm_capable'] and not (n['drained'] or n['offline'])
765
        if can_host_vms and n['cnodes']:
766
            for a in attr:
767
                res[a] += int(n[a])
768
    return res
769

    
770

    
771
def update_resources(backend, resources=None):
    """ Update the state of the backend resources in db.

    When no resources dict is given, fetch them from the backend itself.
    """
    if not resources:
        resources = get_physical_resources(backend)

    for field in ('mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt',
                  'ctotal'):
        setattr(backend, field, resources[field])
    backend.updated = datetime.now()
    backend.save()
787

    
788

    
789
def get_memory_from_instances(backend):
    """ Get the memory that is used from instances.

    Get the used memory of a backend. Note: This is different for
    the real memory used, due to kvm's memory de-duplication.

    """
    with pooled_rapi_client(backend) as client:
        instances = client.GetInstances(bulk=True)
    return sum(instance['oper_ram'] for instance in instances)
802

    
803
##
804
## Synchronized operations for reconciliation
805
##
806

    
807

    
808
def create_network_synced(network, backend):
    """Create and connect a network on a backend, waiting for each job."""
    creation_result = _create_network_synced(network, backend)
    if creation_result[0] != 'success':
        return creation_result
    return connect_network_synced(network, backend)
814

    
815

    
816
def _create_network_synced(network, backend):
    """Submit the network-creation job and block until it finishes."""
    with pooled_rapi_client(backend) as client:
        job_id = _create_network(network, backend)
        return wait_for_job(client, job_id)
821

    
822

    
823
def connect_network_synced(network, backend):
    """Connect a network to every nodegroup of a backend, one job at a time.

    Stops and returns at the first non-successful job.

    :returns: the (status, error) tuple of the last job waited for.

    NOTE(review): if client.GetGroups() returns an empty list, `result` is
    never bound and the final return raises NameError -- confirm that every
    backend always has at least one nodegroup.
    """
    with pooled_rapi_client(backend) as client:
        for group in client.GetGroups():
            job = client.ConnectNetwork(network.backend_id, group,
                                        network.mode, network.link)
            result = wait_for_job(client, job)
            if result[0] != 'success':
                return result

    return result
833

    
834

    
835
def wait_for_job(client, jobid):
    """Block until a Ganeti job reaches a final status.

    :param client: RAPI client to poll with.
    :param jobid: id of the Ganeti job to wait for.
    :returns: tuple (status, error); error is None on success, otherwise the
        job's opresult.
    """
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
    status = result['job_info'][0]
    # The rest of this module treats "canceled" as a final status, but this
    # loop previously checked for "cancel" only, so canceled jobs made it
    # poll forever. Keep "cancel" as well, defensively.
    while status not in ['success', 'error', 'cancel', 'canceled']:
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
                                         [result], None)
        status = result['job_info'][0]

    if status == 'success':
        return (status, None)
    else:
        error = result['job_info'][1]
        return (status, error)
        return (status, error)