Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / servers.py @ 0292883e

History | View | Annotate | Download (20.8 kB)

1
import logging
2

    
3
from socket import getfqdn
4
from functools import wraps
5
from django import dispatch
6
from django.db import transaction
7
from django.utils import simplejson as json
8

    
9
from snf_django.lib.api import faults
10
from django.conf import settings
11
from synnefo import quotas
12
from synnefo.api import util
13
from synnefo.logic import backend, ips
14
from synnefo.logic.backend_allocator import BackendAllocator
15
from synnefo.db.models import (NetworkInterface, VirtualMachine,
16
                               VirtualMachineMetadata, IPAddressLog)
17
from vncauthproxy.client import request_forwarding as request_vnc_forwarding
18

    
19
log = logging.getLogger(__name__)
20

    
21
# server creation signal
22
server_created = dispatch.Signal(providing_args=["created_vm_params"])
23

    
24

    
25
def validate_server_action(vm, action):
26
    if vm.deleted:
27
        raise faults.BadRequest("Server '%s' has been deleted." % vm.id)
28

    
29
    # Destroyin a server should always be permitted
30
    if action == "DESTROY":
31
        return
32

    
33
    # Check that there is no pending action
34
    pending_action = vm.task
35
    if pending_action:
36
        if pending_action == "BUILD":
37
            raise faults.BuildInProgress("Server '%s' is being build." % vm.id)
38
        raise faults.BadRequest("Can not perform '%s' action while there is a"
39
                                " pending '%s'." % (action, pending_action))
40

    
41
    # Check if action can be performed to VM's operstate
42
    operstate = vm.operstate
43
    if operstate == "BUILD" and action != "BUILD":
44
        raise faults.BuildInProgress("Server '%s' is being build." % vm.id)
45
    elif (action == "START" and operstate != "STOPPED") or\
46
         (action == "STOP" and operstate != "STARTED") or\
47
         (action == "RESIZE" and operstate != "STOPPED") or\
48
         (action in ["CONNECT", "DISCONNECT"] and operstate != "STOPPED"
49
          and not settings.GANETI_USE_HOTPLUG):
50
        raise faults.BadRequest("Can not perform '%s' action while server is"
51
                                " in '%s' state." % (action, operstate))
52
    return
53

    
54

    
55
def server_command(action):
56
    """Handle execution of a server action.
57

58
    Helper function to validate and execute a server action, handle quota
59
    commission and update the 'task' of the VM in the DB.
60

61
    1) Check if action can be performed. If it can, then there must be no
62
       pending task (with the exception of DESTROY).
63
    2) Handle previous commission if unresolved:
64
       * If it is not pending and it to accept, then accept
65
       * If it is not pending and to reject or is pending then reject it. Since
66
       the action can be performed only if there is no pending task, then there
67
       can be no pending commission. The exception is DESTROY, but in this case
68
       the commission can safely be rejected, and the dispatcher will generate
69
       the correct ones!
70
    3) Issue new commission and associate it with the VM. Also clear the task.
71
    4) Send job to ganeti
72
    5) Update task and commit
73
    """
74
    def decorator(func):
75
        @wraps(func)
76
        @transaction.commit_on_success
77
        def wrapper(vm, *args, **kwargs):
78
            user_id = vm.userid
79
            validate_server_action(vm, action)
80
            vm.action = action
81

    
82
            commission_name = "client: api, resource: %s" % vm
83
            quotas.handle_resource_commission(vm, action=action,
84
                                              commission_name=commission_name)
85
            vm.save()
86

    
87
            # XXX: Special case for server creation!
88
            if action == "BUILD":
89
                # Perform a commit, because the VirtualMachine must be saved to
90
                # DB before the OP_INSTANCE_CREATE job in enqueued in Ganeti.
91
                # Otherwise, messages will arrive from snf-dispatcher about
92
                # this instance, before the VM is stored in DB.
93
                transaction.commit()
94
                # After committing the locks are released. Refetch the instance
95
                # to guarantee x-lock.
96
                vm = VirtualMachine.objects.select_for_update().get(id=vm.id)
97

    
98
            # Send the job to Ganeti and get the associated jobID
99
            try:
100
                job_id = func(vm, *args, **kwargs)
101
            except Exception as e:
102
                if vm.serial is not None:
103
                    # Since the job never reached Ganeti, reject the commission
