Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ 527c7a7b

History | View | Annotate | Download (32 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
from django.conf import settings
34
from django.db import transaction
35
from datetime import datetime
36

    
37
from synnefo.db.models import (Backend, VirtualMachine, Network,
38
                               FloatingIP,
39
                               BackendNetwork, BACKEND_STATUSES,
40
                               pooled_rapi_client, VirtualMachineDiagnostic,
41
                               Flavor)
42
from synnefo.logic import utils
43
from synnefo import quotas
44
from synnefo.api.util import release_resource
45
from synnefo.util.mac2eui64 import mac2eui64
46

    
47
from logging import getLogger
48
log = getLogger(__name__)
49

    
50

    
51
_firewall_tags = {
52
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
53
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
54
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}
55

    
56
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
57

    
58

    
59
def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
    """Handle quotas for updated VirtualMachine.

    Update quotas for the updated VirtualMachine based on the job that run on
    the Ganeti backend. If a commission has been already issued for this job,
    then this commission is just accepted or rejected based on the job status.
    Otherwise, a new commission for the given change is issued, that is also in
    force and auto-accept mode. In this case, previous commissions are
    rejected, since they reflect a previous state of the VM.

    :param vm: the VirtualMachine affected by the job.
    :param job_id: Ganeti job id of the finished job.
    :param job_opcode: Ganeti opcode of the job (e.g. OP_INSTANCE_CREATE).
    :param job_status: final job status ("success", "error" or "canceled").
    :param job_fields: job parameters relevant to quotas (nics, beparams).
    :returns: the (possibly updated) `vm`, or None for non-final statuses.

    """
    # Quotas are only affected once a job reaches a final status.
    if job_status not in ["success", "error", "canceled"]:
        return

    # Check successful completion of a job will trigger any quotable change in
    # the VM state.
    action = utils.get_action_from_opcode(job_opcode, job_fields)
    commission_info = quotas.get_commission_info(vm, action=action,
                                                 action_fields=job_fields)

    if vm.task_job_id == job_id and vm.serial is not None:
        # Commission for this change has already been issued. So just
        # accept/reject it
        serial = vm.serial
        if job_status == "success":
            quotas.accept_serial(serial)
        elif job_status in ["error", "canceled"]:
            log.debug("Job %s failed. Rejecting related serial %s", job_id,
                      serial)
            quotas.reject_serial(serial)
        vm.serial = None
    elif job_status == "success" and commission_info is not None:
        log.debug("Expected job was %s. Processing job %s. Commission for"
                  " this job: %s", vm.task_job_id, job_id, commission_info)
        # Commission for this change has not been issued, or the issued
        # commission was unaware of the current change. Reject all previous
        # commissions and create a new one in forced mode!
        previous_serial = vm.serial
        if previous_serial and not previous_serial.resolved:
            quotas.resolve_vm_commission(previous_serial)
        serial = quotas.issue_commission(user=vm.userid,
                                         source=quotas.DEFAULT_SOURCE,
                                         provisions=commission_info,
                                         force=True,
                                         auto_accept=True)
        # Clear VM's serial. Expected job may arrive later. However correlated
        # serial must not be accepted, since it reflects a previous VM state
        vm.serial = None

    return vm
109

    
110

    
111
@transaction.commit_on_success
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None,
                      beparams=None):
    """Process a job progress notification from the backend

    Process an incoming message from the backend (currently Ganeti).
    Job notifications with a terminating status (success, error, or canceled),
    also update the operating state of the VM.

    :param vm: the VirtualMachine the job refers to.
    :param etime: event time reported by the backend.
    :param jobid: Ganeti job id.
    :param opcode: Ganeti opcode of the job.
    :param status: Ganeti job status (must be one of BACKEND_STATUSES).
    :param logmsg: log message carried by the notification.
    :param nics: NIC configuration, if the job changed the NICs.
    :param beparams: backend parameters, if the job resized the instance.
    :raises VirtualMachine.InvalidBackendMsgError: on an unknown job status.

    """
    # See #1492, #1031, #1111 why this line has been removed
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)

    # Always record the latest job information on the VM.
    vm.backendjobid = jobid
    vm.backendjobstatus = status
    vm.backendopcode = opcode
    vm.backendlogmsg = logmsg

    # Intermediate statuses only update the job bookkeeping fields.
    if status in ["queued", "waiting", "running"]:
        vm.save()
        return

    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode)
    # Notifications of success change the operating state
    if status == "success":
        if state_for_success is not None:
            vm.operstate = state_for_success
        if nics is not None:
            # Update the NICs of the VM
            _process_net_status(vm, etime, nics)
        if beparams:
            # Change the flavor of the VM
            _process_resize(vm, beparams)
        # Update backendtime only for jobs that have been successfully
        # completed, since only these jobs update the state of the VM. Else a
        # "race condition" may occur when a successful job (e.g.
        # OP_INSTANCE_REMOVE) completes before an error job and messages arrive
        # in reversed order.
        vm.backendtime = etime

    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
    if opcode == 'OP_INSTANCE_CREATE' and status in ('canceled', 'error'):
        vm.operstate = 'ERROR'
        vm.backendtime = etime
    elif opcode == 'OP_INSTANCE_REMOVE':
        # Set the deleted flag explicitly, cater for admin-initiated removals
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
        # when no instance exists at the Ganeti backend.
        # See ticket #799 for all the details.
        if status == 'success' or (status == 'error' and
                                   vm.operstate == 'ERROR'):
            # VM has been deleted. Release the instance IPs
            release_instance_ips(vm, [])
            # And delete the related NICs (must be performed after release!)
            vm.nics.all().delete()
            vm.deleted = True
            vm.operstate = state_for_success
            vm.backendtime = etime
            # Treat the removal as successful for quota handling below.
            status = "success"

    if status in ["success", "error", "canceled"]:
        # Job is finalized: Handle quotas/commissioning
        job_fields = {"nics": nics, "beparams": beparams}
        vm = handle_vm_quotas(vm, job_id=jobid, job_opcode=opcode,
                              job_status=status, job_fields=job_fields)
        # and clear task fields
        if vm.task_job_id == jobid:
            vm.task = None
            vm.task_job_id = None

    vm.save()
