Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ 44e2c577

History | View | Annotate | Download (20.8 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import json
35

    
36
from logging import getLogger
37
from django.conf import settings
38
from django.db import transaction
39
from datetime import datetime
40

    
41
from synnefo.db.models import (Backend, VirtualMachine, Network,
42
                               BackendNetwork, BACKEND_STATUSES)
43
from synnefo.logic import utils
44
from synnefo.util.rapi import GanetiRapiClient
45

    
46
# Logger shared by every helper in this module.
log = getLogger('synnefo.logic')


# Firewall profile name -> Ganeti instance tag, taken from the settings.
_firewall_tags = {
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG,
}

# Reverse lookup: the fourth ':'-separated field of each configured tag
# mapped back to the profile name it stands for.
_reverse_tags = dict((tag.split(':')[3], profile)
                     for profile, tag in _firewall_tags.items())
55

    
56

    
57
def create_client(hostname, port=5080, username=None, password=None):
    """Build a GanetiRapiClient from the given connection details.

    All four arguments are forwarded positionally, in this order, to the
    GanetiRapiClient constructor.
    """
    client = GanetiRapiClient(hostname, port, username, password)
    return client
59

    
60

    
61
@transaction.commit_on_success
def process_op_status(vm, etime, jobid, opcode, status, logmsg):
    """Apply a job progress notification from the backend to a VM.

    The job id, status, opcode and log message are always recorded on
    `vm`.  A notification with status 'success' additionally moves the VM
    to the operating state mapped to `opcode` (if any); selected failures
    are handled as special cases below.  Any other failure leaves the
    operating state unchanged.

    Raises VirtualMachine.InvalidBackendMsgError when `status` is not one
    of BACKEND_STATUSES.
    """
    # The opcode is deliberately not validated here, see #1492, #1031, #1111.
    known_statuses = [entry[0] for entry in BACKEND_STATUSES]
    if status not in known_statuses:
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)

    vm.backendjobid = jobid
    vm.backendjobstatus = status
    vm.backendopcode = opcode
    vm.backendlogmsg = logmsg

    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode, None)
    purge = False

    if status == 'success':
        if state_for_success is not None:
            # Notifications of success change the operating state.
            utils.update_state(vm, state_for_success)
            # Set the deleted flag explicitly, to cater for
            # admin-initiated removals.
            purge = (opcode == 'OP_INSTANCE_REMOVE')
    elif status in ('canceled', 'error') and opcode == 'OP_INSTANCE_CREATE':
        # Special case: a failed OP_INSTANCE_CREATE puts the VM in ERROR.
        utils.update_state(vm, 'ERROR')
    elif (status == 'error' and opcode == 'OP_INSTANCE_REMOVE' and
          vm.operstate == 'ERROR'):
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
        # when no instance exists at the Ganeti backend.
        # See ticket #799 for all the details.
        purge = True

    if purge:
        vm.deleted = True
        vm.nics.all().delete()

    vm.backendtime = etime
    vm.save()
106

    
107

    
108
@transaction.commit_on_success
def process_net_status(vm, etime, nics):
    """Process a net status notification from the backend.

    Replace every NIC stored for `vm` with the NICs described in the
    incoming message and record `etime` as the VM's backend time.

    @param vm: VirtualMachine object
    @param etime: value stored in vm.backendtime
    @param nics: iterable of dicts; the keys 'network', 'firewall', 'mac',
                 'ip' and 'ipv6' are read, each defaulting to ''

    """
    vm.nics.all().delete()

    # Stays None when the notification carries no NICs at all, so the
    # trailing save is skipped instead of hitting an unbound name.
    net = None
    for i, nic in enumerate(nics):
        network_name = str(nic.get('network', ''))
        if network_name == settings.GANETI_PUBLIC_NETWORK:
            net = Network.objects.get(public=True)
        else:
            pk = utils.id_from_network_name(network_name)
            net = Network.objects.get(id=pk)

        firewall = nic.get('firewall', '')
        firewall_profile = _reverse_tags.get(firewall, '')
        # NICs on a public network fall back to the default profile.
        if not firewall_profile and net.public:
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE

        vm.nics.create(
            network=net,
            index=i,
            mac=nic.get('mac', ''),
            ipv4=nic.get('ip', ''),
            ipv6=nic.get('ipv6', ''),
            firewall_profile=firewall_profile)

    vm.backendtime = etime
    vm.save()
    # As before, only the network of the last processed NIC is saved.
    if net is not None:
        net.save()
