Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ 9c65f166

History | View | Annotate | Download (20.6 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import json
35

    
36
from logging import getLogger
37
from django.conf import settings
38
from django.db import transaction
39
from datetime import datetime
40

    
41
from synnefo.db.models import (Backend, VirtualMachine, Network,
42
                               BackendNetwork, BACKEND_STATUSES)
43
from synnefo.logic import utils
44
from synnefo.util.rapi import GanetiRapiClient
45

    
46
log = getLogger('synnefo.logic')
47

    
48

    
49
# Ganeti instance tag that selects each supported firewall profile.
_firewall_tags = {
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG,
}

# Reverse lookup: the fourth ':'-separated field of each tag, mapped back
# to the name of the profile the tag stands for.
_reverse_tags = dict((tag.split(':')[3], profile)
                     for profile, tag in _firewall_tags.items())
55

    
56

    
57
def create_client(hostname, port=5080, username=None, password=None):
    """Build a Ganeti RAPI client for the given endpoint.

    @param hostname: Host handed to the client
    @param port: Port handed to the client
    @param username: Username handed to the client, if any
    @param password: Password handed to the client, if any
    @return: A new GanetiRapiClient object

    """
    rapi_client = GanetiRapiClient(hostname, port, username, password)
    return rapi_client
59

    
60

    
61
@transaction.commit_on_success
def process_op_status(vm, etime, jobid, opcode, status, logmsg):
    """Apply a job progress notification from the backend to a VM.

    The job id, status, opcode and log message of the notification are
    always recorded on the VM. Only notifications with a terminating status
    touch the operating state: a successful job moves the VM to the state
    mapped to its opcode, while a failed or canceled OP_INSTANCE_CREATE
    moves it to ERROR. Any other failure leaves the operating state as is.

    @param vm: VirtualMachine object the notification refers to
    @param etime: Event time, stored as the backendtime of the VM
    @param jobid: Id of the backend job
    @param opcode: Opcode of the backend job
    @param status: Status of the backend job, one of BACKEND_STATUSES
    @param logmsg: Log message of the backend job
    @raise VirtualMachine.InvalidBackendMsgError: if the status is unknown

    """
    # The opcode is deliberately not validated against
    # VirtualMachine.BACKEND_OPCODES, see #1492, #1031, #1111.
    known_statuses = [entry[0] for entry in BACKEND_STATUSES]
    if status not in known_statuses:
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)

    vm.backendjobid = jobid
    vm.backendjobstatus = status
    vm.backendopcode = opcode
    vm.backendlogmsg = logmsg

    removing = opcode == 'OP_INSTANCE_REMOVE'

    # A successful job switches the VM to the state mapped to its opcode
    target_state = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == 'success' and target_state is not None:
        utils.update_state(vm, target_state)
        if removing:
            # Set the deleted flag explicitly, to cater for
            # admin-initiated removals
            vm.deleted = True
            vm.nics.all().delete()

    # Special case: a failed or canceled OP_INSTANCE_CREATE --> ERROR
    if opcode == 'OP_INSTANCE_CREATE' and status in ('canceled', 'error'):
        utils.update_state(vm, 'ERROR')

    # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR, when no
    # instance exists at the Ganeti backend. The VM is still marked as
    # deleted. See ticket #799 for all the details.
    if removing and status == 'error':
        vm.deleted = True
        vm.nics.all().delete()

    vm.backendtime = etime
    vm.save()