104
                    log.debug("Rejecting commission: '%s', could not perform"
105
                              " action '%s': %s" % (vm.serial,  action, e))
106
                    transaction.rollback()
107
                    quotas.reject_serial(vm.serial)
108
                    transaction.commit()
109
                raise
110

    
111
            if action == "BUILD" and vm.serial is not None:
112
                # XXX: Special case for server creation: we must accept the
113
                # commission because the VM has been stored in DB. Also, if
114
                # communication with Ganeti fails, the job will never reach
115
                # Ganeti, and the commission will never be resolved.
116
                quotas.accept_serial(vm.serial)
117

    
118
            log.info("user: %s, vm: %s, action: %s, job_id: %s, serial: %s",
119
                     user_id, vm.id, action, job_id, vm.serial)
120

    
121
            # store the new task in the VM
122
            if job_id is not None:
123
                vm.task = action
124
                vm.task_job_id = job_id
125
            vm.save()
126

    
127
            return vm
128
        return wrapper
129
    return decorator
130

    
131

    
132
@transaction.commit_on_success
133
def create(userid, name, password, flavor, image, metadata={},
134
           personality=[], networks=None, floating_ips=None,
135
           use_backend=None):
136
    if use_backend is None:
137
        # Allocate server to a Ganeti backend
138
        use_backend = allocate_new_server(userid, flavor)
139

    
140
    if networks is None:
141
        networks = []
142
    if floating_ips is None:
143
        floating_ips = []
144

    
145
    # Fix flavor for archipelago
146
    disk_template, provider = util.get_flavor_provider(flavor)
147
    if provider:
148
        flavor.disk_template = disk_template
149
        flavor.disk_provider = provider
150
        flavor.disk_origin = None
151
        if provider == 'vlmc':
152
            flavor.disk_origin = image['checksum']
153
            image['backend_id'] = 'null'
154
    else:
155
        flavor.disk_provider = None
156

    
157
    # We must save the VM instance now, so that it gets a valid
158
    # vm.backend_vm_id.
159
    vm = VirtualMachine.objects.create(name=name,
160
                                       backend=use_backend,
161
                                       userid=userid,
162
                                       imageid=image["id"],
163
                                       flavor=flavor,
164
                                       operstate="BUILD")
165
    log.info("Created entry in DB for VM '%s'", vm)
166

    
167
    nics = create_instance_nics(vm, userid, networks, floating_ips)
168

    
169
    for key, val in metadata.items():
170
        VirtualMachineMetadata.objects.create(
171
            meta_key=key,
172
            meta_value=val,
173
            vm=vm)
174

    
175
    # Create the server in Ganeti.
176
    vm = create_server(vm, nics, flavor, image, personality, password)
177

    
178
    return vm
179

    
180

    
181
@transaction.commit_on_success
182
def allocate_new_server(userid, flavor):
183
    """Allocate a new server to a Ganeti backend.
184

185
    Allocation is performed based on the owner of the server and the specified
186
    flavor. Also, backends that do not have a public IPv4 address are excluded
187
    from server allocation.
188

189
    This function runs inside a transaction, because after allocating the
190
    instance a commit must be performed in order to release all locks.
191

192
    """
193
    backend_allocator = BackendAllocator()
194
    use_backend = backend_allocator.allocate(userid, flavor)
195
    if use_backend is None:
196
        log.error("No available backend for VM with flavor %s", flavor)
197
        raise faults.ServiceUnavailable("No available backends")
198
    return use_backend
199

    
200

    
201
@server_command("BUILD")
202
def create_server(vm, nics, flavor, image, personality, password):
203
    # dispatch server created signal needed to trigger the 'vmapi', which
204
    # enriches the vm object with the 'config_url' attribute which must be
205
    # passed to the Ganeti job.
206
    server_created.send(sender=vm, created_vm_params={
207
        'img_id': image['backend_id'],
208
        'img_passwd': password,
209
        'img_format': str(image['format']),
210
        'img_personality': json.dumps(personality),
211
        'img_properties': json.dumps(image['metadata']),
212
    })
213
    # send job to Ganeti
214
    try:
215
        jobID = backend.create_instance(vm, nics, flavor, image)
216
    except:
217
        log.exception("Failed create instance '%s'", vm)
218
        jobID = None
219
        vm.operstate = "ERROR"
220
        vm.backendlogmsg = "Failed to send job to Ganeti."
221
        vm.save()
