Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / servers.py @ 99988465

History | View | Annotate | Download (20.2 kB)

1
import logging
2

    
3
from socket import getfqdn
4
from functools import wraps
5
from django import dispatch
6
from django.db import transaction
7
from django.utils import simplejson as json
8

    
9
from snf_django.lib.api import faults
10
from django.conf import settings
11
from synnefo import quotas
12
from synnefo.api import util
13
from synnefo.logic import backend
14
from synnefo.logic.backend_allocator import BackendAllocator
15
from synnefo.db import pools
16
from synnefo.db.models import (NetworkInterface, VirtualMachine,
17
                               VirtualMachineMetadata, IPAddressLog)
18
from vncauthproxy.client import request_forwarding as request_vnc_forwarding
19

    
20
log = logging.getLogger(__name__)
21

    
22
# server creation signal
23
server_created = dispatch.Signal(providing_args=["created_vm_params"])
24

    
25

    
26
def validate_server_action(vm, action):
27
    if vm.deleted:
28
        raise faults.BadRequest("Server '%s' has been deleted." % vm.id)
29

    
30
    # Destroyin a server should always be permitted
31
    if action == "DESTROY":
32
        return
33

    
34
    # Check that there is no pending action
35
    pending_action = vm.task
36
    if pending_action:
37
        if pending_action == "BUILD":
38
            raise faults.BuildInProgress("Server '%s' is being build." % vm.id)
39
        raise faults.BadRequest("Can not perform '%s' action while there is a"
40
                                " pending '%s'." % (action, pending_action))
41

    
42
    # Check if action can be performed to VM's operstate
43
    operstate = vm.operstate
44
    if operstate == "BUILD" and action != "BUILD":
45
        raise faults.BuildInProgress("Server '%s' is being build." % vm.id)
46
    elif (action == "START" and operstate != "STOPPED") or\
47
         (action == "STOP" and operstate != "STARTED") or\
48
         (action == "RESIZE" and operstate != "STOPPED") or\
49
         (action in ["CONNECT", "DISCONNECT"] and operstate != "STOPPED"
50
          and not settings.GANETI_USE_HOTPLUG):
51
        raise faults.BadRequest("Can not perform '%s' action while server is"
52
                                " in '%s' state." % (action, operstate))
53
    return
54

    
55

    
56
def server_command(action):
57
    """Handle execution of a server action.
58

59
    Helper function to validate and execute a server action, handle quota
60
    commission and update the 'task' of the VM in the DB.
61

62
    1) Check if action can be performed. If it can, then there must be no
63
       pending task (with the exception of DESTROY).
64
    2) Handle previous commission if unresolved:
65
       * If it is not pending and it to accept, then accept
66
       * If it is not pending and to reject or is pending then reject it. Since
67
       the action can be performed only if there is no pending task, then there
68
       can be no pending commission. The exception is DESTROY, but in this case
69
       the commission can safely be rejected, and the dispatcher will generate
70
       the correct ones!
71
    3) Issue new commission and associate it with the VM. Also clear the task.
72
    4) Send job to ganeti
73
    5) Update task and commit
74
    """
75
    def decorator(func):
76
        @wraps(func)
77
        @transaction.commit_on_success
78
        def wrapper(vm, *args, **kwargs):
79
            user_id = vm.userid
80
            validate_server_action(vm, action)
81
            vm.action = action
82

    
83
            commission_name = "client: api, resource: %s" % vm
84
            quotas.handle_resource_commission(vm, action=action,
85
                                              commission_name=commission_name)
86
            vm.save()
87

    
88
            # XXX: Special case for server creation!
89
            if action == "BUILD":
90
                # Perform a commit, because the VirtualMachine must be saved to
91
                # DB before the OP_INSTANCE_CREATE job in enqueued in Ganeti.
92
                # Otherwise, messages will arrive from snf-dispatcher about
93
                # this instance, before the VM is stored in DB.
94
                transaction.commit()
95
                # After committing the locks are released. Refetch the instance
96
                # to guarantee x-lock.
97
                vm = VirtualMachine.objects.select_for_update().get(id=vm.id)
98

    
99
            # Send the job to Ganeti and get the associated jobID
100
            try:
101
                job_id = func(vm, *args, **kwargs)
102
            except Exception as e:
103
                if vm.serial is not None:
104
                    # Since the job never reached Ganeti, reject the commission
105
                    log.debug("Rejecting commission: '%s', could not perform"
106
                              " action '%s': %s" % (vm.serial,  action, e))
107
                    transaction.rollback()
108
                    quotas.reject_serial(vm.serial)
109
                    transaction.commit()
110
                raise
111

    
112
            if action == "BUILD" and vm.serial is not None:
113
                # XXX: Special case for server creation: we must accept the
114
                # commission because the VM has been stored in DB. Also, if
115
                # communication with Ganeti fails, the job will never reach
116
                # Ganeti, and the commission will never be resolved.
117
                quotas.accept_serial(vm.serial)
118

    
119
            log.info("user: %s, vm: %s, action: %s, job_id: %s, serial: %s",
120
                     user_id, vm.id, action, job_id, vm.serial)
121

    
122
            # store the new task in the VM
123
            if job_id is not None:
124
                vm.task = action
125
                vm.task_job_id = job_id
126
            vm.save()
127

    
128
            return vm
129
        return wrapper
130
    return decorator
131

    
132

    
133
@transaction.commit_on_success
134
def create(userid, name, password, flavor, image, metadata={},
135
           personality=[], networks=None, floating_ips=None,
136
           use_backend=None):
137
    if use_backend is None:
138
        # Allocate server to a Ganeti backend
139
        use_backend = allocate_new_server(userid, flavor)
140

    
141
    if networks is None:
142
        networks = []
143
    if floating_ips is None:
144
        floating_ips = []
145

    
146
    # Fix flavor for archipelago
147
    disk_template, provider = util.get_flavor_provider(flavor)
148
    if provider:
149
        flavor.disk_template = disk_template
150
        flavor.disk_provider = provider
151
        flavor.disk_origin = None
152
        if provider == 'vlmc':
153
            flavor.disk_origin = image['checksum']
154
            image['backend_id'] = 'null'
155
    else:
156
        flavor.disk_provider = None
157

    
158
    # We must save the VM instance now, so that it gets a valid
159
    # vm.backend_vm_id.
160
    vm = VirtualMachine.objects.create(name=name,
161
                                       backend=use_backend,
162
                                       userid=userid,
163
                                       imageid=image["id"],
164
                                       flavor=flavor,
165
                                       operstate="BUILD")
166
    log.info("Created entry in DB for VM '%s'", vm)
167

    
168
    nics = create_instance_nics(vm, userid, networks, floating_ips)
169

    
170
    for key, val in metadata.items():
171
        VirtualMachineMetadata.objects.create(
172
            meta_key=key,
173
            meta_value=val,
174
            vm=vm)
175

    
176
    # Create the server in Ganeti.
177
    vm = create_server(vm, nics, flavor, image, personality, password)
178

    
179
    return vm
180

    
181

    
182
@transaction.commit_on_success
183
def allocate_new_server(userid, flavor):
184
    """Allocate a new server to a Ganeti backend.
185

186
    Allocation is performed based on the owner of the server and the specified
187
    flavor. Also, backends that do not have a public IPv4 address are excluded
188
    from server allocation.
189

190
    This function runs inside a transaction, because after allocating the
191
    instance a commit must be performed in order to release all locks.
192

193
    """
194
    backend_allocator = BackendAllocator()
195
    use_backend = backend_allocator.allocate(userid, flavor)
196
    if use_backend is None:
197
        log.error("No available backend for VM with flavor %s", flavor)
198
        raise faults.ServiceUnavailable("No available backends")
199
    return use_backend
200

    
201

    
202
@server_command("BUILD")
203
def create_server(vm, nics, flavor, image, personality, password):
204
    # dispatch server created signal needed to trigger the 'vmapi', which
205
    # enriches the vm object with the 'config_url' attribute which must be
206
    # passed to the Ganeti job.
207
    server_created.send(sender=vm, created_vm_params={
208
        'img_id': image['backend_id'],
209
        'img_passwd': password,
210
        'img_format': str(image['format']),
211
        'img_personality': json.dumps(personality),
212
        'img_properties': json.dumps(image['metadata']),
213
    })
214
    # send job to Ganeti
215
    try:
216
        jobID = backend.create_instance(vm, nics, flavor, image)
217
    except:
218
        log.exception("Failed create instance '%s'", vm)
