Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ 3524241a

History | View | Annotate | Download (21.1 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import json
35

    
36
from django.conf import settings
37
from django.db import transaction
38
from datetime import datetime
39

    
40
from synnefo.db.models import (Backend, VirtualMachine, Network,
41
                               BackendNetwork, BACKEND_STATUSES,
42
                               pooled_rapi_client)
43
from synnefo.logic import utils
44

    
45
from logging import getLogger
46
log = getLogger(__name__)
47

    
48

    
49
_firewall_tags = {
50
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
51
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
52
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}
53

    
54
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
55

    
56

    
57
@transaction.commit_on_success
58
def process_op_status(vm, etime, jobid, opcode, status, logmsg):
59
    """Process a job progress notification from the backend
60

61
    Process an incoming message from the backend (currently Ganeti).
62
    Job notifications with a terminating status (sucess, error, or canceled),
63
    also update the operating state of the VM.
64

65
    """
66
    # See #1492, #1031, #1111 why this line has been removed
67
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
68
    if status not in [x[0] for x in BACKEND_STATUSES]:
69
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)
70

    
71
    vm.backendjobid = jobid
72
    vm.backendjobstatus = status
73
    vm.backendopcode = opcode
74
    vm.backendlogmsg = logmsg
75

    
76
    # Notifications of success change the operating state
77
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode, None)
78
    if status == 'success' and state_for_success is not None:
79
        utils.update_state(vm, state_for_success)
80
        # Set the deleted flag explicitly, cater for admin-initiated removals
81
        if opcode == 'OP_INSTANCE_REMOVE':
82
            release_instance_nics(vm)
83
            vm.deleted = True
84
            vm.nics.all().delete()
85

    
86
    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
87
    if status in ('canceled', 'error') and opcode == 'OP_INSTANCE_CREATE':
88
        utils.update_state(vm, 'ERROR')
89

    
90
    # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
91
    # when no instance exists at the Ganeti backend.
92
    # See ticket #799 for all the details.
93
    #
94
    if (status == 'error' and opcode == 'OP_INSTANCE_REMOVE'):
95
        vm.deleted = True
96
        vm.nics.all().delete()
97

    
98
    vm.backendtime = etime
99
    # Any other notification of failure leaves the operating state unchanged
100

    
101
    vm.save()
102

    
103

    
104
@transaction.commit_on_success
105
def process_net_status(vm, etime, nics):
106
    """Process a net status notification from the backend
107

108
    Process an incoming message from the Ganeti backend,
109
    detailing the NIC configuration of a VM instance.
110

111
    Update the state of the VM in the DB accordingly.
112
    """
113

    
114
    release_instance_nics(vm)
115

    
116
    new_nics = enumerate(nics)
117
    for i, new_nic in new_nics:
118
        network = new_nic.get('network', '')
119
        n = str(network)
120
        pk = utils.id_from_network_name(n)
121

    
122
        net = Network.objects.get(pk=pk)
123

    
124
        # Get the new nic info
125
        mac = new_nic.get('mac', '')
126
        ipv4 = new_nic.get('ip', '')
127
        ipv6 = new_nic.get('ipv6', '')
128

    
129
        firewall = new_nic.get('firewall', '')
130
        firewall_profile = _reverse_tags.get(firewall, '')
131
        if not firewall_profile and net.public:
132
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE
133

    
134
        if ipv4:
135
            net.reserve_address(ipv4)
136

    
137
        vm.nics.create(
138
            network=net,
139
            index=i,
140
            mac=mac,
141
            ipv4=ipv4,
142
            ipv6=ipv6,
143
            firewall_profile=firewall_profile,
144
            dirty=False)
145

    
146
    vm.backendtime = etime
147
    vm.save()
148

    
149

    
150
def release_instance_nics(vm):
151
    for nic in vm.nics.all():
152
        nic.network.release_address(nic.ipv4)
153
        nic.delete()
154

    
155

    
156
@transaction.commit_on_success
157
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
158
    if status not in [x[0] for x in BACKEND_STATUSES]:
159
        return
160
        #raise Network.InvalidBackendMsgError(opcode, status)
161

    
162
    back_network.backendjobid = jobid
163
    back_network.backendjobstatus = status
164
    back_network.backendopcode = opcode