144

    
145

    
146
@transaction.commit_on_success
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
    """Apply a job notification from the backend to a BackendNetwork.

    Notifications whose status is not in BACKEND_STATUSES are silently
    ignored.  Otherwise the job details are recorded on `back_network`,
    its operating state is updated and the object is saved.  `etime` is
    accepted but not used.
    """
    known_statuses = [entry[0] for entry in BACKEND_STATUSES]
    if status not in known_statuses:
        # An invalid message is dropped rather than raised.
        return

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode
    back_network.backendlogmsg = logmsg

    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)

    if status == 'success':
        if state_for_success is not None:
            # Notifications of success change the operating state.
            back_network.operstate = state_for_success
            if opcode == 'OP_NETWORK_REMOVE':
                back_network.deleted = True
    elif status in ('canceled', 'error'):
        utils.update_state(back_network, 'ERROR')
        # A failed removal of a network that is in ERROR is treated as a
        # completed deletion.
        if (status == 'error' and opcode == 'OP_NETWORK_REMOVE' and
                back_network.operstate == 'ERROR'):
            back_network.deleted = True
            back_network.operstate = 'DELETED'

    back_network.save()
173

    
174

    
175
@transaction.commit_on_success
def process_create_progress(vm, etime, rprogress, wprogress):
    """Store the build progress reported for a VM under creation.

    XXX: Only the read progress (`rprogress`) is used for now; `wprogress`
    is accepted but ignored.  Explore whether it would make sense to use
    it somewhere.

    Raises ValueError when the resulting percentage is negative.
    """
    # The percentage may exceed 100%, due to the way snf-progress-monitor
    # tracks bytes read by image handling processes, so cap it.
    percentage = min(int(rprogress), 100)
    if percentage < 0:
        raise ValueError("Percentage cannot be negative")

    # FIXME: log a warning when the percentage decreases, see #1033.

    # No check is made that the VM is still in BUILD state: this assumes
    # that no 'ganeti-create-progress' message arrives once
    # OP_INSTANCE_CREATE has succeeded and the instance is STARTED.  What
    # if the two messages are processed by two separate dispatcher
    # threads, and the 'ganeti-op-status' message for successful creation
    # gets processed before the 'ganeti-create-progress' one? [vkoukis]
    vm.buildpercentage = percentage
    vm.backendtime = etime
    vm.save()
207

    
208

    
209
def start_action(vm, action):
    """Update the state of a VM when a new action is initiated.

    Raises VirtualMachine.InvalidActionError for an action not listed in
    VirtualMachine.ACTIONS, VirtualMachine.DeletedError for a deleted VM
    and VirtualMachine.BuildingError for any action other than DESTROY on
    a VM that is still being built.
    """
    valid_actions = [entry[0] for entry in VirtualMachine.ACTIONS]
    if action not in valid_actions:
        raise VirtualMachine.InvalidActionError(action)

    # No actions on deleted VMs.
    if vm.deleted:
        raise VirtualMachine.DeletedError

    # No actions on machines being built. They may be destroyed, however.
    if vm.operstate == 'BUILD' and action != 'DESTROY':
        raise VirtualMachine.BuildingError

    vm.action = action
    # Forget whatever the previous backend job reported.
    for field in ('backendjobid', 'backendopcode',
                  'backendjobstatus', 'backendlogmsg'):
        setattr(vm, field, None)

    # Update the suspended flag when the VM is suspended or started.
    #
    # DESTROY deliberately does not set the deleted flag, see ticket #721:
    # it is set asynchronously, when an OP_INSTANCE_REMOVE completes
    # successfully. Hence, a server may be visible for some time after a
    # DELETE /servers/id returns HTTP 204.
    if action in ("SUSPEND", "START"):
        vm.suspended = (action == "SUSPEND")
    vm.save()