105

    
106

    
107
@transaction.commit_on_success
def process_net_status(vm, etime, nics):
    """Process a net status notification from the backend

    Process an incoming message from the Ganeti backend,
    detailing the NIC configuration of a VM instance.

    Update the state of the VM in the DB accordingly: every NIC currently
    stored for the VM is deleted and one NIC is created per entry of `nics`.

    @param vm: VirtualMachine object the notification refers to
    @param etime: Event time, stored as the backendtime of the VM
    @param nics: List of dictionaries describing the NICs. The keys read
                 are 'network', 'firewall', 'mac', 'ip' and 'ipv6'; the
                 position in the list becomes the index of the NIC.

    """

    vm.nics.all().delete()

    # Stays None when `nics` is empty, i.e. the instance has no NICs
    net = None
    for i, nic in enumerate(nics):
        n = str(nic.get('network', ''))
        if n == settings.GANETI_PUBLIC_NETWORK:
            net = Network.objects.get(public=True)
        else:
            pk = utils.id_from_network_name(n)
            net = Network.objects.get(id=pk)

        firewall = nic.get('firewall', '')
        firewall_profile = _reverse_tags.get(firewall, '')
        # NICs on the public network always get a firewall profile
        if not firewall_profile and net.public:
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE

        vm.nics.create(
            network=net,
            index=i,
            mac=nic.get('mac', ''),
            ipv4=nic.get('ip', ''),
            ipv6=nic.get('ipv6', ''),
            firewall_profile=firewall_profile,
            dirty=False)

    vm.backendtime = etime
    vm.save()
    # As before, only the network of the last NIC is saved. With an empty
    # `nics` list there is no such network; this used to raise an
    # UnboundLocalError here, after the NICs had already been deleted.
    # NOTE(review): confirm whether every touched network should be saved.
    if net is not None:
        net.save()
144

    
145

    
146
@transaction.commit_on_success
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
    """Apply a job progress notification from the backend to a network.

    Notifications with a status that is not in BACKEND_STATUSES are
    silently ignored. Otherwise the job details are recorded on the
    backend network and its operating state is updated for terminating
    notifications.

    @param back_network: BackendNetwork object the notification refers to
    @param etime: Event time; accepted but not used by this function
    @param jobid: Id of the backend job
    @param opcode: Opcode of the backend job
    @param status: Status of the backend job
    @param logmsg: Log message of the backend job

    """
    known_statuses = [entry[0] for entry in BACKEND_STATUSES]
    if status not in known_statuses:
        # Unknown statuses are dropped instead of raising
        # Network.InvalidBackendMsgError(opcode, status)
        return

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode
    back_network.backendlogmsg = logmsg

    removing = opcode == 'OP_NETWORK_REMOVE'

    # A successful job switches the network to the state of its opcode
    target_state = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == 'success' and target_state is not None:
        back_network.operstate = target_state
        if removing:
            back_network.deleted = True

    # A failed or canceled creation leaves the network in ERROR
    if opcode == 'OP_NETWORK_CREATE' and status in ('canceled', 'error'):
        utils.update_state(back_network, 'ERROR')

    # A failed removal still marks the network as deleted
    if removing and status == 'error':
        back_network.deleted = True
        back_network.operstate = 'DELETED'

    back_network.save()
172

    
173

    
174
@transaction.commit_on_success
def process_create_progress(vm, etime, rprogress, wprogress):
    """Store the build progress reported for a VM.

    @param vm: VirtualMachine object being built
    @param etime: Event time, stored as the backendtime of the VM
    @param rprogress: Read progress; converted with int() and stored as
                      the build percentage, capped at 100
    @param wprogress: Write progress; currently unused
    @raise ValueError: if the read progress is negative

    """
    # XXX: This only uses the read progress for now.
    #      Explore whether it would make sense to use the value of wprogress
    #      somewhere.
    #
    # The reported value may exceed 100%, due to the way
    # snf-progress-monitor tracks bytes read by image handling processes,
    # so it is capped.
    percentage = min(int(rprogress), 100)
    if percentage < 0:
        raise ValueError("Percentage cannot be negative")

    # FIXME: log a warning when the build percentage does not increase
    # monotonically, see #1033. Such updates are currently accepted.

    # The VM is not required to be in the BUILD state here. Requiring it
    # would assume that no message of type 'ganeti-create-progress' is going
    # to arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance
    # and the instance is STARTED.  What if the two messages are processed
    # by two separate dispatcher threads, and the 'ganeti-op-status' message
    # for successful creation gets processed before the
    # 'ganeti-create-progress' message? [vkoukis]

    vm.buildpercentage = percentage
    vm.backendtime = etime
    vm.save()