165
    back_network.backendlogmsg = logmsg
166

    
167
    # Notifications of success change the operating state
168
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
169
    if status == 'success' and state_for_success is not None:
170
        back_network.operstate = state_for_success
171
        if opcode == 'OP_NETWORK_REMOVE':
172
            back_network.deleted = True
173

    
174
    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_CREATE':
175
        utils.update_state(back_network, 'ERROR')
176

    
177
    if (status == 'error' and opcode == 'OP_NETWORK_REMOVE'):
178
        back_network.deleted = True
179
        back_network.operstate = 'DELETED'
180

    
181
    back_network.save()
182

    
183

    
184
@transaction.commit_on_success
185
def process_create_progress(vm, etime, rprogress, wprogress):
186

    
187
    # XXX: This only uses the read progress for now.
188
    #      Explore whether it would make sense to use the value of wprogress
189
    #      somewhere.
190
    percentage = int(rprogress)
191

    
192
    # The percentage may exceed 100%, due to the way
193
    # snf-progress-monitor tracks bytes read by image handling processes
194
    percentage = 100 if percentage > 100 else percentage
195
    if percentage < 0:
196
        raise ValueError("Percentage cannot be negative")
197

    
198
    # FIXME: log a warning here, see #1033
199
#   if last_update > percentage:
200
#       raise ValueError("Build percentage should increase monotonically " \
201
#                        "(old = %d, new = %d)" % (last_update, percentage))
202

    
203
    # This assumes that no message of type 'ganeti-create-progress' is going to
204
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
205
    # the instance is STARTED.  What if the two messages are processed by two
206
    # separate dispatcher threads, and the 'ganeti-op-status' message for
207
    # successful creation gets processed before the 'ganeti-create-progress'
208
    # message? [vkoukis]
209
    #
210
    #if not vm.operstate == 'BUILD':
211
    #    raise VirtualMachine.IllegalState("VM is not in building state")
212

    
213
    vm.buildpercentage = percentage
214
    vm.backendtime = etime
215
    vm.save()
216

    
217

    
218
def start_action(vm, action):
219
    """Update the state of a VM when a new action is initiated."""
220
    if not action in [x[0] for x in VirtualMachine.ACTIONS]:
221
        raise VirtualMachine.InvalidActionError(action)
222

    
223
    # No actions to deleted and no actions beside destroy to suspended VMs
224
    if vm.deleted:
225
        raise VirtualMachine.DeletedError
226

    
227
    # No actions to machines being built. They may be destroyed, however.
228
    if vm.operstate == 'BUILD' and action != 'DESTROY':
229
        raise VirtualMachine.BuildingError
230

    
231
    vm.action = action
232
    vm.backendjobid = None
233
    vm.backendopcode = None
234
    vm.backendjobstatus = None
235
    vm.backendlogmsg = None
236

    
237
    # Update the relevant flags if the VM is being suspended or destroyed.
238
    # Do not set the deleted flag here, see ticket #721.
239
    #
240
    # The deleted flag is set asynchronously, when an OP_INSTANCE_REMOVE
241
    # completes successfully. Hence, a server may be visible for some time
242
    # after a DELETE /servers/id returns HTTP 204.
243
    #
244
    if action == "DESTROY":
245
        # vm.deleted = True
246
        pass
247
    elif action == "SUSPEND":
248
        vm.suspended = True
249
    elif action == "START":
250
        vm.suspended = False
251
    vm.save()
252

    
253

    
254
def create_instance(vm, public_nic, flavor, image, password, personality):
255
    """`image` is a dictionary which should contain the keys:
256
            'backend_id', 'format' and 'metadata'
257

258
        metadata value should be a dictionary.
259
    """
260

    
261
    if settings.IGNORE_FLAVOR_DISK_SIZES:
262
        if image['backend_id'].find("windows") >= 0:
263
            sz = 14000
264
        else:
265
            sz = 4000
266
    else:
267
        sz = flavor.disk * 1024
268

    
269
    # Handle arguments to CreateInstance() as a dictionary,
270
    # initialize it based on a deployment-specific value.
271
    # This enables the administrator to override deployment-specific
272
    # arguments, such as the disk template to use, name of os provider
273
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
274
    #
275
    kw = settings.GANETI_CREATEINSTANCE_KWARGS
