Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / servers.py @ 3aecadc8

History | View | Annotate | Download (24.4 kB)

1
import logging
2

    
3
from socket import getfqdn
4
from functools import wraps
5
from django import dispatch
6
from django.db import transaction
7
from django.utils import simplejson as json
8

    
9
from snf_django.lib.api import faults
10
from django.conf import settings
11
from synnefo import quotas
12
from synnefo.api import util
13
from synnefo.logic import backend, ips
14
from synnefo.logic.backend_allocator import BackendAllocator
15
from synnefo.db.models import (NetworkInterface, VirtualMachine,
16
                               VirtualMachineMetadata, IPAddressLog, Network)
17
from vncauthproxy.client import request_forwarding as request_vnc_forwarding
18

    
19
log = logging.getLogger(__name__)
20

    
21
# server creation signal
22
server_created = dispatch.Signal(providing_args=["created_vm_params"])
23

    
24

    
25
def validate_server_action(vm, action):
26
    if vm.deleted:
27
        raise faults.BadRequest("Server '%s' has been deleted." % vm.id)
28

    
29
    # Destroyin a server should always be permitted
30
    if action == "DESTROY":
31
        return
32

    
33
    # Check that there is no pending action
34
    pending_action = vm.task
35
    if pending_action:
36
        if pending_action == "BUILD":
37
            raise faults.BuildInProgress("Server '%s' is being build." % vm.id)
38
        raise faults.BadRequest("Can not perform '%s' action while there is a"
39
                                " pending '%s'." % (action, pending_action))
40

    
41
    # Check if action can be performed to VM's operstate
42
    operstate = vm.operstate
43
    if operstate == "BUILD" and action != "BUILD":
44
        raise faults.BuildInProgress("Server '%s' is being build." % vm.id)
45
    elif (action == "START" and operstate != "STOPPED") or\
46
         (action == "STOP" and operstate != "STARTED") or\
47
         (action == "RESIZE" and operstate != "STOPPED") or\
48
         (action in ["CONNECT", "DISCONNECT"] and operstate != "STOPPED"
49
          and not settings.GANETI_USE_HOTPLUG):
50
        raise faults.BadRequest("Can not perform '%s' action while server is"
51
                                " in '%s' state." % (action, operstate))
52
    return
53

    
54

    
55
def server_command(action):
56
    """Handle execution of a server action.
57

58
    Helper function to validate and execute a server action, handle quota
59
    commission and update the 'task' of the VM in the DB.
60

61
    1) Check if action can be performed. If it can, then there must be no
62
       pending task (with the exception of DESTROY).
63
    2) Handle previous commission if unresolved:
64
       * If it is not pending and it to accept, then accept
65
       * If it is not pending and to reject or is pending then reject it. Since
66
       the action can be performed only if there is no pending task, then there
67
       can be no pending commission. The exception is DESTROY, but in this case
68
       the commission can safely be rejected, and the dispatcher will generate
69
       the correct ones!
70
    3) Issue new commission and associate it with the VM. Also clear the task.
71
    4) Send job to ganeti
72
    5) Update task and commit
73
    """
74
    def decorator(func):
75
        @wraps(func)
76
        @transaction.commit_on_success
77
        def wrapper(vm, *args, **kwargs):
78
            user_id = vm.userid
79
            validate_server_action(vm, action)
80
            vm.action = action
81

    
82
            commission_name = "client: api, resource: %s" % vm
83
            quotas.handle_resource_commission(vm, action=action,
84
                                              commission_name=commission_name)
85
            vm.save()
86

    
87
            # XXX: Special case for server creation!
88
            if action == "BUILD":
89
                # Perform a commit, because the VirtualMachine must be saved to
90
                # DB before the OP_INSTANCE_CREATE job in enqueued in Ganeti.
91
                # Otherwise, messages will arrive from snf-dispatcher about
92
                # this instance, before the VM is stored in DB.
93
                transaction.commit()
94
                # After committing the locks are released. Refetch the instance
95
                # to guarantee x-lock.
96
                vm = VirtualMachine.objects.select_for_update().get(id=vm.id)
97

    
98
            # Send the job to Ganeti and get the associated jobID
99
            try:
100
                job_id = func(vm, *args, **kwargs)
101
            except Exception as e:
102
                if vm.serial is not None:
103
                    # Since the job never reached Ganeti, reject the commission
104
                    log.debug("Rejecting commission: '%s', could not perform"
105
                              " action '%s': %s" % (vm.serial,  action, e))
106
                    transaction.rollback()
107
                    quotas.reject_serial(vm.serial)
108
                    transaction.commit()
109
                raise
110

    
111
            if action == "BUILD" and vm.serial is not None:
112
                # XXX: Special case for server creation: we must accept the
113
                # commission because the VM has been stored in DB. Also, if
114
                # communication with Ganeti fails, the job will never reach
115
                # Ganeti, and the commission will never be resolved.
116
                quotas.accept_serial(vm.serial)
117

    
118
            log.info("user: %s, vm: %s, action: %s, job_id: %s, serial: %s",
119
                     user_id, vm.id, action, job_id, vm.serial)
120

    
121
            # store the new task in the VM
122
            if job_id is not None:
123
                vm.task = action
124
                vm.task_job_id = job_id
125
            vm.save()
126

    
127
            return vm
128
        return wrapper
129
    return decorator
130

    
131

    
132
@transaction.commit_on_success
133
def create(userid, name, password, flavor, image, metadata={},
134
           personality=[], networks=None, use_backend=None):
135
    if use_backend is None:
136
        # Allocate server to a Ganeti backend
137
        use_backend = allocate_new_server(userid, flavor)
138

    
139
    # Create the ports for the server
140
    try:
141
        ports = create_instance_ports(userid, networks)
142
    except Exception as e:
143
        raise e
144

    
145
    # Fix flavor for archipelago
146
    disk_template, provider = util.get_flavor_provider(flavor)
147
    if provider:
148
        flavor.disk_template = disk_template
149
        flavor.disk_provider = provider
150
        flavor.disk_origin = None
151
        if provider == 'vlmc':
152
            flavor.disk_origin = image['checksum']
153
            image['backend_id'] = 'null'
154
    else:
155
        flavor.disk_provider = None
156

    
157
    # We must save the VM instance now, so that it gets a valid
158
    # vm.backend_vm_id.
159
    vm = VirtualMachine.objects.create(name=name,
160
                                       backend=use_backend,
161
                                       userid=userid,
162
                                       imageid=image["id"],
163
                                       flavor=flavor,
164
                                       operstate="BUILD")
165
    log.info("Created entry in DB for VM '%s'", vm)
166

    
167
    # Associate the ports with the server
168
    for index, port in enumerate(ports):
169
        associate_port_with_machine(port, vm)
170
        port.index = index
171
        port.save()
172

    
173
    for key, val in metadata.items():
174
        VirtualMachineMetadata.objects.create(
175
            meta_key=key,
176
            meta_value=val,
177
            vm=vm)
178

    
179
    # Create the server in Ganeti.
180
    vm = create_server(vm, ports, flavor, image, personality, password)
181

    
182
    return vm
183

    
184

    
185
@transaction.commit_on_success
186
def allocate_new_server(userid, flavor):
187
    """Allocate a new server to a Ganeti backend.
188

189
    Allocation is performed based on the owner of the server and the specified
190
    flavor. Also, backends that do not have a public IPv4 address are excluded
191
    from server allocation.
192

193
    This function runs inside a transaction, because after allocating the
194
    instance a commit must be performed in order to release all locks.
195

196
    """
197
    backend_allocator = BackendAllocator()
198
    use_backend = backend_allocator.allocate(userid, flavor)
199
    if use_backend is None:
200
        log.error("No available backend for VM with flavor %s", flavor)
201
        raise faults.ServiceUnavailable("No available backends")
202
    return use_backend
203

    
204

    
205
@server_command("BUILD")
206
def create_server(vm, nics, flavor, image, personality, password):
207
    # dispatch server created signal needed to trigger the 'vmapi', which
208
    # enriches the vm object with the 'config_url' attribute which must be
209
    # passed to the Ganeti job.
210
    server_created.send(sender=vm, created_vm_params={
211
        'img_id': image['backend_id'],
212
        'img_passwd': password,
213
        'img_format': str(image['format']),
214
        'img_personality': json.dumps(personality),
215
        'img_properties': json.dumps(image['metadata']),
216
    })
217
    # send job to Ganeti