219
        jobID = None
220
        vm.operstate = "ERROR"
221
        vm.backendlogmsg = "Failed to send job to Ganeti."
222
        vm.save()
223
        vm.nics.all().update(state="ERROR")
224

    
225
    # At this point the job is enqueued in the Ganeti backend
226
    vm.backendjobid = jobID
227
    vm.save()
228
    log.info("User %s created VM %s, NICs %s, Backend %s, JobID %s",
229
             vm.userid, vm, nics, backend, str(jobID))
230

    
231
    return jobID
232

    
233

    
234
def create_instance_nics(vm, userid, networks=[], floating_ips=[]):
235
    """Create NICs for VirtualMachine.
236

237
    Helper function for allocating IP addresses and creating NICs in the DB
238
    for a VirtualMachine. Created NICs are the combination of the default
239
    network policy (defined by administration settings) and the networks
240
    defined by the user.
241

242
    """
243
    ports = []
244
    for network_id in settings.DEFAULT_INSTANCE_NETWORKS:
245
        if network_id == "SNF:ANY_PUBLIC":
246
            ipaddress = util.allocate_public_ip(userid=userid,
247
                                                backend=vm.backend)
248
            port = _create_port(userid, network=ipaddress.network,
249
                                use_ipaddress=ipaddress)
250
        else:
251
            try:
252
                network = util.get_network(network_id, userid,
253
                                           non_deleted=True)
254
            except faults.ItemNotFound:
255
                msg = "Invalid configuration. Setting"\
256
                      " 'DEFAULT_INSTANCE_NETWORKS' contains invalid"\
257
                      " network '%s'" % network_id
258
                log.error(msg)
259
                raise faults.InternalServerError(msg)
260
            port = _create_port(userid, network)
261
        ports.append(port)
262

    
263
    for floating_ip_id in floating_ips:
264
        floating_ip = util.get_floating_ip_by_id(vm.userid, floating_ip_id,
265
                                                 for_update=True)
266
        port = _create_port(userid, network=floating_ip.network,
267
                            use_ipaddress=floating_ip)
268
        ports.append(port)
269

    
270
    for net in networks:
271
        port_id = net.get("port")
272
        net_id = net.get("uuid")
273
        if port_id is not None:
274
            port = util.get_port(port_id, userid, for_update=True)
275
            ports.append(port)
276
        elif net_id is not None:
277
            network = util.get_network(net_id, userid, non_deleted=True)
278
            if network.public:
279
                raise faults.Forbidden("Can not connect to public network")
280
            address = net.get("fixed_ip")
281
            port = _create_port(userid, network, address=address)
282
            ports.append(port)
283
        else:
284
            raise faults.BadRequest("")
285

    
286
    for index, port in enumerate(ports):
287
        associate_port_with_machine(port, vm)
288
        port.index = index
289
        port.save()
290
    return ports
291

    
292

    
293
@server_command("DESTROY")
294
def destroy(vm):
295
    log.info("Deleting VM %s", vm)
296
    return backend.delete_instance(vm)
297

    
298

    
299
@server_command("START")
300
def start(vm):
301
    log.info("Starting VM %s", vm)
302
    return backend.startup_instance(vm)
303

    
304

    
305
@server_command("STOP")
306
def stop(vm):
307
    log.info("Stopping VM %s", vm)
308
    return backend.shutdown_instance(vm)
309

    
310

    
311
@server_command("REBOOT")
312
def reboot(vm, reboot_type):
313
    if reboot_type not in ("SOFT", "HARD"):
314
        raise faults.BadRequest("Malformed request. Invalid reboot"
315
                                " type %s" % reboot_type)
316
    log.info("Rebooting VM %s. Type %s", vm, reboot_type)
317

    
318
    return backend.reboot_instance(vm, reboot_type.lower())
319

    
320

    
321
@server_command("RESIZE")
322
def resize(vm, flavor):
323
    old_flavor = vm.flavor
324
    # User requested the same flavor
325
    if old_flavor.id == flavor.id:
326
        raise faults.BadRequest("Server '%s' flavor is already '%s'."
327
                                % (vm, flavor))
328
        return None
329
    # Check that resize can be performed
330
    if old_flavor.disk != flavor.disk:
331
        raise faults.BadRequest("Can not resize instance disk.")