276
    kw['mode'] = 'create'
277
    kw['name'] = vm.backend_vm_id
278
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
279

    
280
    # Identify if provider parameter should be set in disk options.
281
    # Current implementation support providers only fo ext template.
282
    # To select specific provider for an ext template, template name
283
    # should be formated as `ext_<provider_name>`.
284
    provider = None
285
    disk_template = flavor.disk_template
286
    if flavor.disk_template.startswith("ext"):
287
        disk_template, provider = flavor.disk_template.split("_", 1)
288

    
289
    kw['disk_template'] = disk_template
290
    kw['disks'] = [{"size": sz}]
291
    if provider:
292
        kw['disks'][0]['provider'] = provider
293

    
294
        if provider == 'vlmc':
295
            kw['disks'][0]['origin'] = image['backend_id']
296

    
297
    kw['nics'] = [public_nic]
298
    if settings.GANETI_USE_HOTPLUG:
299
        kw['hotplug'] = True
300
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
301
    # kw['os'] = settings.GANETI_OS_PROVIDER
302
    kw['ip_check'] = False
303
    kw['name_check'] = False
304
    # Do not specific a node explicitly, have
305
    # Ganeti use an iallocator instead
306
    #
307
    # kw['pnode']=rapi.GetNodes()[0]
308
    kw['dry_run'] = settings.TEST
309

    
310
    kw['beparams'] = {
311
        'auto_balance': True,
312
        'vcpus': flavor.cpu,
313
        'memory': flavor.ram}
314

    
315
    kw['osparams'] = {
316
        'img_id': image['backend_id'],
317
        'img_passwd': password,
318
        'img_format': image['format']}
319
    if personality:
320
        kw['osparams']['img_personality'] = json.dumps(personality)
321

    
322
    if provider != None and provider == 'vlmc':
323
        kw['osparams']['img_id'] = 'null'
324

    
325
    kw['osparams']['img_properties'] = json.dumps(image['metadata'])
326

    
327
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
328
    # kw['hvparams'] = dict(serial_console=False)
329
    with pooled_rapi_client(vm) as client:
330
        return client.CreateInstance(**kw)
331

    
332

    
333
def delete_instance(vm):
334
    start_action(vm, 'DESTROY')
335
    with pooled_rapi_client(vm) as client:
336
        return client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)
337

    
338

    
339
def reboot_instance(vm, reboot_type):
340
    assert reboot_type in ('soft', 'hard')
341
    with pooled_rapi_client(vm) as client:
342
        return client.RebootInstance(vm.backend_vm_id, reboot_type, dry_run=settings.TEST)
343

    
344

    
345
def startup_instance(vm):
346
    start_action(vm, 'START')
347
    with pooled_rapi_client(vm) as client:
348
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
349

    
350

    
351
def shutdown_instance(vm):
352
    start_action(vm, 'STOP')
353
    with pooled_rapi_client(vm) as client:
354
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)
355

    
356

    
357
def get_instance_console(vm):
358
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
359
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
360
    # useless (see #783).
361
    #
362
    # Until this is fixed on the Ganeti side, construct a console info reply
363
    # directly.
364
    #
365
    # WARNING: This assumes that VNC runs on port network_port on
366
    #          the instance's primary node, and is probably
367
    #          hypervisor-specific.
368
    #
369
    console = {}
370
    console['kind'] = 'vnc'
371

    
372
    with pooled_rapi_client(vm) as client:
373
        i = client.GetInstance(vm.backend_vm_id)
374

    
375
    if i['hvparams']['serial_console']:
376
        raise Exception("hv parameter serial_console cannot be true")
377
    console['host'] = i['pnode']
378
    console['port'] = i['network_port']
379

    
380
    return console
381

    
382

    
383
def get_instance_info(vm):
384
    with pooled_rapi_client(vm) as client:
385
        return client.GetInstanceInfo(vm.backend_vm_id)
386

    
387

    
388
def create_network(network, backends=None, connect=True):
389
    """Create and connect a network."""
390
    if not backends:
391
        backends = Backend.objects.exclude(offline=True)
392

    
393
    for backend in backends:
394
        create_jobID = _create_network(network, backend)
395
        if connect:
396
            connect_network(network, backend, create_jobID)