206

    
207

    
208
def start_action(vm, action):
    """Update the state of a VM when a new action is initiated.

    @param vm: VirtualMachine object the action is applied to
    @param action: Name of the action, one of VirtualMachine.ACTIONS
    @raise VirtualMachine.InvalidActionError: if the action is unknown
    @raise VirtualMachine.DeletedError: if the VM is deleted
    @raise VirtualMachine.BuildingError: if the VM is being built and the
                                         action is not 'DESTROY'

    """
    valid_actions = [entry[0] for entry in VirtualMachine.ACTIONS]
    if action not in valid_actions:
        raise VirtualMachine.InvalidActionError(action)

    # No actions to deleted VMs
    if vm.deleted:
        raise VirtualMachine.DeletedError

    # No actions to machines being built. They may be destroyed, however.
    if vm.operstate == 'BUILD' and action != 'DESTROY':
        raise VirtualMachine.BuildingError

    # Record the action and forget the details of the previous backend job
    vm.action = action
    for field in ('backendjobid', 'backendopcode',
                  'backendjobstatus', 'backendlogmsg'):
        setattr(vm, field, None)

    # Update the suspended flag if the VM is being suspended or started.
    #
    # "DESTROY" does not set the deleted flag here, see ticket #721. The
    # deleted flag is set asynchronously, when an OP_INSTANCE_REMOVE
    # completes successfully. Hence, a server may be visible for some time
    # after a DELETE /servers/id returns HTTP 204.
    if action in ("SUSPEND", "START"):
        vm.suspended = (action == "SUSPEND")
    vm.save()
242

    
243

    
244
def create_instance(vm, flavor, image, password, personality):
    """Ask the backend to create the instance of a VM.

    @param vm: VirtualMachine object; its `backend_vm_id` is used as the
               instance name and its `client` submits the request
    @param flavor: Object providing `disk`, `disk_template`, `cpu` and `ram`
    @param image: Dictionary which should contain the keys 'backend_id',
                  'format' and 'metadata'. The metadata value should be a
                  dictionary.
    @param password: Value passed to the OS provider as 'img_passwd'
    @param personality: If not empty, JSON-encoded and passed to the OS
                        provider as 'img_personality'
    @return: The result of the CreateInstance() call of the client

    """
    nic = {'ip': 'pool', 'network': settings.GANETI_PUBLIC_NETWORK}

    if settings.IGNORE_FLAVOR_DISK_SIZES:
        # Fixed disk sizes, larger for images whose id mentions windows
        sz = 14000 if "windows" in image['backend_id'] else 4000
    else:
        sz = flavor.disk * 1024

    # Handle arguments to CreateInstance() as a dictionary,
    # initialize it based on a deployment-specific value.
    # This enables the administrator to override deployment-specific
    # arguments, such as the disk template to use, name of os provider
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
    #
    # Work on a copy: the settings dictionary is shared by every call, so
    # filling it in place would leave the values of this VM (name, disks,
    # osparams with the password) behind in the settings and let concurrent
    # calls overwrite each other's arguments. A shallow copy is enough,
    # since only top-level keys are assigned below.
    kw = dict(settings.GANETI_CREATEINSTANCE_KWARGS)
    kw['mode'] = 'create'
    kw['name'] = vm.backend_vm_id
    # Overrides any value from settings.GANETI_CREATEINSTANCE_KWARGS
    kw['disk_template'] = flavor.disk_template
    kw['disks'] = [{"size": sz}]
    kw['nics'] = [nic]
    # 'os' is defined in settings.GANETI_CREATEINSTANCE_KWARGS
    kw['ip_check'] = False
    kw['name_check'] = False
    # Do not specify a node ('pnode') explicitly, have
    # Ganeti use an iallocator instead
    kw['dry_run'] = settings.TEST

    kw['beparams'] = {
        'auto_balance': True,
        'vcpus': flavor.cpu,
        'memory': flavor.ram}

    kw['osparams'] = {
        'img_id': image['backend_id'],
        'img_passwd': password,
        'img_format': image['format']}
    if personality:
        kw['osparams']['img_personality'] = json.dumps(personality)

    kw['osparams']['img_properties'] = json.dumps(image['metadata'])

    # 'hvparams' is defined in settings.GANETI_CREATEINSTANCE_KWARGS

    return vm.client.CreateInstance(**kw)
301

    
302

    
303
def delete_instance(vm):
    """Register a 'DESTROY' action on a VM and ask the backend to delete it.

    @param vm: VirtualMachine object

    """
    start_action(vm, 'DESTROY')
    client = vm.client
    client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)