184

    
185

    
186
def _process_resize(vm, beparams):
    """Update the VM's flavor to match new backend parameters.

    Looks up a flavor matching the new vcpus/ram while keeping the current
    disk size and disk template, and assigns it to the VM.
    """
    current = vm.flavor
    new_vcpus = beparams.get("vcpus", current.cpu)
    new_ram = beparams.get("maxmem", current.ram)
    if (new_vcpus, new_ram) == (current.cpu, current.ram):
        # Nothing quotable changed; keep the current flavor.
        return
    try:
        vm.flavor = Flavor.objects.get(cpu=new_vcpus, ram=new_ram,
                                       disk=current.disk,
                                       disk_template=current.disk_template)
    except Flavor.DoesNotExist:
        raise Exception("Can not find flavor for VM")
    vm.save()
201

    
202

    
203
@transaction.commit_on_success
def process_net_status(vm, etime, nics):
    """Run _process_net_status inside a database transaction."""
    _process_net_status(vm, etime, nics)
207

    
208

    
209
def _process_net_status(vm, etime, nics):
    """Process a net status notification from the backend

    Process an incoming message from the Ganeti backend,
    detailing the NIC configuration of a VM instance.

    Update the state of the VM in the DB accordingly.

    :param vm: the VirtualMachine whose NICs are updated.
    :param etime: event time reported by the backend.
    :param nics: raw NIC dicts as reported by the Ganeti hooks.
    """

    ganeti_nics = process_ganeti_nics(nics)
    # Skip the (expensive, locking) DB update if nothing actually changed.
    if not nics_changed(vm.nics.order_by('index'), ganeti_nics):
        log.debug("NICs for VM %s have not changed", vm)
        return

    # Get X-Lock on backend before getting X-Lock on network IP pools, to
    # guarantee that no deadlock will occur with Backend allocator.
    Backend.objects.select_for_update().get(id=vm.backend_id)

    # NICs have changed. Release the instance IPs
    release_instance_ips(vm, ganeti_nics)
    # And delete the related NICs (must be performed after release!)
    vm.nics.all().delete()

    # Recreate the NICs from the backend-reported state.
    for nic in ganeti_nics:
        ipv4 = nic.get('ipv4', '')
        net = nic['network']
        if ipv4:
            # Mark the address as used in the network's pool.
            net.reserve_address(ipv4)

        nic['dirty'] = False
        vm.nics.create(**nic)
        # Dummy save the network, because UI uses changed-since for VMs
        # and Networks in order to show the VM NICs
        net.save()

    vm.backendtime = etime
    vm.save()