243

    
244

    
245
def create_instance(vm, flavor, image, password, personality):
    """Submit a Ganeti instance creation job for `vm`.

    @param vm: VirtualMachine object; its backend_vm_id names the instance
    @param flavor: object providing disk, disk_template, cpu and ram
    @param image: dictionary which should contain the keys 'backend_id',
                  'format' and 'metadata'; the metadata value should be a
                  dictionary
    @param password: passed to the OS provider as 'img_passwd'
    @param personality: if truthy, JSON-encoded into 'img_personality'

    Returns whatever vm.client.CreateInstance() returns.

    """
    nic = {'ip': 'pool', 'network': settings.GANETI_PUBLIC_NETWORK}

    if settings.IGNORE_FLAVOR_DISK_SIZES:
        # Fixed sizes, chosen by whether the image id mentions windows.
        if "windows" in image['backend_id']:
            sz = 14000
        else:
            sz = 4000
    else:
        # NOTE(review): presumably converts GiB to MiB -- confirm.
        sz = flavor.disk * 1024

    # Handle arguments to CreateInstance() as a dictionary,
    # initialize it based on a deployment-specific value.
    # This enables the administrator to override deployment-specific
    # arguments, such as the disk template to use, name of os provider
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
    #
    # Work on a copy: the settings dictionary is shared by every call and
    # must not accumulate the per-instance values assigned below.  A
    # shallow copy is enough because only top-level keys are rebound.
    kw = dict(settings.GANETI_CREATEINSTANCE_KWARGS)
    kw['mode'] = 'create'
    kw['name'] = vm.backend_vm_id
    kw['disk_template'] = flavor.disk_template
    kw['disks'] = [{"size": sz}]
    kw['nics'] = [nic]
    # 'os' and 'hvparams' are expected to come from
    # settings.GANETI_CREATEINSTANCE_KWARGS.
    kw['ip_check'] = False
    kw['name_check'] = False
    # Do not specify a node explicitly, have Ganeti use an iallocator
    # instead.
    kw['dry_run'] = settings.TEST

    kw['beparams'] = {
        'auto_balance': True,
        'vcpus': flavor.cpu,
        'memory': flavor.ram}

    osparams = {
        'img_id': image['backend_id'],
        'img_passwd': password,
        'img_format': image['format']}
    if personality:
        osparams['img_personality'] = json.dumps(personality)
    osparams['img_properties'] = json.dumps(image['metadata'])
    kw['osparams'] = osparams

    return vm.client.CreateInstance(**kw)
302

    
303

    
304
def delete_instance(vm):
    """Register a DESTROY action on `vm`, then ask its backend to delete it."""
    start_action(vm, 'DESTROY')
    client = vm.client
    client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)
307

    
308

    
309
def reboot_instance(vm, reboot_type):
    """Ask the backend to reboot `vm`; `reboot_type` is 'soft' or 'hard'.

    Unlike the other instance operations, no action is registered on the
    VM beforehand.  The reboot is logged after the request has been sent.
    """
    assert reboot_type in ('soft', 'hard')
    client = vm.client
    instance_name = vm.backend_vm_id
    client.RebootInstance(instance_name, reboot_type, dry_run=settings.TEST)
    log.info('Rebooting instance %s', vm.backend_vm_id)
313

    
314

    
315
def startup_instance(vm):
    """Register a START action on `vm`, then ask its backend to start it."""
    start_action(vm, 'START')
    client = vm.client
    client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
318

    
319

    
320
def shutdown_instance(vm):
    """Register a STOP action on `vm`, then ask its backend to shut it down."""
    start_action(vm, 'STOP')
    client = vm.client
    client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)
323

    
324

    
325
def get_instance_console(vm):
    """Return a dict with the 'kind', 'host' and 'port' of a VM's console.

    RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
    which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and
    pretty useless (see #783).  Until this is fixed on the Ganeti side,
    the console info reply is constructed directly from GetInstance().

    WARNING: This assumes that VNC runs on port network_port on the
             instance's primary node, and is probably hypervisor-specific.

    Raises Exception when the instance has hvparams['serial_console'] set.
    """
    info = vm.client.GetInstance(vm.backend_vm_id)
    if info['hvparams']['serial_console']:
        raise Exception("hv parameter serial_console cannot be true")

    return {
        'kind': 'vnc',
        'host': info['pnode'],
        'port': info['network_port'],
    }
347

    
348

    
349
def request_status_update(vm):
    """Return the backend's GetInstanceInfo() reply for `vm`."""
    client = vm.client
    return client.GetInstanceInfo(vm.backend_vm_id)
351

    
352

    
353
def update_status(vm, status):
    """Set the state of `vm` to `status` via utils.update_state()."""
    new_state = status
    utils.update_state(vm, new_state)
355

    
356

    
357
def create_network(network, backends=None):
    """Add a network to backends and connect it to their nodegroups.

    @param network: Network object
    @param backends: List of Backend objects. None defaults to all.

    Returns the `network` object it was given.

    """
    # The connect jobs depend on the creation jobs submitted here.
    creation_jobs = _create_network(network, backends)
    connect_network(network, creation_jobs)
    return network