306

    
307

    
308
def reboot_instance(vm, reboot_type):
    """Ask the backend to reboot the instance of a VM and log the request.

    @param vm: VirtualMachine object
    @param reboot_type: Either 'soft' or 'hard'

    """
    assert reboot_type in ('soft', 'hard')
    client = vm.client
    client.RebootInstance(vm.backend_vm_id, reboot_type,
                          dry_run=settings.TEST)
    log.info('Rebooting instance %s', vm.backend_vm_id)
312

    
313

    
314
def startup_instance(vm):
    """Register a 'START' action on a VM and ask the backend to start it.

    @param vm: VirtualMachine object

    """
    start_action(vm, 'START')
    client = vm.client
    client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
317

    
318

    
319
def shutdown_instance(vm):
    """Register a 'STOP' action on a VM and ask the backend to shut it down.

    @param vm: VirtualMachine object

    """
    start_action(vm, 'STOP')
    client = vm.client
    client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)
322

    
323

    
324
def get_instance_console(vm):
    """Return the connection details of the VNC console of a VM.

    @param vm: VirtualMachine object
    @return: Dictionary with the keys 'kind', 'host' and 'port'
    @raise Exception: if the hv parameter serial_console of the instance
                      is set

    """
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
    # useless (see #783).
    #
    # Until this is fixed on the Ganeti side, the console info reply is
    # constructed directly from the instance info, instead of returning
    # the result of GetInstanceConsole(vm.backend_vm_id).
    #
    # WARNING: This assumes that VNC runs on port network_port on
    #          the instance's primary node, and is probably
    #          hypervisor-specific.
    instance = vm.client.GetInstance(vm.backend_vm_id)
    if instance['hvparams']['serial_console']:
        raise Exception("hv parameter serial_console cannot be true")

    return {
        'kind': 'vnc',
        'host': instance['pnode'],
        'port': instance['network_port'],
    }
346

    
347

    
348
def request_status_update(vm):
    """Fetch the instance info of a VM from the backend.

    @param vm: VirtualMachine object
    @return: The result of the GetInstanceInfo() call of the client

    """
    instance_info = vm.client.GetInstanceInfo(vm.backend_vm_id)
    return instance_info
350

    
351

    
352
def update_status(vm, status):
    """Set the operating state of a VM through utils.update_state().

    @param vm: VirtualMachine object
    @param status: New state, passed on unchanged

    """
    utils.update_state(vm, status)
354

    
355

    
356
def create_network(network, backends=None):
    """Add a network to backends and connect it to their nodegroups.

    @param network: Network object
    @param backends: List of Backend objects. None defaults to all.
    @return: The Network object that was passed in

    """
    creation_jobs = _create_network(network, backends)
    # The connect jobs are handed the jobs that create the network
    connect_network(network, creation_jobs)
    return network
366

    
367

    
368
def _create_network(network, backends=None):
    """Add a network to backends.

    @param network: Network object
    @param backends: List of Backend objects. None (or an empty list)
                     defaults to all backends that are not offline.
    @return: List of tuples of the form (Backend, job), one per backend,
             where job is the result of the CreateNetwork() call

    """

    network_type = 'public' if network.public else 'private'

    if not backends:
        backends = Backend.objects.exclude(offline=True)

    # Copy the tags before extending them: appending to the list owned by
    # the network object would add another 'nfdhcpd' entry on every call.
    tags = list(network.backend_tag)
    if network.dhcp:
        tags.append('nfdhcpd')
    tags = ','.join(tags)

    backend_jobs = []
    for backend in backends:
        job = backend.client.CreateNetwork(
                network_name=network.backend_id,
                network=network.subnet,
                gateway=network.gateway,
                network_type=network_type,
                mac_prefix=network.mac_prefix,
                tags=tags)
        backend_jobs.append((backend, job))

    return backend_jobs