332
    if old_flavor.disk_template != flavor.disk_template:
333
        raise faults.BadRequest("Can not change instance disk template.")
334

    
335
    log.info("Resizing VM from flavor '%s' to '%s", old_flavor, flavor)
336
    commission_info = {"cyclades.cpu": flavor.cpu - old_flavor.cpu,
337
                       "cyclades.ram": 1048576 * (flavor.ram - old_flavor.ram)}
338
    # Save serial to VM, since it is needed by server_command decorator
339
    vm.serial = quotas.issue_commission(user=vm.userid,
340
                                        source=quotas.DEFAULT_SOURCE,
341
                                        provisions=commission_info,
342
                                        name="resource: %s. resize" % vm)
343
    return backend.resize_instance(vm, vcpus=flavor.cpu, memory=flavor.ram)
344

    
345

    
346
@server_command("SET_FIREWALL_PROFILE")
347
def set_firewall_profile(vm, profile, nic):
348
    log.info("Setting VM %s, NIC %s, firewall %s", vm, nic, profile)
349

    
350
    if profile not in [x[0] for x in NetworkInterface.FIREWALL_PROFILES]:
351
        raise faults.BadRequest("Unsupported firewall profile")
352
    backend.set_firewall_profile(vm, profile=profile, nic=nic)
353
    return None
354

    
355

    
356
@server_command("CONNECT")
357
def connect(vm, network, port=None):
358
    if port is None:
359
        port = _create_port(vm.userid, network)
360
    associate_port_with_machine(port, vm)
361

    
362
    log.info("Creating NIC %s with IPv4 Address %s", port, port.ipv4_address)
363

    
364
    return backend.connect_to_network(vm, port)
365

    
366

    
367
@server_command("DISCONNECT")
368
def disconnect(vm, nic):
369
    log.info("Removing NIC %s from VM %s", nic, vm)
370
    return backend.disconnect_from_network(vm, nic)
371

    
372

    
373
def console(vm, console_type):
374
    """Arrange for an OOB console of the specified type
375

376
    This method arranges for an OOB console of the specified type.
377
    Only consoles of type "vnc" are supported for now.
378

379
    It uses a running instance of vncauthproxy to setup proper
380
    VNC forwarding with a random password, then returns the necessary
381
    VNC connection info to the caller.
382

383
    """
384
    log.info("Get console  VM %s, type %s", vm, console_type)
385

    
386
    # Use RAPI to get VNC console information for this instance
387
    if vm.operstate != "STARTED":
388
        raise faults.BadRequest('Server not in ACTIVE state.')
389

    
390
    if settings.TEST:
391
        console_data = {'kind': 'vnc', 'host': 'ganeti_node', 'port': 1000}
392
    else:
393
        console_data = backend.get_instance_console(vm)
394

    
395
    if console_data['kind'] != 'vnc':
396
        message = 'got console of kind %s, not "vnc"' % console_data['kind']
397
        raise faults.ServiceUnavailable(message)
398

    
399
    # Let vncauthproxy decide on the source port.
400
    # The alternative: static allocation, e.g.
401
    # sport = console_data['port'] - 1000
402
    sport = 0
403
    daddr = console_data['host']
404
    dport = console_data['port']
405
    password = util.random_password()
406

    
407
    if settings.TEST:
408
        fwd = {'source_port': 1234, 'status': 'OK'}
409
    else:
410
        fwd = request_vnc_forwarding(sport, daddr, dport, password)
411

    
412
    if fwd['status'] != "OK":
413
        raise faults.ServiceUnavailable('vncauthproxy returned error status')
414

    
415
    # Verify that the VNC server settings haven't changed
416
    if not settings.TEST:
417
        if console_data != backend.get_instance_console(vm):
418
            raise faults.ServiceUnavailable('VNC Server settings changed.')
419

    
420
    console = {
421
        'type': 'vnc',
422
        'host': getfqdn(),
423
        'port': fwd['source_port'],
424
        'password': password}
425

    
426
    return console
427

    
428

    
429
def rename(server, new_name):
430
    """Rename a VirtualMachine."""
431
    old_name = server.name
432
    server.name = new_name
433
    server.save()
434
    log.info("Renamed server '%s' from '%s' to '%s'", server, old_name,
435
             new_name)
436
    return server