218
    try:
219
        jobID = backend.create_instance(vm, nics, flavor, image)
220
    except:
221
        log.exception("Failed create instance '%s'", vm)
222
        jobID = None
223
        vm.operstate = "ERROR"
224
        vm.backendlogmsg = "Failed to send job to Ganeti."
225
        vm.save()
226
        vm.nics.all().update(state="ERROR")
227

    
228
    # At this point the job is enqueued in the Ganeti backend
229
    vm.backendjobid = jobID
230
    vm.save()
231
    log.info("User %s created VM %s, NICs %s, Backend %s, JobID %s",
232
             vm.userid, vm, nics, backend, str(jobID))
233

    
234
    return jobID
235

    
236

    
237
@server_command("DESTROY")
238
def destroy(vm):
239
    log.info("Deleting VM %s", vm)
240
    return backend.delete_instance(vm)
241

    
242

    
243
@server_command("START")
244
def start(vm):
245
    log.info("Starting VM %s", vm)
246
    return backend.startup_instance(vm)
247

    
248

    
249
@server_command("STOP")
250
def stop(vm):
251
    log.info("Stopping VM %s", vm)
252
    return backend.shutdown_instance(vm)
253

    
254

    
255
@server_command("REBOOT")
256
def reboot(vm, reboot_type):
257
    if reboot_type not in ("SOFT", "HARD"):
258
        raise faults.BadRequest("Malformed request. Invalid reboot"
259
                                " type %s" % reboot_type)
260
    log.info("Rebooting VM %s. Type %s", vm, reboot_type)
261

    
262
    return backend.reboot_instance(vm, reboot_type.lower())
263

    
264

    
265
@server_command("RESIZE")
266
def resize(vm, flavor):
267
    old_flavor = vm.flavor
268
    # User requested the same flavor
269
    if old_flavor.id == flavor.id:
270
        raise faults.BadRequest("Server '%s' flavor is already '%s'."
271
                                % (vm, flavor))
272
        return None
273
    # Check that resize can be performed
274
    if old_flavor.disk != flavor.disk:
275
        raise faults.BadRequest("Can not resize instance disk.")
276
    if old_flavor.disk_template != flavor.disk_template:
277
        raise faults.BadRequest("Can not change instance disk template.")
278

    
279
    log.info("Resizing VM from flavor '%s' to '%s", old_flavor, flavor)
280
    commission_info = {"cyclades.cpu": flavor.cpu - old_flavor.cpu,
281
                       "cyclades.ram": 1048576 * (flavor.ram - old_flavor.ram)}
282
    # Save serial to VM, since it is needed by server_command decorator
283
    vm.serial = quotas.issue_commission(user=vm.userid,
284
                                        source=quotas.DEFAULT_SOURCE,
285
                                        provisions=commission_info,
286
                                        name="resource: %s. resize" % vm)
287
    return backend.resize_instance(vm, vcpus=flavor.cpu, memory=flavor.ram)
288

    
289

    
290
@server_command("SET_FIREWALL_PROFILE")
291
def set_firewall_profile(vm, profile, nic):
292
    log.info("Setting VM %s, NIC %s, firewall %s", vm, nic, profile)
293

    
294
    if profile not in [x[0] for x in NetworkInterface.FIREWALL_PROFILES]:
295
        raise faults.BadRequest("Unsupported firewall profile")
296
    backend.set_firewall_profile(vm, profile=profile, nic=nic)
297
    return None
298

    
299

    
300
@server_command("CONNECT")
301
def connect(vm, network, port=None):
302
    if port is None:
303
        port = _create_port(vm.userid, network)
304
    associate_port_with_machine(port, vm)
305

    
306
    log.info("Creating NIC %s with IPv4 Address %s", port, port.ipv4_address)
307

    
308
    return backend.connect_to_network(vm, port)
309

    
310

    
311
@server_command("DISCONNECT")
312
def disconnect(vm, nic):
313
    log.info("Removing NIC %s from VM %s", nic, vm)
314
    return backend.disconnect_from_network(vm, nic)
315

    
316

    
317
def console(vm, console_type):
318
    """Arrange for an OOB console of the specified type
319

320
    This method arranges for an OOB console of the specified type.
321
    Only consoles of type "vnc" are supported for now.
322

323
    It uses a running instance of vncauthproxy to setup proper
324
    VNC forwarding with a random password, then returns the necessary
325
    VNC connection info to the caller.
326

327
    """