246

    
247

    
248
def process_ganeti_nics(ganeti_nics):
    """Translate raw NIC dicts from the Ganeti hooks into DB-ready dicts."""
    processed = []
    for index, raw_nic in enumerate(ganeti_nics):
        network_name = str(raw_nic.get('network', ''))
        network_id = utils.id_from_network_name(network_name)

        net = Network.objects.get(pk=network_id)

        # Extract the addressing information for this NIC.
        mac = raw_nic.get('mac', '')
        ipv4 = raw_nic.get('ip', '')
        # Derive the IPv6 address from the MAC only on dual-stack networks.
        ipv6 = mac2eui64(mac, net.subnet6) if net.subnet6 else ''

        # Map the backend firewall tag back to a profile name; public
        # networks fall back to the deployment's default profile.
        firewall_profile = _reverse_tags.get(raw_nic.get('firewall', ''), '')
        if not firewall_profile and net.public:
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE

        processed.append({
            'index': index,
            'network': net,
            'mac': mac,
            'ipv4': ipv4,
            'ipv6': ipv6,
            'firewall_profile': firewall_profile,
            'state': 'ACTIVE'})
    return processed
282

    
283

    
284
def nics_changed(old_nics, new_nics):
    """Return True if NICs differ in count or in any tracked field."""
    if len(old_nics) != len(new_nics):
        return True
    tracked = ["ipv4", "ipv6", "mac", "firewall_profile", "index", "network"]
    # Compare DB NIC objects (attribute access) against Ganeti NIC dicts
    # (key access), pairwise in index order.
    return any(getattr(db_nic, field) != ganeti_nic[field]
               for db_nic, ganeti_nic in zip(old_nics, new_nics)
               for field in tracked)
294

    
295

    
296
def release_instance_ips(vm, ganeti_nics):
    """Release IPv4 addresses that the instance no longer holds.

    Compare the (network, address) pairs recorded in the DB for this VM
    against the pairs present in `ganeti_nics`, and release every address
    that disappeared: a floating IP is detached from the machine (it stays
    reserved), any other address is returned to the network's pool.
    """
    old_addresses = set(vm.nics.values_list("network", "ipv4"))
    new_addresses = set(map(lambda nic: (nic["network"].id, nic["ipv4"]),
                            ganeti_nics))
    to_release = old_addresses - new_addresses
    for (network_id, ipv4) in to_release:
        if ipv4:
            # Get X-Lock before searching floating IP, to exclusively search
            # and release floating IP. Otherwise you may release a floating IP
            # that has been just reserved.
            net = Network.objects.select_for_update().get(id=network_id)
            if net.floating_ip_pool:
                try:
                    floating_ip = net.floating_ips.select_for_update()\
                                                  .get(ipv4=ipv4, machine=vm,
                                                       deleted=False)
                    # Floating IPs stay reserved; just detach from the VM.
                    floating_ip.machine = None
                    floating_ip.save()
                except FloatingIP.DoesNotExist:
                    # Not a floating IP after all: return it to the pool.
                    net.release_address(ipv4)
            else:
                net.release_address(ipv4)
318

    
319

    
320
@transaction.commit_on_success
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
    """Process a Ganeti job notification for a BackendNetwork."""
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    # Record the latest job information on the backend network.
    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode
    back_network.backendlogmsg = logmsg

    # Notifications of success change the operating state
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == 'success' and state_for_success is not None:
        back_network.operstate = state_for_success

    # A failed or canceled creation leaves the backend network in ERROR.
    if opcode == 'OP_NETWORK_ADD' and status in ('canceled', 'error'):
        back_network.operstate = 'ERROR'
        back_network.backendtime = etime

    if opcode == 'OP_NETWORK_REMOVE':
        # A removal that fails because the backend network is already in
        # ERROR (no such network in Ganeti) still counts as a deletion.
        if status == 'success' or (status == 'error' and
                                   back_network.operstate == 'ERROR'):
            back_network.operstate = state_for_success
            back_network.deleted = True
            back_network.backendtime = etime

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()
    # The aggregate Network state depends on all of its BackendNetworks.
    update_network_state(back_network.network)