397

    
398

    
399
def _create_network(network, backend):
400
    """Create a network."""
401

    
402
    network_type = network.public and 'public' or 'private'
403

    
404
    tags = network.backend_tag
405
    if network.dhcp:
406
        tags.append('nfdhcpd')
407
    tags = ','.join(tags)
408

    
409
    try:
410
        bn = BackendNetwork.objects.get(network=network, backend=backend)
411
        mac_prefix = bn.mac_prefix
412
    except BackendNetwork.DoesNotExist:
413
        raise Exception("BackendNetwork for network '%s' in backend '%s'"\
414
                        " does not exist" % (network.id, backend.id))
415

    
416
    with pooled_rapi_client(backend) as client:
417
        return client.CreateNetwork(network_name=network.backend_id,
418
                                    network=network.subnet,
419
                                    gateway=network.gateway,
420
                                    network_type=network_type,
421
                                    mac_prefix=mac_prefix,
422
                                    tags=tags)
423

    
424

    
425
def connect_network(network, backend, depend_job=None, group=None):
426
    """Connect a network to nodegroups."""
427
    mode = "routed" if "ROUTED" in network.type else "bridged"
428

    
429
    with pooled_rapi_client(backend) as client:
430
        if group:
431
            client.ConnectNetwork(network.backend_id, group, mode,
432
                                  network.link, [depend_job])
433
        else:
434
            for group in client.GetGroups():
435
                client.ConnectNetwork(network.backend_id, group, mode,
436
                                      network.link, [depend_job])
437

    
438

    
439
def delete_network(network, backends=None, disconnect=True):
440
    if not backends:
441
        backends = Backend.objects.exclude(offline=True)
442

    
443
    for backend in backends:
444
        disconnect_jobIDs = []
445
        if disconnect:
446
            disconnect_jobIDs = disconnect_network(network, backend)
447
        _delete_network(network, backend, disconnect_jobIDs)
448

    
449

    
450
def _delete_network(network, backend, depend_jobs=[]):
451
    with pooled_rapi_client(backend) as client:
452
        return client.DeleteNetwork(network.backend_id, depend_jobs)
453

    
454

    
455
def disconnect_network(network, backend, group=None):
456
    with pooled_rapi_client(backend) as client:
457
        if group:
458
            return [client.DisconnectNetwork(network.backend_id, group)]
459
        else:
460
            jobs = []
461
            for group in client.GetGroups():
462
                job = client.DisconnectNetwork(network.backend_id, group)
463
                jobs.append(job)
464
            return jobs
465

    
466

    
467
def connect_to_network(vm, network, address):
468
    nic = {'ip': address, 'network': network.backend_id}
469

    
470
    with pooled_rapi_client(vm) as client:
471
        return client.ModifyInstance(vm.backend_vm_id, nics=[('add',  nic)],
472
                                     hotplug=settings.GANETI_USE_HOTPLUG,
473
                                     dry_run=settings.TEST)
474

    
475

    
476
def disconnect_from_network(vm, nic):
477
    op = [('remove', nic.index, {})]
478
    with pooled_rapi_client(vm) as client:
479
        return client.ModifyInstance(vm.backend_vm_id, nics=op,
480
                                     hotplug=settings.GANETI_USE_HOTPLUG,
481
                                     dry_run=settings.TEST)
482

    
483

    
484
def set_firewall_profile(vm, profile):
485
    try:
486
        tag = _firewall_tags[profile]
487
    except KeyError:
488
        raise ValueError("Unsopported Firewall Profile: %s" % profile)
489

    
490
    with pooled_rapi_client(vm) as client:
491
        # Delete all firewall tags
492
        for t in _firewall_tags.values():
493
            client.DeleteInstanceTags(vm.backend_vm_id, [t],
494
                                      dry_run=settings.TEST)
495

    
496
        client.AddInstanceTags(vm.backend_vm_id, [tag], dry_run=settings.TEST)
497

    
498
        # XXX NOP ModifyInstance call to force process_net_status to run
499
        # on the dispatcher
500
        client.ModifyInstance(vm.backend_vm_id,
501
                         os_name=settings.GANETI_CREATEINSTANCE_KWARGS['os'])
502

    
503

    
504
def get_ganeti_instances(backend=None, bulk=False):
505
    instances = []