328
    log.info("Get console  VM %s, type %s", vm, console_type)
329

    
330
    # Use RAPI to get VNC console information for this instance
331
    if vm.operstate != "STARTED":
332
        raise faults.BadRequest('Server not in ACTIVE state.')
333

    
334
    if settings.TEST:
335
        console_data = {'kind': 'vnc', 'host': 'ganeti_node', 'port': 1000}
336
    else:
337
        console_data = backend.get_instance_console(vm)
338

    
339
    if console_data['kind'] != 'vnc':
340
        message = 'got console of kind %s, not "vnc"' % console_data['kind']
341
        raise faults.ServiceUnavailable(message)
342

    
343
    # Let vncauthproxy decide on the source port.
344
    # The alternative: static allocation, e.g.
345
    # sport = console_data['port'] - 1000
346
    sport = 0
347
    daddr = console_data['host']
348
    dport = console_data['port']
349
    password = util.random_password()
350

    
351
    if settings.TEST:
352
        fwd = {'source_port': 1234, 'status': 'OK'}
353
    else:
354
        fwd = request_vnc_forwarding(sport, daddr, dport, password)
355

    
356
    if fwd['status'] != "OK":
357
        raise faults.ServiceUnavailable('vncauthproxy returned error status')
358

    
359
    # Verify that the VNC server settings haven't changed
360
    if not settings.TEST:
361
        if console_data != backend.get_instance_console(vm):
362
            raise faults.ServiceUnavailable('VNC Server settings changed.')
363

    
364
    console = {
365
        'type': 'vnc',
366
        'host': getfqdn(),
367
        'port': fwd['source_port'],
368
        'password': password}
369

    
370
    return console
371

    
372

    
373
def rename(server, new_name):
374
    """Rename a VirtualMachine."""
375
    old_name = server.name
376
    server.name = new_name
377
    server.save()
378
    log.info("Renamed server '%s' from '%s' to '%s'", server, old_name,
379
             new_name)
380
    return server
381

    
382

    
383
@transaction.commit_on_success
384
def create_port(*args, **kwargs):
385
    return _create_port(*args, **kwargs)
386

    
387

    
388
def _create_port(userid, network, machine=None, use_ipaddress=None,
389
                 address=None, name="", security_groups=None,
390
                 device_owner=None):
391
    """Create a new port on the specified network.
392

393
    Create a new Port(NetworkInterface model) on the specified Network. If
394
    'machine' is specified, the machine will be connected to the network using
395
    this port. If 'use_ipaddress' argument is specified, the port will be
396
    assigned this IPAddress. Otherwise, an IPv4 address from the IPv4 subnet
397
    will be allocated.
398

399
    """
400
    if network.state != "ACTIVE":
401
        raise faults.BuildInProgress("Can not create port while network is in"
402
                                     " state %s" % network.state)
403
    if network.action == "DESTROY":
404
        msg = "Can not create port. Network %s is being deleted."
405
        raise faults.Conflict(msg % network.id)
406
    ipaddress = None
407
    if use_ipaddress is not None:
408
        # Use an existing IPAddress object.
409
        ipaddress = use_ipaddress
410
        if ipaddress and (ipaddress.network_id != network.id):
411
            msg = "IP Address %s does not belong to network %s"
412
            raise faults.Conflict(msg % (ipaddress.address, network.id))
413
    else:
414
        # If network has IPv4 subnets, try to allocate the address that the
415
        # the user specified or a random one.
416
        if network.subnets.filter(ipversion=4).exists():
417
            ipaddress = ips.allocate_ip(network, userid=userid,
418
                                        address=address)
419
        elif address is not None:
420
            raise faults.BadRequest("Address %s is not a valid IP for the"
421
                                    " defined network subnets" % address)
422

    
423
    if ipaddress is not None and ipaddress.nic is not None:
424
        raise faults.Conflict("IP address '%s' is already in use" %
425
                              ipaddress.address)
426

    
427
    port = NetworkInterface.objects.create(network=network,
428
                                           state="DOWN",
429
                                           userid=userid,
430
                                           device_owner=None,
431
                                           name=name)
432

    
433
    # add the security groups if any