351

    
352

    
353
def update_network_state(network):
    """Update the state of a Network based on BackendNetwork states.

    Update the state of a Network based on the operstate of the networks in
    the backends that network exists.

    The state of the network is:
    * ACTIVE: If it is 'ACTIVE' in at least one backend.
    * DELETED: If it is 'DELETED' in all backends that have been created.

    This function also releases the resources (MAC prefix or Bridge) and the
    quotas for the network.

    """
    if network.deleted:
        # Network has already been deleted. Just assert that state is also
        # DELETED
        if not network.state == "DELETED":
            network.state = "DELETED"
            network.save()
        return

    backend_states = [s.operstate for s in network.backend_networks.all()]
    if not backend_states and network.action != "DESTROY":
        # Bug fix: always return here. Previously the return was inside the
        # inner if, so an already-ACTIVE network with no BackendNetworks fell
        # through to the reduce() below, which yields truthy "DELETED" for an
        # empty list and wrongly deleted the network and released its
        # resources.
        if network.state != "ACTIVE":
            network.state = "ACTIVE"
            network.save()
        return

    # Network is deleted when all BackendNetworks go to "DELETED" operstate
    deleted = reduce(lambda x, y: x == y and "DELETED", backend_states,
                     "DELETED")

    # Release the resources on the deletion of the Network
    if deleted:
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
                 network.id, network.mac_prefix, network.link)
        network.deleted = True
        network.state = "DELETED"
        # Pooled MAC prefixes and bridges go back to their pools.
        if network.mac_prefix:
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
                release_resource(res_type="mac_prefix",
                                 value=network.mac_prefix)
        if network.link:
            if network.FLAVORS[network.flavor]["link"] == "pool":
                release_resource(res_type="bridge", value=network.link)

        # Issue commission
        if network.userid:
            quotas.issue_and_accept_commission(network, delete=True)
        elif not network.public:
            log.warning("Network %s does not have an owner!", network.id)
    network.save()
406

    
407

    
408
@transaction.commit_on_success
def process_network_modify(back_network, etime, jobid, opcode, status,
                           add_reserved_ips, remove_reserved_ips):
    """Process an OP_NETWORK_SET_PARAMS notification (reserved IP changes)."""
    assert (opcode == "OP_NETWORK_SET_PARAMS")
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    # Record the latest job information.
    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.opcode = opcode

    if add_reserved_ips or remove_reserved_ips:
        # Mirror the externally reserved/released addresses in the IP pool.
        pool = back_network.network.get_pool()
        for ip in (add_reserved_ips or []):
            pool.reserve(ip, external=True)
        for ip in (remove_reserved_ips or []):
            pool.put(ip, external=True)
        pool.save()

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()
433

    
434

    
435
@transaction.commit_on_success
def process_create_progress(vm, etime, progress):
    """Record the image-copy progress of a building VM."""
    percentage = int(progress)

    # snf-image:copy-progress counts bytes read by every image handling
    # process, so the reported figure may exceed 100%; clamp it.
    if percentage > 100:
        percentage = 100
    if percentage < 0:
        raise ValueError("Percentage cannot be negative")

    # FIXME: log a warning here, see #1033
#   if last_update > percentage:
#       raise ValueError("Build percentage should increase monotonically " \
#                        "(old = %d, new = %d)" % (last_update, percentage))

    # This assumes that no message of type 'ganeti-create-progress' is going to
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
    # the instance is STARTED.  What if the two messages are processed by two
    # separate dispatcher threads, and the 'ganeti-op-status' message for
    # successful creation gets processed before the 'ganeti-create-progress'
    # message? [vkoukis]
    #
    #if not vm.operstate == 'BUILD':
    #    raise VirtualMachine.IllegalState("VM is not in building state")

    vm.buildpercentage = percentage
    vm.backendtime = etime
    vm.save()