222
        vm.nics.all().update(state="ERROR")
223

    
224
    # At this point the job is enqueued in the Ganeti backend
225
    vm.backendjobid = jobID
226
    vm.save()
227
    log.info("User %s created VM %s, NICs %s, Backend %s, JobID %s",
228
             vm.userid, vm, nics, backend, str(jobID))
229

    
230
    return jobID
231

    
232

    
233
def create_instance_nics(vm, userid, networks=[], floating_ips=[]):
234
    """Create NICs for VirtualMachine.
235

236
    Helper function for allocating IP addresses and creating NICs in the DB
237
    for a VirtualMachine. Created NICs are the combination of the default
238
    network policy (defined by administration settings) and the networks
239
    defined by the user.
240

241
    """
242
    ports = []
243
    for network_id in settings.DEFAULT_INSTANCE_NETWORKS:
244
        if network_id == "SNF:ANY_PUBLIC":
245
            ipaddress = ips.allocate_public_ip(userid=userid,
246
                                               backend=vm.backend)
247
            port = _create_port(userid, network=ipaddress.network,
248
                                use_ipaddress=ipaddress)
249
        else:
250
            try:
251
                network = util.get_network(network_id, userid,
252
                                           non_deleted=True)
253
            except faults.ItemNotFound:
254
                msg = "Invalid configuration. Setting"\
255
                      " 'DEFAULT_INSTANCE_NETWORKS' contains invalid"\
256
                      " network '%s'" % network_id
257
                log.error(msg)
258
                raise faults.InternalServerError(msg)
259
            port = _create_port(userid, network)
260
        ports.append(port)
261

    
262
    for floating_ip_id in floating_ips:
263
        floating_ip = util.get_floating_ip_by_id(userid, floating_ip_id,
264
                                                 for_update=True)
265
        port = _create_port(userid, network=floating_ip.network,
266
                            use_ipaddress=floating_ip)
267
        ports.append(port)
268

    
269
    for net in networks:
270
        port_id = net.get("port")
271
        net_id = net.get("uuid")
272
        if port_id is not None:
273
            port = util.get_port(port_id, userid, for_update=True)
274
            ports.append(port)
275
        elif net_id is not None:
276
            address = net.get("fixed_ip")
277
            network = util.get_network(net_id, userid, non_deleted=True)
278
            if network.public:
279
                if address is None:
280
                    msg = ("Can not connect to public network %s. Specify"
281
                           " 'fixed_ip'" " attribute to connect to a public"
282
                           " network")
283
                    raise faults.BadRequest(msg % network.id)
284
                floating_ip = util.get_floating_ip_by_address(userid,
285
                                                              address,
286
                                                              for_update=True)
287
                port = _create_port(userid, network, use_ipaddress=floating_ip)
288
            else:
289
                port = _create_port(userid, network, address=address)
290
            ports.append(port)
291
        else:
292
            raise faults.BadRequest("Network 'uuid' or 'port' attribute"
293
                                    " is required.")
294

    
295
    for index, port in enumerate(ports):
296
        associate_port_with_machine(port, vm)
297
        port.index = index
298
        port.save()
299
    return ports
300

    
301

    
302
@server_command("DESTROY")
303
def destroy(vm):
304
    log.info("Deleting VM %s", vm)
305
    return backend.delete_instance(vm)
306

    
307

    
308
@server_command("START")
309
def start(vm):
310
    log.info("Starting VM %s", vm)
311
    return backend.startup_instance(vm)
312

    
313

    
314
@server_command("STOP")
315
def stop(vm):
316
    log.info("Stopping VM %s", vm)
317
    return backend.shutdown_instance(vm)
318

    
319

    
320
@server_command("REBOOT")
321
def reboot(vm, reboot_type):
322
    if reboot_type not in ("SOFT", "HARD"):
323
        raise faults.BadRequest("Malformed request. Invalid reboot"
324
                                " type %s" % reboot_type)
325
    log.info("Rebooting VM %s. Type %s", vm, reboot_type)
326

    
327
    return backend.reboot_instance(vm, reboot_type.lower())
328

    
329

    
330
@server_command("RESIZE")
331
def resize(vm, flavor):
332
    old_flavor = vm.flavor
333
    # User requested the same flavor
334
    if old_flavor.id == flavor.id:
335
        raise faults.BadRequest("Server '%s' flavor is already '%s'."
336
                                % (vm, flavor))
337
        return None