434
    if security_groups:
435
        port.security_groups.add(*security_groups)
436

    
437
    if ipaddress is not None:
438
        # Associate IPAddress with the Port
439
        ipaddress.nic = port
440
        ipaddress.save()
441

    
442
    if machine is not None:
443
        # Connect port to the instance.
444
        machine = connect(machine, network, port)
445
        jobID = machine.task_job_id
446
        log.info("Created Port %s with IP %s. Ganeti Job: %s",
447
                 port, ipaddress, jobID)
448
    else:
449
        log.info("Created Port %s with IP %s not attached to any instance",
450
                 port, ipaddress)
451

    
452
    return port
453

    
454

    
455
def associate_port_with_machine(port, machine):
456
    """Associate a Port with a VirtualMachine.
457

458
    Associate the port with the VirtualMachine and add an entry to the
459
    IPAddressLog if the port has a public IPv4 address from a public network.
460

461
    """
462
    if port.machine is not None:
463
        raise faults.Conflict("Port %s is already in use." % port.id)
464
    if port.network.public:
465
        ipv4_address = port.ipv4_address
466
        if ipv4_address is not None:
467
            ip_log = IPAddressLog.objects.create(server_id=machine.id,
468
                                                 network_id=port.network_id,
469
                                                 address=ipv4_address,
470
                                                 active=True)
471
            log.debug("Created IP log entry %s", ip_log)
472
    port.machine = machine
473
    port.state = "BUILD"
474
    port.device_owner = "vm"
475
    port.save()
476
    return port
477

    
478

    
479
@transaction.commit_on_success
480
def delete_port(port):
481
    """Delete a port by removing the NIC card from the instance.
482

483
    Send a Job to remove the NIC card from the instance. The port
484
    will be deleted and the associated IPv4 addressess will be released
485
    when the job completes successfully. Deleting port that is connected to
486
    a public network is allowed only if the port has an associated floating IP
487
    address.
488

489
    """
490

    
491
    if port.network.public and not port.ips.filter(floating_ip=True,
492
                                                   deleted=False).exists():
493
        raise faults.Forbidden("Can not disconnect from public network.")
494

    
495
    if port.machine is not None:
496
        vm = disconnect(port.machine, port)
497
        log.info("Removing port %s, Job: %s", port, vm.task_job_id)
498
    else:
499
        backend.remove_nic_ips(port)
500
        port.delete()
501
        log.info("Removed port %s", port)
502

    
503
    return port
504

    
505

    
506
def create_instance_ports(user_id, networks=None):
507
    # First connect the instance to the networks defined by the admin
508
    forced_ports = create_ports_for_setting(user_id, category="admin")
509
    if networks is None:
510
        # If the user did not asked for any networks, connect instance to
511
        # default networks as defined by the admin
512
        ports = create_ports_for_setting(user_id, category="default")
513
    else:
514
        # Else just connect to the networks that the user defined
515
        ports = create_ports_for_request(user_id, networks)
516
    return forced_ports + ports
517

    
518

    
519
def create_ports_for_setting(user_id, category):
520
    if category == "admin":
521
        network_setting = settings.CYCLADES_FORCED_SERVER_NETWORKS
522
    elif category == "default":
523
        network_setting = settings.CYCLADES_DEFAULT_SERVER_NETWORKS
524
    else:
525
        raise ValueError("Unknown category: %s" % category)
526

    
527
    ports = []
528
    for network_ids in network_setting:
529
        # Treat even simple network IDs as group of networks with one network
530
        if type(network_ids) not in (list, tuple):
531
            network_ids = [network_ids]
532

    
533
        for network_id in network_ids:
534
            try:
535
                ports.append(_port_from_setting(user_id, network_id, category))
536
                break
537
            except faults.Conflict:
538
                # Try all network IDs in the network group
539
                pass
540

    
541
            # Diffrent exception for each category!
542
            if category == "admin":
543
                exception = faults.ServiceUnavailable
544
            else:
545
                exception = faults.Conflict
546
            raise exception("Cannot connect instance to any of the following"
547
                            " networks %s" % network_ids)
548
    return ports
549

    
550

    
551
def _port_from_setting(user_id, network_id, category):
552
    # TODO: Fix this..you need only IPv4 and only IPv6 network