367

    
368

    
369
def _create_network(network, backends=None):
370
    """Add a network to backends.
371
    @param network: Network object
372
    @param backends: List of Backend objects. None defaults to all.
373

374
    """
375

    
376
    network_type = network.public and 'public' or 'private'
377

    
378
    if not backends:
379
        backends = Backend.objects.exclude(offline=True)
380

    
381
    tags = network.backend_tag
382
    if network.dhcp:
383
        tags.append('nfdhcpd')
384
    tags = ','.join(tags)
385

    
386
    backend_jobs = []
387
    for backend in backends:
388
        job = backend.client.CreateNetwork(
389
                network_name=network.backend_id,
390
                network=network.subnet,
391
                gateway=network.gateway,
392
                network_type=network_type,
393
                mac_prefix=network.mac_prefix,
394
                tags=tags)
395
        backend_jobs.append((backend, job))
396

    
397
    return backend_jobs
398

    
399

    
400
def connect_network(network, backend_jobs=None):
    """Connect a network to all nodegroups.

    @param network: Network object
    @param backend_jobs: List of tuples of the form (Backend, jobs) which are
                         the backends to connect the network and the jobs on
                         which the connect job depends. `jobs` may be a
                         single job or a list/tuple of jobs. None (or an
                         empty list) defaults to all backends that are not
                         offline, with no dependencies.

    """
    if network.public:
        mode = 'routed'
    else:
        mode = 'bridged'

    if not backend_jobs:
        backend_jobs = [(backend, []) for backend in
                        Backend.objects.exclude(offline=True)]

    for backend, job in backend_jobs:
        # Accept both a single job and a list of jobs, so that the default
        # "no dependencies" placeholder is passed on as [] and not [[]].
        if isinstance(job, (list, tuple)):
            depends = list(job)
        else:
            depends = [job]

        client = backend.client
        for group in client.GetGroups():
            client.ConnectNetwork(network.backend_id, group, mode,
                                  network.link, depends)
421

    
422

    
423
def connect_network_group(backend, network, group):
    """Connect a network to a specific nodegroup of a backend.

    Public networks are connected in 'routed' mode, all others in
    'bridged' mode.  Returns the result of the ConnectNetwork() call.

    """
    if network.public:
        mode = 'routed'
    else:
        mode = 'bridged'

    client = backend.client
    return client.ConnectNetwork(network.backend_id, group, mode,
                                 network.link)
431

    
432

    
433
def delete_network(network, backends=None):
    """Disconnect and remove a network from backends.

    @param network: Network object
    @param backends: List of Backend objects. None defaults to all.

    """
    # The removal jobs depend on the disconnect jobs submitted here.
    disconnect_jobs = disconnect_network(network, backends)
    _delete_network(network, disconnect_jobs)
442

    
443

    
444
def disconnect_network(network, backends=None):
    """Disconnect a network from virtualmachines and nodegroups.

    @param network: Network object
    @param backends: List of Backend objects. None (or an empty list)
                     defaults to all backends that are not offline.

    Returns a list of (backend, jobs) tuples, where jobs holds the result
    of one DisconnectNetwork() call per nodegroup of that backend.

    """
    if not backends:
        backends = Backend.objects.exclude(offline=True)

    backend_jobs = []
    for backend in backends:
        client = backend.client

        # Detach the VMs first.  disconnect_from_network() returns None
        # when a VM has no NIC to remove; such results are not jobs and
        # must not be handed over as dependencies.
        vm_jobs = []
        for vm in network.machines.filter(backend=backend):
            job = disconnect_from_network(vm, network)
            if job is not None:
                vm_jobs.append(job)

        # Every nodegroup disconnect depends on all of the VM jobs above.
        group_jobs = [client.DisconnectNetwork(network.backend_id, group,
                                               vm_jobs)
                      for group in client.GetGroups()]
        backend_jobs.append((backend, group_jobs))

    return backend_jobs
470

    
471

    
472
def disconnect_from_network(vm, network):
    """Disconnect a virtual machine from a network by removing its NICs.

    @param vm: VirtualMachine object
    @param network: Network object

    Returns the result of the ModifyInstance() call, or None when the VM
    has no NIC on `network` among its NICs on non-public networks.

    """
    private_nics = vm.nics.filter(network__public=False).order_by('index')
    removals = [('remove', nic.index, {})
                for nic in private_nics if nic.network == network]
    if not removals:
        # VM not connected to network.
        return

    # The removals are submitted from the highest NIC index downwards.
    removals.reverse()
    return vm.client.ModifyInstance(vm.backend_vm_id, nics=removals,
                                    hotplug=True, dry_run=settings.TEST)