397

    
398

    
399
def connect_network(network, backend_jobs=None):
    """Connect a network to all nodegroups.

    @param network: Network object
    @param backend_jobs: List of tuples of the form (Backend, jobs) which are
                         the backends to connect the network and the jobs on
                         which the connect job depends. `jobs` may be a
                         single job, a list of jobs, or None/an empty list
                         for no dependencies. None (or an empty list)
                         defaults to all backends that are not offline,
                         without dependencies.

    """

    if network.type in ('PUBLIC_ROUTED', 'CUSTOM_ROUTED'):
        mode = 'routed'
    else:
        mode = 'bridged'

    if not backend_jobs:
        backend_jobs = [(backend, []) for backend in
                        Backend.objects.exclude(offline=True)]

    for backend, jobs in backend_jobs:
        # Normalize the dependencies to a flat list of jobs. Wrapping the
        # value unconditionally turned the empty list of the default case
        # into [[]], i.e. a dependency on a job that does not exist.
        if jobs is None:
            depends = []
        elif isinstance(jobs, (list, tuple)):
            depends = list(jobs)
        else:
            depends = [jobs]

        client = backend.client
        for group in client.GetGroups():
            client.ConnectNetwork(network.backend_id, group, mode,
                                  network.link, depends)
423

    
424

    
425
def connect_network_group(backend, network, group):
    """Connect a network to a single nodegroup of a backend.

    @param backend: Backend object
    @param network: Network object
    @param group: The nodegroup, passed on to the backend unchanged
    @return: The result of the ConnectNetwork() call of the client

    """
    routed = network.type in ('PUBLIC_ROUTED', 'CUSTOM_ROUTED')
    mode = 'routed' if routed else 'bridged'

    client = backend.client
    return client.ConnectNetwork(network.backend_id, group, mode,
                                 network.link)
436

    
437

    
438
def delete_network(network, backends=None):
    """Disconnect a network from backends and then remove it from them.

    @param network: Network object
    @param backends: List of Backend objects. None defaults to all.

    """
    disconnect_jobs = disconnect_network(network, backends)
    # The removal is handed the disconnect jobs of each backend
    _delete_network(network, disconnect_jobs)
447

    
448

    
449
def disconnect_network(network, backends=None):
    """Disconnect a network from every nodegroup of the backends.

    @param network: Network object
    @param backends: List of Backend objects. None defaults to all.
    @return: List of tuples of the form (Backend, jobs), where jobs is the
             list of results of the DisconnectNetwork() calls, one per
             nodegroup of that backend

    """
    if not backends:
        backends = Backend.objects.exclude(offline=True)

    backend_jobs = []
    for backend in backends:
        client = backend.client
        group_jobs = [client.DisconnectNetwork(network.backend_id, group)
                      for group in client.GetGroups()]
        backend_jobs.append((backend, group_jobs))

    return backend_jobs
470

    
471

    
472
def disconnect_from_network(vm, nic):
    """Disconnect a virtual machine from a network by removing its NIC.

    @param vm: VirtualMachine object
    @param nic: NIC to remove; its `index` identifies it at the backend
    @return: The result of the ModifyInstance() call of the client

    """
    removal = ('remove', nic.index, {})
    client = vm.client
    return client.ModifyInstance(vm.backend_vm_id, nics=[removal],
                                 hotplug=True, dry_run=settings.TEST)
483

    
484

    
485
def _delete_network(network, backend_jobs=None):
    """Remove a network from backends.

    @param network: Network object
    @param backend_jobs: List of tuples of the form (Backend, jobs); the
                         jobs are passed to the DeleteNetwork() call of
                         that backend. None (or an empty list) defaults to
                         all backends that are not offline, with no jobs.

    """
    if not backend_jobs:
        online = Backend.objects.exclude(offline=True)
        backend_jobs = [(backend, []) for backend in online]

    for backend, depends in backend_jobs:
        backend.client.DeleteNetwork(network.backend_id, depends)
491

    
492

    
493
def connect_to_network(vm, network):
    """Connect a virtual machine to a network by adding a NIC to it.

    @param vm: VirtualMachine object
    @param network: Network object

    """
    # The backend picks an address from its pool only for DHCP networks
    ip = 'pool' if network.dhcp else None
    new_nic = {'ip': ip, 'network': network.backend_id}

    client = vm.client
    client.ModifyInstance(vm.backend_vm_id, nics=[('add', new_nic)],
                          hotplug=True, dry_run=settings.TEST)