506
    for backend in get_backends(backend):
507
        with pooled_rapi_client(backend) as client:
508
            instances.append(client.GetInstances(bulk=bulk))
509

    
510
    return reduce(list.__add__, instances, [])
511

    
512

    
513
def get_ganeti_nodes(backend=None, bulk=False):
514
    nodes = []
515
    for backend in get_backends(backend):
516
        with pooled_rapi_client(backend) as client:
517
            nodes.append(client.GetNodes(bulk=bulk))
518

    
519
    return reduce(list.__add__, nodes, [])
520

    
521

    
522
def get_ganeti_jobs(backend=None, bulk=False):
523
    jobs = []
524
    for backend in get_backends(backend):
525
        with pooled_rapi_client(backend) as client:
526
            jobs.append(client.GetJobs(bulk=bulk))
527
    return reduce(list.__add__, jobs, [])
528

    
529
##
530
##
531
##
532

    
533

    
534
def get_backends(backend=None):
535
    if backend:
536
        return [backend]
537
    return Backend.objects.filter(offline=False)
538

    
539

    
540
def get_physical_resources(backend):
541
    """ Get the physical resources of a backend.
542

543
    Get the resources of a backend as reported by the backend (not the db).
544

545
    """
546
    nodes = get_ganeti_nodes(backend, bulk=True)
547
    attr = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
548
    res = {}
549
    for a in attr:
550
        res[a] = 0
551
    for n in nodes:
552
        # Filter out drained, offline and not vm_capable nodes since they will
553
        # not take part in the vm allocation process
554
        if n['vm_capable'] and not n['drained'] and not n['offline']\
555
           and n['cnodes']:
556
            for a in attr:
557
                res[a] += int(n[a])
558
    return res
559

    
560

    
561
def update_resources(backend, resources=None):
562
    """ Update the state of the backend resources in db.
563

564
    """
565

    
566
    if not resources:
567
        resources = get_physical_resources(backend)
568

    
569
    backend.mfree = resources['mfree']
570
    backend.mtotal = resources['mtotal']
571
    backend.dfree = resources['dfree']
572
    backend.dtotal = resources['dtotal']
573
    backend.pinst_cnt = resources['pinst_cnt']
574
    backend.ctotal = resources['ctotal']
575
    backend.updated = datetime.now()
576
    backend.save()
577

    
578

    
579
def get_memory_from_instances(backend):
580
    """ Get the memory that is used from instances.
581

582
    Get the used memory of a backend. Note: This is different for
583
    the real memory used, due to kvm's memory de-duplication.
584

585
    """
586
    with pooled_rapi_client(backend) as client:
587
        instances = client.GetInstances(bulk=True)
588
    mem = 0
589
    for i in instances:
590
        mem += i['oper_ram']
591
    return mem
592

    
593
##
594
## Synchronized operations for reconciliation
595
##
596

    
597

    
598
def create_network_synced(network, backend):
599
    result = _create_network_synced(network, backend)
600
    if result[0] != 'success':
601
        return result
602
    result = connect_network_synced(network, backend)
603
    return result
604

    
605

    
606
def _create_network_synced(network, backend):
607
    with pooled_rapi_client(backend) as client:
608
        backend_jobs = _create_network(network, [backend])
609
        (_, job) = backend_jobs[0]
610
        result = wait_for_job(client, job)
611
    return result
612

    
613

    
614
def connect_network_synced(network, backend):
615
    if network.type in ('PUBLIC_ROUTED', 'CUSTOM_ROUTED'):
616
        mode = 'routed'
617
    else:
618
        mode = 'bridged'
619
    with pooled_rapi_client(backend) as client:
620
        for group in client.GetGroups():
621
            job = client.ConnectNetwork(network.backend_id, group, mode,
622
                                        network.link)
623
            result = wait_for_job(client, job)
624
            if result[0] != 'success':
625
                return result
626

    
627
    return result
628

    
629

    
630
def wait_for_job(client, jobid):
631
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
632
    status = result['job_info'][0]
633
    while status not in ['success', 'error', 'cancel']:
634
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
635
                                        [result], None)
636
        status = result['job_info'][0]
637

    
638
    if status == 'success':
639
        return (status, None)
640
    else:
641
        error = result['job_info'][1]
642
        return (status, error)