553
    if network_id == "SNF:ANY_PUBLIC_IPV4":
554
        return create_public_ipv4_port(user_id, category=category)
555
    elif network_id == "SNF:ANY_PUBLIC_IPV6":
556
        return create_public_ipv6_port(user_id, category=category)
557
    elif network_id == "SNF:ANY_PUBLIC":
558
        try:
559
            return create_public_ipv4_port(user_id, category=category)
560
        except faults.Conflict:
561
            return create_public_ipv6_port(user_id, category=category)
562
    else:  # Case of network ID
563
        if category in ["user", "default"]:
564
            return _port_for_request(user_id, {"uuid": network_id})
565
        elif category == "admin":
566
            network = util.get_network(network_id, user_id, non_deleted=True)
567
            return _create_port(user_id, network)
568
        else:
569
            raise ValueError("Unknown category: %s" % category)
570

    
571

    
572
def create_public_ipv4_port(user_id, network=None, address=None,
573
                            category="user"):
574
    """Create a port in a public IPv4 network.
575

576
    Create a port in a public IPv4 network (that may also have an IPv6
577
    subnet). If the category is 'user' or 'default' this will try to use
578
    one of the users floating IPs. If the category is 'admin' will
579
    create a port to the public network (without floating IPs or quotas).
580

581
    """
582
    if category in ["user", "default"]:
583
        if address is None:
584
            ipaddress = ips.get_free_floating_ip(user_id, network)
585
        else:
586
            ipaddress = util.get_floating_ip_by_address(user_id, address,
587
                                                        for_update=True)
588
    elif category == "admin":
589
        if network is None:
590
            ipaddress = ips.allocate_public_ip(user_id)
591
        else:
592
            ipaddress = ips.allocate_ip(network, user_id)
593
    else:
594
        raise ValueError("Unknown category: %s" % category)
595
    if network is None:
596
        network = ipaddress.network
597
    return _create_port(user_id, network, use_ipaddress=ipaddress)
598

    
599

    
600
def create_public_ipv6_port(user_id, category=None):
601
    """Create a port in a public IPv6 only network."""
602
    networks = Network.objects.filter(public=True, deleted=False,
603
                                      drained=False, subnets__ipversion=6)\
604
                              .exclude(subnets__ipversion=4)
605
    if networks:
606
        return _create_port(user_id, networks[0])
607
    else:
608
        msg = "No available IPv6 only network!"
609
        log.error(msg)
610
        raise faults.Conflict(msg)
611

    
612

    
613
def create_ports_for_request(user_id, networks):
614
    """Create the server ports requested by the user.
615

616
    Create the ports for the new servers as requested in the 'networks'
617
    attribute. The networks attribute contains either a list of network IDs
618
    ('uuid') or a list of ports IDs ('port'). In case of network IDs, the user
619
    can also specify an IPv4 address ('fixed_ip'). In order to connect to a
620
    public network, the 'fixed_ip' attribute must contain the IPv4 address of a
621
    floating IP. If the network is public but the 'fixed_ip' attribute is not
622
    specified, the system will automatically reserve one of the users floating
623
    IPs.
624

625
    """
626
    return [_port_for_request(user_id, network) for network in networks]
627

    
628

    
629
def _port_for_request(user_id, network_dict):
630
    port_id = network_dict.get("port")
631
    network_id = network_dict.get("uuid")
632
    if port_id is not None:
633
        return util.get_port(port_id, user_id, for_update=True)
634
    elif network_id is not None:
635
        address = network_dict.get("fixed_ip")
636
        network = util.get_network(network_id, user_id, non_deleted=True)
637
        if network.public:
638
            if network.subnet4 is not None:
639
                if not "fixed_ip" in network_dict:
640
                    return create_public_ipv4_port(user_id, network)
641
                elif address is None:
642
                    msg = "Cannot connect to public network"
643
                    raise faults.BadRequest(msg % network.id)
644
                else:
645
                    return create_public_ipv4_port(user_id, network, address)
646
            else:
647
                raise faults.Forbidden("Cannot connect to IPv6 only public"
648
                                       " network %" % network.id)
649
        else:
650
            return _create_port(user_id, network, address=address)
651
    else:
652
        raise faults.BadRequest("Network 'uuid' or 'port' attribute"
653
                                " is required.")