506

    
507

    
508
def set_firewall_profile(vm, profile):
    """Tag the instance of a VM with the tag of a firewall profile.

    Every known firewall tag is removed from the instance before the tag
    of the requested profile is added.

    @param vm: VirtualMachine object
    @param profile: Name of the profile, a key of _firewall_tags
    @raise ValueError: if the profile is not supported

    """
    try:
        tag = _firewall_tags[profile]
    except KeyError:
        raise ValueError("Unsupported Firewall Profile: %s" % profile)

    client = vm.client
    # Delete all firewall tags
    for t in _firewall_tags.values():
        client.DeleteInstanceTags(vm.backend_vm_id, [t], dry_run=settings.TEST)

    client.AddInstanceTags(vm.backend_vm_id, [tag], dry_run=settings.TEST)

    # XXX NOP ModifyInstance call to force process_net_status to run
    # on the dispatcher
    client.ModifyInstance(vm.backend_vm_id,
                          os_name=settings.GANETI_CREATEINSTANCE_KWARGS['os'])
525

    
526

    
527
def get_ganeti_instances(backend=None, bulk=False):
    """Return the instances reported by the backends as a single list.

    @param backend: Backend object to query. None defaults to every
                    backend returned by get_backends().
    @param bulk: Passed on to the GetInstances() call of each client
    @return: List with the results of all queried backends, concatenated

    """
    instances = []
    for b in get_backends(backend):
        # Extending in place avoids the quadratic copying of
        # reduce(list.__add__, ...) and the Python 2-only builtin reduce
        instances.extend(b.client.GetInstances(bulk=bulk))
    return instances
531

    
532

    
533
def get_ganeti_nodes(backend=None, bulk=False):
    """Return the nodes reported by the backends as a single list.

    @param backend: Backend object to query. None defaults to every
                    backend returned by get_backends().
    @param bulk: Passed on to the GetNodes() call of each client
    @return: List with the results of all queried backends, concatenated

    """
    nodes = []
    for b in get_backends(backend):
        # Extending in place avoids the quadratic copying of
        # reduce(list.__add__, ...) and the Python 2-only builtin reduce
        nodes.extend(b.client.GetNodes(bulk=bulk))
    return nodes
536

    
537

    
538
def get_ganeti_jobs(backend=None, bulk=False):
    """Return the jobs reported by the backends as a single list.

    @param backend: Backend object to query. None defaults to every
                    backend returned by get_backends().
    @param bulk: Passed on to the GetJobs() call of each client
    @return: List with the results of all queried backends, concatenated

    """
    jobs = []
    for b in get_backends(backend):
        # Extending in place avoids the quadratic copying of
        # reduce(list.__add__, ...) and the Python 2-only builtin reduce
        jobs.extend(b.client.GetJobs(bulk=bulk))
    return jobs
541

    
542
##
543
##
544
##
545

    
546

    
547
def get_backends(backend=None):
    """Return the backends an operation should be applied to.

    @param backend: A single Backend object, or None
    @return: A list holding just `backend` if one is given, otherwise all
             backends that are not offline

    """
    if not backend:
        return Backend.objects.filter(offline=False)
    return [backend]
551

    
552

    
553
def get_physical_resources(backend):
    """Sum up the physical resources of the nodes of a backend.

    The values are the ones reported by the backend itself (not the db).

    @param backend: Backend object
    @return: Dictionary with the keys 'mfree', 'mtotal', 'dfree', 'dtotal',
             'pinst_cnt' and 'ctotal', each holding the sum over all nodes
             that were taken into account

    """
    attr = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
    totals = dict.fromkeys(attr, 0)

    for node in get_ganeti_nodes(backend, bulk=True):
        # Skip drained, offline and not vm_capable nodes since they will
        # not take part in the vm allocation process; nodes with a false
        # 'cnodes' value are skipped as well.
        usable = (node['vm_capable'] and not node['drained']
                  and not node['offline'] and node['cnodes'])
        if not usable:
            continue
        for name in attr:
            totals[name] += int(node[name])

    return totals