488

    
489

    
490
def _delete_network(network, backend_jobs=None):
491
    if not backend_jobs:
492
        backend_jobs = [(backend, []) for backend in
493
                Backend.objects.exclude(offline=True)]
494
    for backend, jobs in backend_jobs:
495
        backend.client.DeleteNetwork(network.backend_id, jobs)
496

    
497

    
498
def connect_to_network(vm, network):
    """Connect a virtual machine to a network by adding a NIC to it.

    @param vm: VirtualMachine object
    @param network: Network object

    """
    # The NIC's 'ip' is 'pool' for networks with DHCP and None otherwise.
    if network.dhcp:
        ip = 'pool'
    else:
        ip = None

    new_nic = {'ip': ip, 'network': network.backend_id}
    client = vm.client
    client.ModifyInstance(vm.backend_vm_id, nics=[('add', new_nic)],
                          hotplug=True, dry_run=settings.TEST)
511

    
512

    
513
def set_firewall_profile(vm, profile):
    """Tag the instance of `vm` with the tag of a firewall profile.

    @param vm: VirtualMachine object
    @param profile: a key of _firewall_tags ('ENABLED', 'DISABLED' or
                    'PROTECTED')

    Raises ValueError when `profile` is not a known firewall profile.

    """
    try:
        tag = _firewall_tags[profile]
    except KeyError:
        raise ValueError("Unsupported Firewall Profile: %s" % profile)

    client = vm.client
    # Delete all firewall tags, one request per tag, then add the new one.
    for t in _firewall_tags.values():
        client.DeleteInstanceTags(vm.backend_vm_id, [t], dry_run=settings.TEST)

    client.AddInstanceTags(vm.backend_vm_id, [tag], dry_run=settings.TEST)

    # XXX NOP ModifyInstance call to force process_net_status to run
    # on the dispatcher
    client.ModifyInstance(vm.backend_vm_id,
                          os_name=settings.GANETI_CREATEINSTANCE_KWARGS['os'])
530

    
531

    
532
def get_ganeti_instances(backend=None, bulk=False):
    """Return one flat list with the instances of the selected backends.

    @param backend: a single Backend object; a false value selects the
                    backends returned by get_backends()
    @param bulk: forwarded to each client's GetInstances() call

    """
    # Extending one list is linear, unlike repeated list concatenation,
    # and does not rely on the Python 2 builtin reduce().
    instances = []
    for b in get_backends(backend):
        instances.extend(b.client.GetInstances(bulk=bulk))
    return instances
536

    
537

    
538
def get_ganeti_nodes(backend=None, bulk=False):
    """Return one flat list with the nodes of the selected backends.

    @param backend: a single Backend object; a false value selects the
                    backends returned by get_backends()
    @param bulk: forwarded to each client's GetNodes() call

    """
    # Extending one list is linear, unlike repeated list concatenation,
    # and does not rely on the Python 2 builtin reduce().
    nodes = []
    for b in get_backends(backend):
        nodes.extend(b.client.GetNodes(bulk=bulk))
    return nodes
541

    
542

    
543
def get_ganeti_jobs(backend=None, bulk=False):
    """Return one flat list with the jobs of the selected backends.

    @param backend: a single Backend object; a false value selects the
                    backends returned by get_backends()
    @param bulk: forwarded to each client's GetJobs() call

    """
    # Extending one list is linear, unlike repeated list concatenation,
    # and does not rely on the Python 2 builtin reduce().
    jobs = []
    for b in get_backends(backend):
        jobs.extend(b.client.GetJobs(bulk=bulk))
    return jobs
546

    
547
##
548
##
549
##
550

    
551

    
552
def get_backends(backend=None):
    """Return the backends to operate on.

    A truthy `backend` is returned wrapped in a single-element list;
    otherwise the result is the queryset of backends with offline=False.
    """
    if not backend:
        return Backend.objects.filter(offline=False)
    return [backend]