437

    
438

    
439
@transaction.commit_on_success
440
def create_port(*args, **kwargs):
441
    return _create_port(*args, **kwargs)
442

    
443

    
444
def _create_port(userid, network, machine=None, use_ipaddress=None,
445
                 address=None, name="", security_groups=None,
446
                 device_owner=None):
447
    """Create a new port on the specified network.
448

449
    Create a new Port(NetworkInterface model) on the specified Network. If
450
    'machine' is specified, the machine will be connected to the network using
451
    this port. If 'use_ipaddress' argument is specified, the port will be
452
    assigned this IPAddress. Otherwise, an IPv4 address from the IPv4 subnet
453
    will be allocated.
454

455
    """
456
    if network.state != "ACTIVE":
457
        raise faults.BuildInProgress("Can not create port while network is in"
458
                                     " state %s" % network.state)
459
    ipaddress = None
460
    if use_ipaddress is not None:
461
        # Use an existing IPAddress object.
462
        ipaddress = use_ipaddress
463
        if ipaddress and (ipaddress.network_id != network.id):
464
            msg = "IP Address %s does not belong to network %s"
465
            raise faults.Conflict(msg % (ipaddress.address, network.id))
466
    else:
467
        # If network has IPv4 subnets, try to allocate the address that the
468
        # the user specified or a random one.
469
        if network.subnets.filter(ipversion=4).exists():
470
            try:
471
                ipaddress = util.allocate_ip(network, userid=userid,
472
                                             address=address)
473
            except pools.ValueNotAvailable:
474
                msg = "Address %s is already in use." % address
475
                raise faults.Conflict(msg)
476
        elif address is not None:
477
            raise faults.BadRequest("Address %s is not a valid IP for the"
478
                                    " defined network subnets" % address)
479

    
480
    if ipaddress is not None and ipaddress.nic is not None:
481
        raise faults.Conflict("IP address '%s' is already in use" %
482
                              ipaddress.address)
483

    
484
    port = NetworkInterface.objects.create(network=network,
485
                                           state="DOWN",
486
                                           userid=userid,
487
                                           device_owner=None,
488
                                           name=name)
489

    
490
    # add the security groups if any
491
    if security_groups:
492
        port.security_groups.add(*security_groups)
493

    
494
    if ipaddress is not None:
495
        # Associate IPAddress with the Port
496
        ipaddress.nic = port
497
        ipaddress.save()
498

    
499
    if machine is not None:
500
        # Connect port to the instance.
501
        machine = connect(machine, network, port)
502
        jobID = machine.task_job_id
503
        log.info("Created Port %s with IP %s. Ganeti Job: %s",
504
                 port, ipaddress, jobID)
505
    else:
506
        log.info("Created Port %s with IP %s not attached to any instance",
507
                 port, ipaddress)
508

    
509
    return port
510

    
511

    
512
def associate_port_with_machine(port, machine):
513
    """Associate a Port with a VirtualMachine.
514

515
    Associate the port with the VirtualMachine and add an entry to the
516
    IPAddressLog if the port has a public IPv4 address from a public network.
517

518
    """
519
    if port.machine is not None:
520
        raise faults.Conflict("Port %s is already in use." % port.id)
521
    if port.network.public:
522
        ipv4_address = port.ipv4_address
523
        if ipv4_address is not None:
524
            ip_log = IPAddressLog.objects.create(server_id=machine.id,
525
                                                 network_id=port.network_id,
526
                                                 address=ipv4_address,
527
                                                 active=True)
528
            log.debug("Created IP log entry %s", ip_log)
529
    port.machine = machine
530
    port.state = "BUILD"
531
    port.device_owner = "vm"
532
    port.save()
533
    return port
534

    
535

    
536
@transaction.commit_on_success
537
def delete_port(port):
538
    """Delete a port by removing the NIC card from the instance.
539

540
    Send a Job to remove the NIC card from the instance. The port
541
    will be deleted and the associated IPv4 addressess will be released
542
    when the job completes successfully.
543

544
    """
545

    
546
    if port.machine is not None:
547
        vm = disconnect(port.machine, port)
548
        log.info("Removing port %s, Job: %s", port, vm.task_job_id)
549
    else:
550
        backend.remove_nic_ips(port)
551
        port.delete()
552
        log.info("Removed port %s", port)
553

    
554
    return port