464

    
465

    
466
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
                               details=None):
    """Create a diagnostic entry for a virtual machine instance.

    :param vm: VirtualMachine instance to create diagnostic for.
    :param message: Diagnostic message.
    :param source: Diagnostic source identifier (e.g. image-helper).
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
    :param etime: The time the message occured (if available).
    :param details: Additional details or debug information.
    """
    return VirtualMachineDiagnostic.objects.create_for_vm(
        vm, level, source=source, source_date=etime,
        message=message, details=details)
482

    
483

    
484
def create_instance(vm, nics, flavor, image):
    """Submit an instance-creation job to the VM's Ganeti backend.

    `image` is a dictionary which should contain the keys:
            'backend_id', 'format' and 'metadata'

        metadata value should be a dictionary.

    :param vm: the VirtualMachine to create in Ganeti.
    :param nics: NICs the instance will be connected to.
    :param flavor: flavor providing CPU/RAM/disk parameters.
    :returns: the Ganeti job id of the CreateInstance job.
    """

    # Handle arguments to CreateInstance() as a dictionary,
    # initialize it based on a deployment-specific value.
    # This enables the administrator to override deployment-specific
    # arguments, such as the disk template to use, name of os provider
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
    #
    kw = vm.backend.get_create_params()
    kw['mode'] = 'create'
    kw['name'] = vm.backend_vm_id
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS

    # Disk layout comes from the flavor; size is in MiB for Ganeti.
    kw['disk_template'] = flavor.disk_template
    kw['disks'] = [{"size": flavor.disk * 1024}]
    provider = flavor.disk_provider
    if provider:
        kw['disks'][0]['provider'] = provider
        kw['disks'][0]['origin'] = flavor.disk_origin

    kw['nics'] = [{"network": nic.network.backend_id, "ip": nic.ipv4}
                  for nic in nics]
    backend = vm.backend
    depend_jobs = []
    # Make sure every requested network exists in this backend; private
    # networks are created lazily and the instance creation job will
    # depend on those creation jobs.
    for nic in nics:
        network = Network.objects.select_for_update().get(id=nic.network.id)
        bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
                                                             network=network)
        if bnet.operstate != "ACTIVE":
            if network.public:
                msg = "Can not connect instance to network %s. Network is not"\
                      " ACTIVE in backend %s." % (network, backend)
                raise Exception(msg)
            else:
                jobs = create_network(network, backend, connect=True)
                if isinstance(jobs, list):
                    depend_jobs.extend(jobs)
                else:
                    depend_jobs.append(jobs)
    kw["depends"] = [[job, ["success", "error", "canceled"]]
                     for job in depend_jobs]

    if vm.backend.use_hotplug():
        kw['hotplug'] = True
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['os'] = settings.GANETI_OS_PROVIDER
    kw['ip_check'] = False
    kw['name_check'] = False

    # Do not specify a node explicitly, have
    # Ganeti use an iallocator instead
    #kw['pnode'] = rapi.GetNodes()[0]

    kw['dry_run'] = settings.TEST

    kw['beparams'] = {
        'auto_balance': True,
        'vcpus': flavor.cpu,
        'memory': flavor.ram}

    kw['osparams'] = {
        'config_url': vm.config_url,
        # Store image id and format to Ganeti
        'img_id': image['backend_id'],
        'img_format': image['format']}

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['hvparams'] = dict(serial_console=False)

    log.debug("Creating instance %s", utils.hide_pass(kw))
    with pooled_rapi_client(vm) as client:
        return client.CreateInstance(**kw)
561

    
562

    
563
def delete_instance(vm):
    """Submit an instance-removal job for this VM to its Ganeti backend."""
    with pooled_rapi_client(vm) as rapi:
        return rapi.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)
566

    
567

    
568
def reboot_instance(vm, reboot_type):
    """Submit a reboot job for this VM; `reboot_type` is 'soft' or 'hard'."""
    assert reboot_type in ('soft', 'hard')
    with pooled_rapi_client(vm) as rapi:
        return rapi.RebootInstance(vm.backend_vm_id, reboot_type,
                                   dry_run=settings.TEST)
573

    
574

    
575
def startup_instance(vm):
    """Submit a startup job for this VM to its Ganeti backend."""
    with pooled_rapi_client(vm) as rapi:
        return rapi.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