572

    
573

    
574
def update_resources(backend, resources=None):
    """Store the resources of a backend in the db.

    @param backend: Backend object to update and save
    @param resources: Dictionary with the keys 'mfree', 'mtotal', 'dfree',
                      'dtotal', 'pinst_cnt' and 'ctotal'. If it is None or
                      empty, the values are fetched with
                      get_physical_resources().

    """
    if not resources:
        resources = get_physical_resources(backend)

    # Each resource is stored in the backend attribute of the same name
    for field in ('mfree', 'mtotal', 'dfree', 'dtotal',
                  'pinst_cnt', 'ctotal'):
        setattr(backend, field, resources[field])

    backend.updated = datetime.now()
    backend.save()
590

    
591

    
592
def get_memory_from_instances(backend):
    """Return the total memory used by the instances of a backend.

    The value is the sum of the 'oper_ram' of every instance. Note: This
    differs from the real memory used, due to kvm's memory de-duplication.

    @param backend: Backend object
    @return: The summed 'oper_ram' values, 0 if there are no instances

    """
    instances = backend.client.GetInstances(bulk=True)
    return sum(instance['oper_ram'] for instance in instances)
604

    
605
##
606
## Synchronized operations for reconciliation
607
##
608

    
609

    
610
def create_network_synced(network, backend):
    """Create a network on a backend and connect it, waiting for the jobs.

    @param network: Network object
    @param backend: Backend object
    @return: Tuple of the form (status, error). If the creation does not
             succeed its result is returned and nothing is connected;
             otherwise the result of connect_network_synced() is returned.

    """
    creation = _create_network_synced(network, backend)
    if creation[0] == 'success':
        return connect_network_synced(network, backend)
    return creation
616

    
617

    
618
def _create_network_synced(network, backend):
    """Add a network to a backend and wait for the job to finish.

    @param network: Network object
    @param backend: Backend object
    @return: Tuple of the form (status, error), as returned by
             wait_for_job()

    """
    client = backend.client
    # Create the network through _create_network(), so that it gets the
    # same gateway, type, MAC prefix and tags as a network created by
    # create_network(), instead of just a name and a subnet.
    backend_jobs = _create_network(network, [backend])
    job = backend_jobs[0][1]
    return wait_for_job(client, job)
622

    
623

    
624
def connect_network_synced(network, backend):
    """Connect a network to every nodegroup of a backend, one at a time.

    Each connect job is waited for before the next one is submitted. The
    first job that does not succeed stops the process.

    @param network: Network object
    @param backend: Backend object
    @return: Tuple of the form (status, error): the result of the first
             job that did not succeed, otherwise the result of the last
             job. ('success', None) if the backend has no nodegroups.

    """
    if network.type in ('PUBLIC_ROUTED', 'CUSTOM_ROUTED'):
        mode = 'routed'
    else:
        mode = 'bridged'
    client = backend.client

    # With no nodegroups there is nothing to connect. Start from a
    # successful result, so that `result` is always bound at the end.
    result = ('success', None)
    for group in client.GetGroups():
        job = client.ConnectNetwork(network.backend_id, group, mode,
                                    network.link)
        result = wait_for_job(client, job)
        if result[0] != 'success':
            return result

    return result
639

    
640

    
641
def wait_for_job(client, jobid):
    """Block until a backend job reaches a final status.

    @param client: Client object providing WaitForJobChange()
    @param jobid: Id of the job to wait for
    @return: Tuple of the form (status, error). error is None for a
             successful job, otherwise the 'opresult' field of the job.

    """
    fields = ['status', 'opresult']
    # 'canceled' is the status used for canceled jobs elsewhere in this
    # module; without it a canceled job was never recognized as finished.
    # 'cancel' is kept so that anything accepted before still is.
    final_statuses = ('success', 'error', 'canceled', 'cancel')

    result = client.WaitForJobChange(jobid, fields, None, None)
    status = result['job_info'][0]
    while status not in final_statuses:
        # NOTE(review): the previous job info is passed as [result], i.e.
        # the whole reply wrapped in a list, as before - confirm against
        # the RAPI client whether result['job_info'] is expected here.
        result = client.WaitForJobChange(jobid, fields, [result], None)
        status = result['job_info'][0]

    if status == 'success':
        return (status, None)
    return (status, result['job_info'][1])