338
    # Check that resize can be performed
339
    if old_flavor.disk != flavor.disk:
340
        raise faults.BadRequest("Can not resize instance disk.")
341
    if old_flavor.disk_template != flavor.disk_template:
342
        raise faults.BadRequest("Can not change instance disk template.")
343

    
344
    log.info("Resizing VM from flavor '%s' to '%s", old_flavor, flavor)
345
    commission_info = {"cyclades.cpu": flavor.cpu - old_flavor.cpu,
346
                       "cyclades.ram": 1048576 * (flavor.ram - old_flavor.ram)}
347
    # Save serial to VM, since it is needed by server_command decorator
348
    vm.serial = quotas.issue_commission(user=vm.userid,
349
                                        source=quotas.DEFAULT_SOURCE,
350
                                        provisions=commission_info,
351
                                        name="resource: %s. resize" % vm)
352
    return backend.resize_instance(vm, vcpus=flavor.cpu, memory=flavor.ram)
353

    
354

    
355
@server_command("SET_FIREWALL_PROFILE")
356
def set_firewall_profile(vm, profile, nic):
357
    log.info("Setting VM %s, NIC %s, firewall %s", vm, nic, profile)
358

    
359
    if profile not in [x[0] for x in NetworkInterface.FIREWALL_PROFILES]:
360
        raise faults.BadRequest("Unsupported firewall profile")
361
    backend.set_firewall_profile(vm, profile=profile, nic=nic)
362
    return None
363

    
364

    
365
@server_command("CONNECT")
366
def connect(vm, network, port=None):
367
    if port is None:
368
        port = _create_port(vm.userid, network)
369
    associate_port_with_machine(port, vm)
370

    
371
    log.info("Creating NIC %s with IPv4 Address %s", port, port.ipv4_address)
372

    
373
    return backend.connect_to_network(vm, port)
374

    
375

    
376
@server_command("DISCONNECT")
377
def disconnect(vm, nic):
378
    log.info("Removing NIC %s from VM %s", nic, vm)
379
    return backend.disconnect_from_network(vm, nic)
380

    
381

    
382
def console(vm, console_type):
383
    """Arrange for an OOB console of the specified type
384

385
    This method arranges for an OOB console of the specified type.
386
    Only consoles of type "vnc" are supported for now.
387

388
    It uses a running instance of vncauthproxy to setup proper
389
    VNC forwarding with a random password, then returns the necessary
390
    VNC connection info to the caller.
391

392
    """
393
    log.info("Get console  VM %s, type %s", vm, console_type)
394

    
395
    # Use RAPI to get VNC console information for this instance
396
    if vm.operstate != "STARTED":
397
        raise faults.BadRequest('Server not in ACTIVE state.')
398

    
399
    if settings.TEST:
400
        console_data = {'kind': 'vnc', 'host': 'ganeti_node', 'port': 1000}
401
    else:
402
        console_data = backend.get_instance_console(vm)
403

    
404
    if console_data['kind'] != 'vnc':
405
        message = 'got console of kind %s, not "vnc"' % console_data['kind']
406
        raise faults.ServiceUnavailable(message)
407

    
408
    # Let vncauthproxy decide on the source port.
409
    # The alternative: static allocation, e.g.
410
    # sport = console_data['port'] - 1000
411
    sport = 0
412
    daddr = console_data['host']
413
    dport = console_data['port']
414
    password = util.random_password()
415

    
416
    if settings.TEST:
417
        fwd = {'source_port': 1234, 'status': 'OK'}
418
    else:
419
        fwd = request_vnc_forwarding(sport, daddr, dport, password)
420

    
421
    if fwd['status'] != "OK":
422
        raise faults.ServiceUnavailable('vncauthproxy returned error status')
423

    
424
    # Verify that the VNC server settings haven't changed
425
    if not settings.TEST:
426
        if console_data != backend.get_instance_console(vm):
427
            raise faults.ServiceUnavailable('VNC Server settings changed.')
428

    
429
    console = {
430
        'type': 'vnc',
431
        'host': getfqdn(),
432
        'port': fwd['source_port'],
433
        'password': password}
434

    
435
    return console
436

    
437

    
438
def rename(server, new_name):
439
    """Rename a VirtualMachine."""
440
    old_name = server.name
441
    server.name = new_name
442
    server.save()
443
    log.info("Renamed server '%s' from '%s' to '%s'", server, old_name,
444
             new_name)