556

    
557

    
558
def get_physical_resources(backend):
    """Get the physical resources of a backend.

    Get the resources of a backend as reported by the backend (not the db).

    Returns a dict with the keys 'mfree', 'mtotal', 'dfree', 'dtotal',
    'pinst_cnt' and 'ctotal', each summed (as int) over the usable nodes.

    """
    attr = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
    res = dict.fromkeys(attr, 0)

    for node in get_ganeti_nodes(backend, bulk=True):
        # Filter out drained, offline and not vm_capable nodes since they
        # will not take part in the vm allocation process. Nodes with a
        # false 'cnodes' value are skipped as well.
        usable = (node['vm_capable'] and not node['drained'] and
                  not node['offline'] and node['cnodes'])
        if not usable:
            continue
        for a in attr:
            res[a] += int(node[a])

    return res
577

    
578

    
579
def update_resources(backend, resources=None):
    """Update the state of the backend resources in db.

    @param backend: Backend object to update and save
    @param resources: dict with the keys 'mfree', 'mtotal', 'dfree',
                      'dtotal', 'pinst_cnt' and 'ctotal'; when false, the
                      values are fetched with get_physical_resources()

    """
    if not resources:
        resources = get_physical_resources(backend)

    # Copy each reported figure onto the attribute of the same name.
    for field in ('mfree', 'mtotal', 'dfree', 'dtotal',
                  'pinst_cnt', 'ctotal'):
        setattr(backend, field, resources[field])

    backend.updated = datetime.now()
    backend.save()
595

    
596

    
597
def get_memory_from_instances(backend):
    """Get the memory that is used by instances.

    Sums the 'oper_ram' of every instance the backend reports. Note: this
    differs from the real memory used, due to kvm's memory
    de-duplication.

    """
    instances = backend.client.GetInstances(bulk=True)
    return sum(instance['oper_ram'] for instance in instances)
609

    
610
##
611
## Synchronized operations for reconciliation
612
##
613

    
614

    
615
def create_network_synced(network, backend):
    """Create a network on a backend, then connect it, waiting for each job.

    Returns the (status, error) tuple of the creation when it did not
    succeed, otherwise the result of connect_network_synced().
    """
    created = _create_network_synced(network, backend)
    if created[0] == 'success':
        return connect_network_synced(network, backend)
    return created
621

    
622

    
623
def _create_network_synced(network, backend):
    """Submit a CreateNetwork job to `backend` and wait for it to finish.

    Only the network's backend id and subnet are passed to the backend.
    Returns the (status, error) tuple produced by wait_for_job().
    """
    rapi = backend.client
    jobid = rapi.CreateNetwork(network.backend_id, network.subnet)
    return wait_for_job(rapi, jobid)
627

    
628

    
629
def connect_network_synced(network, backend):
    """Connect a network to every nodegroup of a backend, one at a time.

    Each ConnectNetwork job is waited for before the next one is
    submitted, and the first job that does not succeed aborts the loop.

    Returns the (status, error) tuple of the first failed job, or of the
    last job when all succeeded. A backend without nodegroups has nothing
    to connect and yields ('success', None).

    """
    if network.public:
        mode = 'routed'
    else:
        mode = 'bridged'
    client = backend.client

    # Defined up front so that an empty group list does not leave the
    # return value unbound.
    result = ('success', None)
    for group in client.GetGroups():
        job = client.ConnectNetwork(network.backend_id, group, mode,
                                    network.link)
        result = wait_for_job(client, job)
        if result[0] != 'success':
            return result

    return result
641

    
642

    
643
def wait_for_job(client, jobid):
    """Block until a backend job reaches a final status.

    @param client: object providing WaitForJobChange(job_id, fields,
                   prev_job_info, prev_log_serial)
    @param jobid: id of the job to wait for

    Returns (status, None) for a successful job, and (status, opresult)
    for a failed or canceled one.

    """
    fields = ['status', 'opresult']
    # 'canceled' is the spelling used by the rest of this module for the
    # status of a canceled job; 'cancel' is kept for backward
    # compatibility.
    final_statuses = ('success', 'error', 'canceled', 'cancel')

    job_info = None
    while True:
        # Hand back the previously seen field values, so that the call
        # only returns once something has actually changed.
        result = client.WaitForJobChange(jobid, fields, job_info, None)
        if result is None:
            # NOTE(review): the RAPI is documented to answer None when
            # nothing changed before its timeout -- confirm; poll again.
            continue
        job_info = result['job_info']
        status = job_info[0]
        if status in final_statuses:
            break

    if status == 'success':
        return (status, None)
    return (status, job_info[1])