578

    
579

    
580
def shutdown_instance(vm):
    """Submit a shutdown job for this VM to its Ganeti backend."""
    with pooled_rapi_client(vm) as rapi:
        return rapi.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)
583

    
584

    
585
def resize_instance(vm, vcpus, memory):
    """Submit a job changing the VM's vcpus and memory (min == max)."""
    new_beparams = {"vcpus": int(vcpus),
                    "minmem": int(memory),
                    "maxmem": int(memory)}
    with pooled_rapi_client(vm) as rapi:
        return rapi.ModifyInstance(vm.backend_vm_id, beparams=new_beparams)
591

    
592

    
593
def get_instance_console(vm):
    """Return VNC console connection info for a VM.

    RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
    which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
    useless (see #783).

    Until this is fixed on the Ganeti side, construct a console info reply
    directly.

    WARNING: This assumes that VNC runs on port network_port on
             the instance's primary node, and is probably
             hypervisor-specific.
    """
    log.debug("Getting console for vm %s", vm)

    with pooled_rapi_client(vm) as rapi:
        info = rapi.GetInstance(vm.backend_vm_id)

    # Serial console and VNC are mutually exclusive under KVM.
    if vm.backend.hypervisor == "kvm" and info['hvparams']['serial_console']:
        raise Exception("hv parameter serial_console cannot be true")

    return {'kind': 'vnc',
            'host': info['pnode'],
            'port': info['network_port']}
619

    
620

    
621
def get_instance_info(vm):
    """Fetch the Ganeti instance information for this VM."""
    with pooled_rapi_client(vm) as rapi:
        return rapi.GetInstanceInfo(vm.backend_vm_id)
624

    
625

    
626
def create_network(network, backend, connect=True):
    """Create a network in a Ganeti backend"""
    log.debug("Creating network %s in backend %s", network, backend)

    creation_job = _create_network(network, backend)

    if not connect:
        return [creation_job]
    # The connect jobs must wait for the creation job to finalize.
    return connect_network(network, backend, depends=[creation_job])
637

    
638

    
639
def _create_network(network, backend):
    """Submit a CreateNetwork job to Ganeti and return its job id."""

    network_type = 'public' if network.public else 'private'

    # nfdhcpd serves DHCP on the backend only when the network asks for it.
    tags = network.backend_tag
    if network.dhcp:
        tags.append('nfdhcpd')

    # Conflicting-IP checks are only meaningful on public networks.
    conflicts_check = bool(network.public)

    try:
        bn = BackendNetwork.objects.get(network=network, backend=backend)
        mac_prefix = bn.mac_prefix
    except BackendNetwork.DoesNotExist:
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
                        " does not exist" % (network.id, backend.id))

    with pooled_rapi_client(backend) as rapi:
        return rapi.CreateNetwork(network_name=network.backend_id,
                                  network=network.subnet,
                                  network6=network.subnet6,
                                  gateway=network.gateway,
                                  gateway6=network.gateway6,
                                  network_type=network_type,
                                  mac_prefix=mac_prefix,
                                  conflicts_check=conflicts_check,
                                  tags=tags)
670

    
671

    
672
def connect_network(network, backend, depends=None, group=None):
    """Connect a network to nodegroups.

    :param depends: optional list of Ganeti job ids that the connect jobs
        must wait for (until each reaches a final status).
    :param group: connect only this nodegroup; by default all groups.
    :returns: list of Ganeti job ids, one per nodegroup.
    """
    log.debug("Connecting network %s to backend %s", network, backend)

    # Conflicting-IP checks are only meaningful on public networks.
    if network.public:
        conflicts_check = True
    else:
        conflicts_check = False

    # Fixed mutable default argument (depends=[]): use None and build the
    # dependency list from a fresh copy on every call.
    depends = [[job, ["success", "error", "canceled"]]
               for job in (depends or [])]
    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for group in groups:
            job_id = client.ConnectNetwork(network.backend_id, group,
                                           network.mode, network.link,
                                           conflicts_check,
                                           depends=depends)
            job_ids.append(job_id)
    return job_ids