445
    return server
446

    
447

    
448
@transaction.commit_on_success
449
def create_port(*args, **kwargs):
450
    return _create_port(*args, **kwargs)
451

    
452

    
453
def _create_port(userid, network, machine=None, use_ipaddress=None,
454
                 address=None, name="", security_groups=None,
455
                 device_owner=None):
456
    """Create a new port on the specified network.
457

458
    Create a new Port(NetworkInterface model) on the specified Network. If
459
    'machine' is specified, the machine will be connected to the network using
460
    this port. If 'use_ipaddress' argument is specified, the port will be
461
    assigned this IPAddress. Otherwise, an IPv4 address from the IPv4 subnet
462
    will be allocated.
463

464
    """
465
    if network.state != "ACTIVE":
466
        raise faults.BuildInProgress("Can not create port while network is in"
467
                                     " state %s" % network.state)
468
    if network.action == "DESTROY":
469
        msg = "Can not create port. Network %s is being deleted."
470
        raise faults.Conflict(msg % network.id)
471
    ipaddress = None
472
    if use_ipaddress is not None:
473
        # Use an existing IPAddress object.
474
        ipaddress = use_ipaddress
475
        if ipaddress and (ipaddress.network_id != network.id):
476
            msg = "IP Address %s does not belong to network %s"
477
            raise faults.Conflict(msg % (ipaddress.address, network.id))
478
    else:
479
        # If network has IPv4 subnets, try to allocate the address that the
480
        # the user specified or a random one.
481
        if network.subnets.filter(ipversion=4).exists():
482
            ipaddress = ips.allocate_ip(network, userid=userid,
483
                                        address=address)
484
        elif address is not None:
485
            raise faults.BadRequest("Address %s is not a valid IP for the"
486
                                    " defined network subnets" % address)
487

    
488
    if ipaddress is not None and ipaddress.nic is not None:
489
        raise faults.Conflict("IP address '%s' is already in use" %
490
                              ipaddress.address)
491

    
492
    port = NetworkInterface.objects.create(network=network,
493
                                           state="DOWN",
494
                                           userid=userid,
495
                                           device_owner=None,
496
                                           name=name)
497

    
498
    # add the security groups if any
499
    if security_groups:
500
        port.security_groups.add(*security_groups)
501

    
502
    if ipaddress is not None:
503
        # Associate IPAddress with the Port
504
        ipaddress.nic = port
505
        ipaddress.save()
506

    
507
    if machine is not None:
508
        # Connect port to the instance.
509
        machine = connect(machine, network, port)
510
        jobID = machine.task_job_id
511
        log.info("Created Port %s with IP %s. Ganeti Job: %s",
512
                 port, ipaddress, jobID)
513
    else:
514
        log.info("Created Port %s with IP %s not attached to any instance",
515
                 port, ipaddress)
516

    
517
    return port
518

    
519

    
520
def associate_port_with_machine(port, machine):
521
    """Associate a Port with a VirtualMachine.
522

523
    Associate the port with the VirtualMachine and add an entry to the
524
    IPAddressLog if the port has a public IPv4 address from a public network.
525

526
    """
527
    if port.machine is not None:
528
        raise faults.Conflict("Port %s is already in use." % port.id)
529
    if port.network.public:
530
        ipv4_address = port.ipv4_address
531
        if ipv4_address is not None:
532
            ip_log = IPAddressLog.objects.create(server_id=machine.id,
533
                                                 network_id=port.network_id,
534
                                                 address=ipv4_address,
535
                                                 active=True)
536
            log.debug("Created IP log entry %s", ip_log)
537
    port.machine = machine
538
    port.state = "BUILD"
539
    port.device_owner = "vm"
540
    port.save()
541
    return port
542

    
543

    
544
@transaction.commit_on_success
545
def delete_port(port):
546
    """Delete a port by removing the NIC card from the instance.
547

548
    Send a Job to remove the NIC card from the instance. The port
549
    will be deleted and the associated IPv4 addressess will be released
550
    when the job completes successfully.
551

552
    """
553

    
554
    if port.machine is not None:
555
        vm = disconnect(port.machine, port)
556
        log.info("Removing port %s, Job: %s", port, vm.task_job_id)
557
    else:
558
        backend.remove_nic_ips(port)
559
        port.delete()
560
        log.info("Removed port %s", port)
561

    
562
    return port