692

    
693

    
694
def delete_network(network, backend, disconnect=True):
    """Remove a network from a backend, optionally disconnecting it first."""
    log.debug("Deleting network %s from backend %s", network, backend)

    if disconnect:
        # Deletion must wait for the disconnect jobs to finalize.
        depends = disconnect_network(network, backend)
    else:
        depends = []
    _delete_network(network, backend, depends=depends)
701

    
702

    
703
def _delete_network(network, backend, depends=None):
    """Submit a DeleteNetwork job, waiting for `depends` jobs to finalize.

    Fixed mutable default argument (depends=[]): use None and build the
    dependency list from a fresh copy on every call.
    """
    depends = [[job, ["success", "error", "canceled"]]
               for job in (depends or [])]
    with pooled_rapi_client(backend) as client:
        return client.DeleteNetwork(network.backend_id, depends)
707

    
708

    
709
def disconnect_network(network, backend, group=None):
    """Disconnect a network from nodegroups of a backend.

    :param group: disconnect only this nodegroup; by default all groups.
    :returns: list of Ganeti job ids, one per nodegroup.
    """
    # Fixed log wording: the network is disconnected *from* the backend.
    log.debug("Disconnecting network %s from backend %s", network, backend)

    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for group in groups:
            job_id = client.DisconnectNetwork(network.backend_id, group)
            job_ids.append(job_id)
    return job_ids
719

    
720

    
721
def connect_to_network(vm, network, address=None):
    """Add a NIC on `network` (optionally with a fixed address) to a VM."""
    backend = vm.backend
    # Lock the network row to serialize with concurrent pool operations.
    network = Network.objects.select_for_update().get(id=network.id)
    bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
                                                         network=network)
    # Lazily create/connect the network in this backend if needed; the
    # ModifyInstance job will then depend on those jobs.
    depend_jobs = []
    if bnet.operstate != "ACTIVE":
        depend_jobs = create_network(network, backend, connect=True)

    depends = [[job, ["success", "error", "canceled"]] for job in depend_jobs]

    new_nic = {'ip': address, 'network': network.backend_id}

    log.debug("Connecting vm %s to network %s(%s)", vm, network, address)

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id,
                                     nics=[('add', new_nic)],
                                     hotplug=vm.backend.use_hotplug(),
                                     depends=depends,
                                     dry_run=settings.TEST)
741

    
742

    
743
def disconnect_from_network(vm, nic):
    """Remove the NIC at index `nic.index` from `vm`.

    Returns the Ganeti job id of the ModifyInstance call.
    """
    log.debug("Removing nic of VM %s, with index %s", vm, str(nic.index))

    removal = [('remove', nic.index, {})]
    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id,
                                     nics=removal,
                                     hotplug=vm.backend.use_hotplug(),
                                     dry_run=settings.TEST)
752

    
753

    
754
def set_firewall_profile(vm, profile):
    """Apply firewall profile `profile` to `vm` via Ganeti instance tags.

    All known firewall tags are removed first, then the tag for the
    requested profile is added. A NOP ModifyInstance call follows so the
    dispatcher processes the change.

    Raises:
        ValueError: if `profile` is not a key of `_firewall_tags`.

    Returns:
        None.
    """
    try:
        tag = _firewall_tags[profile]
    except KeyError:
        # Message typo fixed ("Unsopported" -> "Unsupported").
        raise ValueError("Unsupported Firewall Profile: %s" % profile)

    log.debug("Setting tag of VM %s to %s", vm, profile)

    with pooled_rapi_client(vm) as client:
        # Delete all firewall tags before applying the requested one.
        for t in _firewall_tags.values():
            client.DeleteInstanceTags(vm.backend_vm_id, [t],
                                      dry_run=settings.TEST)

        client.AddInstanceTags(vm.backend_vm_id, [tag], dry_run=settings.TEST)

        # XXX NOP ModifyInstance call to force process_net_status to run
        # on the dispatcher
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
        client.ModifyInstance(vm.backend_vm_id,
                              os_name=os_name)
    return None
776

    
777

    
778
def get_instances(backend, bulk=True):
    """Return the backend's instances as reported over RAPI."""
    with pooled_rapi_client(backend) as client:
        instances = client.GetInstances(bulk=bulk)
    return instances
781

    
782

    
783
def get_nodes(backend, bulk=True):
    """Return the backend's nodes as reported over RAPI."""
    with pooled_rapi_client(backend) as client:
        nodes = client.GetNodes(bulk=bulk)
    return nodes
786

    
787

    
788
def get_jobs(backend):
    """Return the backend's job list as reported over RAPI."""
    with pooled_rapi_client(backend) as client:
        jobs = client.GetJobs()
    return jobs
791

    
792

    
793
def get_physical_resources(backend):
    """Get the physical resources of a backend.

    Get the resources of a backend as reported by the backend (not the db),
    summing the per-node attributes over all nodes that can host VMs.
    """
    attrs = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
    totals = dict.fromkeys(attrs, 0)
    for node in get_nodes(backend, bulk=True):
        # Skip drained, offline and non-vm_capable nodes since they will
        # not take part in the vm allocation process.
        if node['drained'] or node['offline'] or not node['vm_capable']:
            continue
        if not node['cnodes']:
            continue
        for attr in attrs:
            totals[attr] += int(node[attr])
    return totals
812

    
813

    
814
def update_resources(backend, resources=None):
    """Update the state of the backend resources in db.

    When `resources` is not given (or falsy), they are fetched live via
    get_physical_resources(). The Backend row is stamped with the current
    time and saved.
    """
    if not resources:
        resources = get_physical_resources(backend)

    # Copy each reported attribute onto the DB row.
    for attr in ('mfree', 'mtotal', 'dfree', 'dtotal',
                 'pinst_cnt', 'ctotal'):
        setattr(backend, attr, resources[attr])
    backend.updated = datetime.now()
    backend.save()
830

    
831

    
832
def get_memory_from_instances(backend):
    """Get the memory that is used from instances.

    Get the used memory of a backend. Note: This is different for
    the real memory used, due to kvm's memory de-duplication.
    """
    with pooled_rapi_client(backend) as client:
        instances = client.GetInstances(bulk=True)
    return sum(instance['oper_ram'] for instance in instances)
845

    
846
##
847
## Synchronized operations for reconciliation
848
##
849

    
850

    
851
def create_network_synced(network, backend):
    """Create and then connect a network, waiting for each job.

    Returns the (status, error) tuple of the first failing step, or of
    the final connect step when creation succeeded.
    """
    creation = _create_network_synced(network, backend)
    if creation[0] != 'success':
        return creation
    return connect_network_synced(network, backend)
857

    
858

    
859
def _create_network_synced(network, backend):
    """Submit a network-creation job and block until it terminates."""
    with pooled_rapi_client(backend) as client:
        job_id = _create_network(network, backend)
        outcome = wait_for_job(client, job_id)
    return outcome
864

    
865

    
866
def connect_network_synced(network, backend):
    """Connect `network` to every node group of `backend`, synchronously.

    Each ConnectNetwork job is waited for in turn; the first non-success
    outcome is returned immediately.

    Returns:
        (status, error) of the last job run, or ('success', None) when
        the backend reports no node groups at all. (The original code
        left `result` unbound in that case and raised UnboundLocalError.)
    """
    result = ('success', None)
    with pooled_rapi_client(backend) as client:
        for group in client.GetGroups():
            job = client.ConnectNetwork(network.backend_id, group,
                                        network.mode, network.link)
            result = wait_for_job(client, job)
            if result[0] != 'success':
                return result

    return result
876

    
877

    
878
def wait_for_job(client, jobid):
    """Block until Ganeti job `jobid` reaches a final state.

    Args:
        client: a RAPI client exposing WaitForJobChange.
        jobid: the Ganeti job id to wait on.

    Returns:
        ('success', None) when the job succeeded; otherwise
        (status, opresult) where status is 'error' or 'canceled' and
        opresult is the job's reported error information.
    """
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
    status = result['job_info'][0]
    # Loop while the job is still in a non-final state. Bug fix: Ganeti's
    # final status is 'canceled' (as used in the depends lists elsewhere in
    # this module); the original checked 'cancel' and therefore spun
    # forever on a canceled job.
    while status not in ['success', 'error', 'canceled']:
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
                                         [result], None)
        status = result['job_info'][0]

    if status == 'success':
        return (status, None)
    else:
        error = result['job_info'][1]
        return (status, error)