
root / snf-cyclades-app / synnefo / logic / backend.py @ 5f90e24c


1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
from django.conf import settings
34
from django.db import transaction
35
from datetime import datetime, timedelta
36

    
37
from synnefo.db.models import (VirtualMachine, Network,
38
                               BackendNetwork, BACKEND_STATUSES,
39
                               pooled_rapi_client, VirtualMachineDiagnostic,
40
                               Flavor, IPAddress, IPAddressLog)
41
from synnefo.logic import utils, ips
42
from synnefo import quotas
43
from synnefo.api.util import release_resource
44
from synnefo.util.mac2eui64 import mac2eui64
45
from synnefo.logic import rapi
46

    
47
from logging import getLogger
48
log = getLogger(__name__)
49

    
50

    
51
_firewall_tags = {
52
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
53
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
54
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}
55

    
56
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
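# Map the last colon-separated component of each firewall tag back to the
# corresponding profile name; the tags defined above are expected to have
# four components, with "%s" filled in with the NIC id when applied.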
57

    
58
SIMPLE_NIC_FIELDS = ["state", "mac", "network", "firewall_profile", "index"]
59
COMPLEX_NIC_FIELDS = ["ipv4_address", "ipv6_address"]
60
NIC_FIELDS = SIMPLE_NIC_FIELDS + COMPLEX_NIC_FIELDS
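# The SIMPLE_NIC_FIELDS are copied verbatim from the Ganeti NIC to the DB NIC,
# while the address fields in COMPLEX_NIC_FIELDS also need IP pool handling
# (see _process_net_status and change_address_of_port).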
61
UNKNOWN_NIC_PREFIX = "unknown-"
62

    
63

    
64
def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
65
    """Handle quotas for updated VirtualMachine.
66

67
    Update quotas for the updated VirtualMachine based on the job that ran on
68
    the Ganeti backend. If a commission has already been issued for this job,
69
    then this commission is simply accepted or rejected based on the job status.
70
    Otherwise, a new commission for the given change is issued in force and
71
    auto-accept mode. In this case, previous commissions are rejected, since
72
    they reflect a previous state of the VM.
73

74
    """
75
    if job_status not in rapi.JOB_STATUS_FINALIZED:
76
        return vm
77

    
78
    # Check successful completion of a job will trigger any quotable change in
79
    # the VM state.
80
    action = utils.get_action_from_opcode(job_opcode, job_fields)
81
    if action == "BUILD":
82
        # Quotas for new VMs are automatically accepted by the API
83
        return vm
84

    
85
    if vm.task_job_id == job_id and vm.serial is not None:
86
        # Commission for this change has already been issued. So just
87
        # accept/reject it. A special case is OP_INSTANCE_CREATE, which must
88
        # be accepted even if it fails, since the user must manually remove
89
        # the failed server.
90
        serial = vm.serial
91
        if job_status == rapi.JOB_STATUS_SUCCESS:
92
            quotas.accept_resource_serial(vm)
93
        elif job_status in [rapi.JOB_STATUS_ERROR, rapi.JOB_STATUS_CANCELED]:
94
            log.debug("Job %s failed. Rejecting related serial %s", job_id,
95
                      serial)
96
            quotas.reject_resource_serial(vm)
97
    elif job_status == rapi.JOB_STATUS_SUCCESS:
98
        commission_info = quotas.get_commission_info(resource=vm,
99
                                                     action=action,
100
                                                     action_fields=job_fields)
101
        if commission_info is not None:
102
            # Commission for this change has not been issued, or the issued
103
            # commission was unaware of the current change. Reject all previous
104
            # commissions and create a new one in forced mode!
105
            log.debug("Expected job was %s. Processing job %s. "
106
                      "Attached serial %s",
107
                      vm.task_job_id, job_id, vm.serial)
108
            reason = ("client: dispatcher, resource: %s, ganeti_job: %s"
109
                      % (vm, job_id))
110
            serial = quotas.handle_resource_commission(
111
                vm, action,
112
                action_fields=job_fields,
113
                commission_name=reason,
114
                force=True,
115
                auto_accept=True)
116
            log.debug("Issued new commission: %s", serial)
117
    return vm
118

    
119

    
120
@transaction.commit_on_success
121
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None,
122
                      job_fields=None):
123
    """Process a job progress notification from the backend
124

125
    Process an incoming message from the backend (currently Ganeti).
126
    Job notifications with a terminating status (success, error, or canceled)
127
    also update the operating state of the VM.
128

129
    """
130
    # See #1492, #1031, #1111 why this line has been removed
131
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
132
    if status not in [x[0] for x in BACKEND_STATUSES]:
133
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)
134

    
135
    vm.backendjobid = jobid
136
    vm.backendjobstatus = status
137
    vm.backendopcode = opcode
138
    vm.backendlogmsg = logmsg
139

    
140
    if status not in rapi.JOB_STATUS_FINALIZED:
141
        vm.save()
142
        return
143

    
144
    if job_fields is None:
145
        job_fields = {}
146

    
147
    new_operstate = None
148
    new_flavor = None
149
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode)
150

    
151
    if status == rapi.JOB_STATUS_SUCCESS:
152
        # If job succeeds, change operating state if needed
153
        if state_for_success is not None:
154
            new_operstate = state_for_success
155

    
156
        beparams = job_fields.get("beparams", None)
157
        if beparams:
158
            # Change the flavor of the VM
159
            new_flavor = _process_resize(vm, beparams)
160

    
161
        # Update backendtime only for jobs that have been successfully
162
        # completed, since only these jobs update the state of the VM. Else a
163
        # "race condition" may occur when a successful job (e.g.
164
        # OP_INSTANCE_REMOVE) completes before an error job and messages arrive
165
        # in reversed order.
166
        vm.backendtime = etime
167

    
168
    if status in rapi.JOB_STATUS_FINALIZED and nics is not None:
169
        # Update the NICs of the VM
170
        _process_net_status(vm, etime, nics)
171

    
172
    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
173
    if opcode == 'OP_INSTANCE_CREATE' and status in (rapi.JOB_STATUS_CANCELED,
174
                                                     rapi.JOB_STATUS_ERROR):
175
        new_operstate = "ERROR"
176
        vm.backendtime = etime
177
        # Update state of associated NICs
178
        vm.nics.all().update(state="ERROR")
179
    elif opcode == 'OP_INSTANCE_REMOVE':
180
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
181
        # when no instance exists at the Ganeti backend.
182
        # See ticket #799 for all the details.
183
        if (status == rapi.JOB_STATUS_SUCCESS or
184
           (status == rapi.JOB_STATUS_ERROR and not vm_exists_in_backend(vm))):
185
            # VM has been deleted
186
            for nic in vm.nics.all():
187
                # Release the IP
188
                remove_nic_ips(nic)
189
                # And delete the NIC.
190
                nic.delete()
191
            vm.deleted = True
192
            new_operstate = state_for_success
193
            vm.backendtime = etime
194
            status = rapi.JOB_STATUS_SUCCESS
195
            #status = "success"
196
            vm.volumes.all().update(deleted=True, machine=None)
197

    
198
    if status in rapi.JOB_STATUS_FINALIZED:
199
        # Job is finalized: Handle quotas/commissioning
200
        vm = handle_vm_quotas(vm, job_id=jobid, job_opcode=opcode,
201
                              job_status=status, job_fields=job_fields)
202
        # and clear task fields
203
        if vm.task_job_id == jobid:
204
            vm.task = None
205
            vm.task_job_id = None
206

    
207
    if new_operstate is not None:
208
        vm.operstate = new_operstate
209
    if new_flavor is not None:
210
        vm.flavor = new_flavor
211

    
212
    vm.save()
213

    
214

    
215
def _process_resize(vm, beparams):
216
    """Change flavor of a VirtualMachine based on new beparams."""
217
    old_flavor = vm.flavor
218
    vcpus = beparams.get("vcpus", old_flavor.cpu)
219
    ram = beparams.get("maxmem", old_flavor.ram)
220
    if vcpus == old_flavor.cpu and ram == old_flavor.ram:
221
        return
222
    try:
223
        new_flavor = Flavor.objects.get(cpu=vcpus, ram=ram,
224
                                        disk=old_flavor.disk,
225
                                        disk_template=old_flavor.disk_template)
226
    except Flavor.DoesNotExist:
227
        raise Exception("Cannot find flavor for VM")
228
    return new_flavor
229

    
230

    
231
@transaction.commit_on_success
232
def process_net_status(vm, etime, nics):
233
    """Wrap _process_net_status inside transaction."""
234
    _process_net_status(vm, etime, nics)
235

    
236

    
237
def _process_net_status(vm, etime, nics):
238
    """Process a net status notification from the backend
239

240
    Process an incoming message from the Ganeti backend,
241
    detailing the NIC configuration of a VM instance.
242

243
    Update the state of the VM in the DB accordingly.
244

245
    """
246
    ganeti_nics = process_ganeti_nics(nics)
247
    db_nics = dict([(nic.id, nic)
248
                    for nic in vm.nics.select_related("network")
249
                                      .prefetch_related("ips")])
250

    
251
    for nic_name in set(db_nics.keys()) | set(ganeti_nics.keys()):
252
        db_nic = db_nics.get(nic_name)
253
        ganeti_nic = ganeti_nics.get(nic_name)
254
        if ganeti_nic is None:
255
            if nic_is_stale(vm, db_nic):
256
                log.debug("Removing stale NIC '%s'" % db_nic)
257
                remove_nic_ips(db_nic)
258
                db_nic.delete()
259
            else:
260
                log.info("NIC '%s' is still being created" % db_nic)
261
        elif db_nic is None:
262
            msg = ("NIC/%s of VM %s does not exist in DB! Cannot automatically"
263
                   " fix this issue!" % (nic_name, vm))
264
            log.error(msg)
265
            continue
266
        elif not nics_are_equal(db_nic, ganeti_nic):
267
            for f in SIMPLE_NIC_FIELDS:
268
                # Update the NIC in DB with the values from Ganeti NIC
269
                setattr(db_nic, f, ganeti_nic[f])
270
                db_nic.save()
271

    
272
            # Special case: the IPv4 address has changed, so the old address
273
            # must be released and the new one reserved
274
            gnt_ipv4_address = ganeti_nic["ipv4_address"]
275
            db_ipv4_address = db_nic.ipv4_address
276
            if db_ipv4_address != gnt_ipv4_address:
277
                change_address_of_port(db_nic, vm.userid,
278
                                       old_address=db_ipv4_address,
279
                                       new_address=gnt_ipv4_address,
280
                                       version=4)
281

    
282
            gnt_ipv6_address = ganeti_nic["ipv6_address"]
283
            db_ipv6_address = db_nic.ipv6_address
284
            if db_ipv6_address != gnt_ipv6_address:
285
                change_address_of_port(db_nic, vm.userid,
286
                                       old_address=db_ipv6_address,
287
                                       new_address=gnt_ipv6_address,
288
                                       version=6)
289

    
290
    vm.backendtime = etime
291
    vm.save()
292

    
293

    
294
def change_address_of_port(port, userid, old_address, new_address, version):
295
    """Change."""
296
    if old_address is not None:
297
        msg = ("IPv%s Address of server '%s' changed from '%s' to '%s'"
298
               % (version, port.machine_id, old_address, new_address))
299
        log.error(msg)
300

    
301
    # Remove the old IP address
302
    remove_nic_ips(port, version=version)
303

    
304
    if version == 4:
305
        ipaddress = ips.allocate_ip(port.network, userid, address=new_address)
306
        ipaddress.nic = port
307
        ipaddress.save()
308
    elif version == 6:
309
        subnet6 = port.network.subnet6
310
        ipaddress = IPAddress.objects.create(userid=userid,
311
                                             network=port.network,
312
                                             subnet=subnet6,
313
                                             nic=port,
314
                                             address=new_address,
315
                                             ipversion=6)
316
    else:
317
        raise ValueError("Unknown version: %s" % version)
318

    
319
    # New address log
320
    ip_log = IPAddressLog.objects.create(server_id=port.machine_id,
321
                                         network_id=port.network_id,
322
                                         address=new_address,
323
                                         active=True)
324
    log.info("Created IP log entry '%s' for address '%s' to server '%s'",
325
             ip_log.id, new_address, port.machine_id)
326

    
327
    return ipaddress
328

    
329

    
330
def nics_are_equal(db_nic, gnt_nic):
331
    for field in NIC_FIELDS:
332
        if getattr(db_nic, field) != gnt_nic[field]:
333
            return False
334
    return True
335

    
336

    
337
def process_ganeti_nics(ganeti_nics):
338
    """Process NIC dict from ganeti"""
339
    new_nics = []
340
    for index, gnic in enumerate(ganeti_nics):
341
        nic_name = gnic.get("name", None)
342
        if nic_name is not None:
343
            nic_id = utils.id_from_nic_name(nic_name)
344
        else:
345
            # Fall back to using the index as the NIC id. If the NIC is
346
            # unknown to Synnefo, it will be created automatically.
347
            nic_id = UNKNOWN_NIC_PREFIX + str(index)
348
        network_name = gnic.get('network', '')
349
        network_id = utils.id_from_network_name(network_name)
350
        network = Network.objects.get(id=network_id)
351

    
352
        # Get the new nic info
353
        mac = gnic.get('mac')
354
        ipv4 = gnic.get('ip')
355
        subnet6 = network.subnet6
356
        ipv6 = mac2eui64(mac, subnet6.cidr) if subnet6 else None
357

    
358
        firewall = gnic.get('firewall')
359
        firewall_profile = _reverse_tags.get(firewall)
360
        if not firewall_profile and network.public:
361
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE
362

    
363
        nic_info = {
364
            'index': index,
365
            'network': network,
366
            'mac': mac,
367
            'ipv4_address': ipv4,
368
            'ipv6_address': ipv6,
369
            'firewall_profile': firewall_profile,
370
            'state': 'ACTIVE'}
371

    
372
        new_nics.append((nic_id, nic_info))
373
    return dict(new_nics)
374

    
375

    
376
def remove_nic_ips(nic, version=None):
377
    """Remove IP addresses associated with a NetworkInterface.
378

379
    Remove all IP addresses that are associated with the NetworkInterface
380
    object, by returning them to the pool and deleting the IPAddress object. If
381
    the IP is a floating IP, then it is just disassociated from the NIC.
382
    If version is specified, then only IP addresses of that version will be
383
    removed.
384

385
    """
386
    for ip in nic.ips.all():
387
        if version and ip.ipversion != version:
388
            continue
389

    
390
        # Update the DB table holding the logging of all IP addresses
391
        terminate_active_ipaddress_log(nic, ip)
392

    
393
        if ip.floating_ip:
394
            ip.nic = None
395
            ip.save()
396
        else:
397
            # Release the IPv4 address
398
            ip.release_address()
399
            ip.delete()
400

    
401

    
402
def terminate_active_ipaddress_log(nic, ip):
403
    """Update DB logging entry for this IP address."""
404
    if not ip.network.public or nic.machine is None:
405
        return
406
    try:
407
        ip_log, created = \
408
            IPAddressLog.objects.get_or_create(server_id=nic.machine_id,
409
                                               network_id=ip.network_id,
410
                                               address=ip.address,
411
                                               active=True)
412
    except IPAddressLog.MultipleObjectsReturned:
413
        logmsg = ("Multiple active log entries for IP %s, Network %s,"
414
                  "Server %s. Cannot proceed!"
415
                  % (ip.address, ip.network, nic.machine))
416
        log.error(logmsg)
417
        raise
418

    
419
    if created:
420
        logmsg = ("No log entry for IP %s, Network %s, Server %s. Created new"
421
                  " but with wrong creation timestamp."
422
                  % (ip.address, ip.network, nic.machine))
423
        log.error(logmsg)
424
    ip_log.released_at = datetime.now()
425
    ip_log.active = False
426
    ip_log.save()
427

    
428

    
429
@transaction.commit_on_success
430
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
431
    if status not in [x[0] for x in BACKEND_STATUSES]:
432
        raise Network.InvalidBackendMsgError(opcode, status)
433

    
434
    back_network.backendjobid = jobid
435
    back_network.backendjobstatus = status
436
    back_network.backendopcode = opcode
437
    back_network.backendlogmsg = logmsg
438

    
439
    # Note: Network is already locked!
440
    network = back_network.network
441

    
442
    # Notifications of success change the operating state
443
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
444
    if status == rapi.JOB_STATUS_SUCCESS and state_for_success is not None:
445
        back_network.operstate = state_for_success
446

    
447
    if (status in (rapi.JOB_STATUS_CANCELED, rapi.JOB_STATUS_ERROR)
448
       and opcode == 'OP_NETWORK_ADD'):
449
        back_network.operstate = 'ERROR'
450
        back_network.backendtime = etime
451

    
452
    if opcode == 'OP_NETWORK_REMOVE':
453
        network_is_deleted = (status == rapi.JOB_STATUS_SUCCESS)
454
        if network_is_deleted or (status == rapi.JOB_STATUS_ERROR and not
455
                                  network_exists_in_backend(back_network)):
456
            back_network.operstate = state_for_success
457
            back_network.deleted = True
458
            back_network.backendtime = etime
459

    
460
    if status == rapi.JOB_STATUS_SUCCESS:
461
        back_network.backendtime = etime
462
    back_network.save()
463
    # Also update the state of the Network.
464
    update_network_state(network)
465

    
466

    
467
def update_network_state(network):
468
    """Update the state of a Network based on BackendNetwork states.
469

470
    Update the state of a Network based on the operstate of the networks in the
471
    backends that network exists.
472

473
    The state of the network is:
474
    * ACTIVE: If it is 'ACTIVE' in at least one backend.
475
    * DELETED: If it is 'DELETED' in all backends in which it has been created.
476

477
    This function also releases the resources (MAC prefix or Bridge) and the
478
    quotas for the network.
479

480
    """
481
    if network.deleted:
482
        # Network has already been deleted. Just assert that state is also
483
        # DELETED
484
        if not network.state == "DELETED":
485
            network.state = "DELETED"
486
            network.save()
487
        return
488

    
489
    backend_states = [s.operstate for s in network.backend_networks.all()]
490
    if not backend_states and network.action != "DESTROY":
491
        if network.state != "ACTIVE":
492
            network.state = "ACTIVE"
493
            network.save()
494
        return
495

    
496
    # Network is deleted when all BackendNetworks go to "DELETED" operstate
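    # (the reduce below evaluates to "DELETED" only if every operstate in
    #  backend_states equals "DELETED"; any other value turns the accumulator
    #  falsy)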
497
    deleted = reduce(lambda x, y: x == y and "DELETED", backend_states,
498
                     "DELETED")
499

    
500
    # Release the resources on the deletion of the Network
501
    if deleted:
502
        if network.ips.filter(deleted=False, floating_ip=True).exists():
503
            msg = "Cannot delete network %s! Floating IPs still in use!"
504
            log.error(msg % network)
505
            raise Exception(msg % network)
506
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
507
                 network.id, network.mac_prefix, network.link)
508
        network.deleted = True
509
        network.state = "DELETED"
510
        # Undrain the network, otherwise the network state will remain
511
        # as 'SNF:DRAINED'
512
        network.drained = False
513
        if network.mac_prefix:
514
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
515
                release_resource(res_type="mac_prefix",
516
                                 value=network.mac_prefix)
517
        if network.link:
518
            if network.FLAVORS[network.flavor]["link"] == "pool":
519
                release_resource(res_type="bridge", value=network.link)
520

    
521
        # Set all subnets as deleted
522
        network.subnets.update(deleted=True)
523
        # And delete the IP pools
524
        for subnet in network.subnets.all():
525
            if subnet.ipversion == 4:
526
                subnet.ip_pools.all().delete()
527
        # And all the backend networks, since they are now useless
528
        network.backend_networks.all().delete()
529

    
530
        # Issue commission
531
        if network.userid:
532
            quotas.issue_and_accept_commission(network, action="DESTROY")
533
            # the above has already saved the object and committed;
534
            # a second save would override others' changes, since the
535
            # object is now unlocked
536
            return
537
        elif not network.public:
538
            log.warning("Network %s does not have an owner!", network.id)
539
    network.save()
540

    
541

    
542
@transaction.commit_on_success
543
def process_network_modify(back_network, etime, jobid, opcode, status,
544
                           job_fields):
545
    assert (opcode == "OP_NETWORK_SET_PARAMS")
546
    if status not in [x[0] for x in BACKEND_STATUSES]:
547
        raise Network.InvalidBackendMsgError(opcode, status)
548

    
549
    back_network.backendjobid = jobid
550
    back_network.backendjobstatus = status
551
    back_network.opcode = opcode
552

    
553
    add_reserved_ips = job_fields.get("add_reserved_ips")
554
    if add_reserved_ips:
555
        network = back_network.network
556
        for ip in add_reserved_ips:
557
            network.reserve_address(ip, external=True)
558

    
559
    if status == rapi.JOB_STATUS_SUCCESS:
560
        back_network.backendtime = etime
561
    back_network.save()
562

    
563

    
564
@transaction.commit_on_success
565
def process_create_progress(vm, etime, progress):
566

    
567
    percentage = int(progress)
568

    
569
    # The percentage may exceed 100%, due to the way
570
    # snf-image:copy-progress tracks bytes read by image handling processes
571
    percentage = 100 if percentage > 100 else percentage
572
    if percentage < 0:
573
        raise ValueError("Percentage cannot be negative")
574

    
575
    # FIXME: log a warning here, see #1033
576
#   if last_update > percentage:
577
#       raise ValueError("Build percentage should increase monotonically " \
578
#                        "(old = %d, new = %d)" % (last_update, percentage))
579

    
580
    # This assumes that no message of type 'ganeti-create-progress' is going to
581
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
582
    # the instance is STARTED.  What if the two messages are processed by two
583
    # separate dispatcher threads, and the 'ganeti-op-status' message for
584
    # successful creation gets processed before the 'ganeti-create-progress'
585
    # message? [vkoukis]
586
    #
587
    #if not vm.operstate == 'BUILD':
588
    #    raise VirtualMachine.IllegalState("VM is not in building state")
589

    
590
    vm.buildpercentage = percentage
591
    vm.backendtime = etime
592
    vm.save()
593

    
594

    
595
@transaction.commit_on_success
596
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
597
                               details=None):
598
    """
599
    Create virtual machine instance diagnostic entry.
600

601
    :param vm: VirtualMachine instance to create diagnostic for.
602
    :param message: Diagnostic message.
603
    :param source: Diagnostic source identifier (e.g. image-helper).
604
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
605
    :param etime: The time the message occured (if available).
606
    :param details: Additional details or debug information.
607
    """
608
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level, source=source,
609
                                                   source_date=etime,
610
                                                   message=message,
611
                                                   details=details)
612

    
613

    
614
def create_instance(vm, nics, volumes, flavor, image):
615
    """`image` is a dictionary which should contain the keys:
616
            'backend_id', 'format' and 'metadata'
617

618
        The 'metadata' value should be a dictionary.
619
    """
620

    
621
    # Handle arguments to CreateInstance() as a dictionary,
622
    # initialize it based on a deployment-specific value.
623
    # This enables the administrator to override deployment-specific
624
    # arguments, such as the disk template to use, name of os provider
625
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
626
    #
627
    kw = vm.backend.get_create_params()
628
    kw['mode'] = 'create'
629
    kw['name'] = vm.backend_vm_id
630
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
631

    
632
    kw['disk_template'] = flavor.disk_template
633
    disks = []
634
    for volume in volumes:
635
        disk = {}
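        # volume.size is stored in GB, while Ganeti expects disk sizes in MB,
        # hence the multiplication by 1024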
636
        disk["size"] = volume.size * 1024
637
        provider = flavor.disk_provider
638
        if provider is not None:
639
            disk["provider"] = provider
640
            disk["origin"] = volume.origin
641
            extra_disk_params = settings.GANETI_DISK_PROVIDER_KWARGS\
642
                                        .get(provider)
643
            if extra_disk_params is not None:
644
                disk.update(extra_disk_params)
645
        disks.append(disk)
646

    
647
    kw["disks"] = disks
648

    
649
    kw['nics'] = [{"name": nic.backend_uuid,
650
                   "network": nic.network.backend_id,
651
                   "ip": nic.ipv4_address}
652
                  for nic in nics]
653

    
654
    backend = vm.backend
655
    depend_jobs = []
656
    for nic in nics:
657
        bnet, job_ids = ensure_network_is_active(backend, nic.network_id)
658
        depend_jobs.extend(job_ids)
659

    
660
    kw["depends"] = create_job_dependencies(depend_jobs)
661

    
662
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
663
    # kw['os'] = settings.GANETI_OS_PROVIDER
664
    kw['ip_check'] = False
665
    kw['name_check'] = False
666

    
667
    # Do not specify a node explicitly; have
668
    # Ganeti use an iallocator instead
669
    #kw['pnode'] = rapi.GetNodes()[0]
670

    
671
    kw['dry_run'] = settings.TEST
672

    
673
    kw['beparams'] = {
674
        'auto_balance': True,
675
        'vcpus': flavor.cpu,
676
        'memory': flavor.ram}
677

    
678
    kw['osparams'] = {
679
        'config_url': vm.config_url,
680
        # Store image id and format to Ganeti
681
        'img_id': image['backend_id'],
682
        'img_format': image['format']}
683

    
684
    # Use opportunistic locking
685
    kw['opportunistic_locking'] = settings.GANETI_USE_OPPORTUNISTIC_LOCKING
686

    
687
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
688
    # kw['hvparams'] = dict(serial_console=False)
689

    
690
    log.debug("Creating instance %s", utils.hide_pass(kw))
691
    with pooled_rapi_client(vm) as client:
692
        return client.CreateInstance(**kw)
693

    
694

    
695
def delete_instance(vm, shutdown_timeout=None):
696
    with pooled_rapi_client(vm) as client:
697
        return client.DeleteInstance(vm.backend_vm_id,
698
                                     shutdown_timeout=shutdown_timeout,
699
                                     dry_run=settings.TEST)
700

    
701

    
702
def reboot_instance(vm, reboot_type, shutdown_timeout=None):
703
    assert reboot_type in ('soft', 'hard')
704
    # Note that the reboot type of the Ganeti job must always be 'hard'. The
705
    # 'soft' and 'hard' types of the OS API differ from Ganeti's and map to
706
    # 'shutdown_timeout'.
707
    kwargs = {"instance": vm.backend_vm_id,
708
              "reboot_type": "hard"}
709
    # The 'shutdown_timeout' parameter is only supported by snf-ganeti >= 2.8.2
710
    # and Ganeti > 2.10. In other versions the parameter is ignored and we fall
711
    # back to Ganeti's default timeout (120s).
712
    if shutdown_timeout is not None:
713
        kwargs["shutdown_timeout"] = shutdown_timeout
714
    if reboot_type == "hard":
715
        kwargs["shutdown_timeout"] = 0
716
    if settings.TEST:
717
        kwargs["dry_run"] = True
718
    with pooled_rapi_client(vm) as client:
719
        return client.RebootInstance(**kwargs)
720

    
721

    
722
def startup_instance(vm):
723
    with pooled_rapi_client(vm) as client:
724
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
725

    
726

    
727
def shutdown_instance(vm, shutdown_timeout=None):
728
    with pooled_rapi_client(vm) as client:
729
        return client.ShutdownInstance(vm.backend_vm_id,
730
                                       timeout=shutdown_timeout,
731
                                       dry_run=settings.TEST)
732

    
733

    
734
def resize_instance(vm, vcpus, memory):
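    # Both minmem and maxmem are set to the requested memory, so the instance
    # is resized to a fixed amount of RAM rather than a ballooning range.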
735
    beparams = {"vcpus": int(vcpus),
736
                "minmem": int(memory),
737
                "maxmem": int(memory)}
738
    with pooled_rapi_client(vm) as client:
739
        return client.ModifyInstance(vm.backend_vm_id, beparams=beparams)
740

    
741

    
742
def get_instance_console(vm):
743
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
744
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
745
    # useless (see #783).
746
    #
747
    # Until this is fixed on the Ganeti side, construct a console info reply
748
    # directly.
749
    #
750
    # WARNING: This assumes that VNC runs on port network_port on
751
    #          the instance's primary node, and is probably
752
    #          hypervisor-specific.
753
    #
754
    log.debug("Getting console for vm %s", vm)
755

    
756
    console = {}
757
    console['kind'] = 'vnc'
758

    
759
    with pooled_rapi_client(vm) as client:
760
        i = client.GetInstance(vm.backend_vm_id)
761

    
762
    if vm.backend.hypervisor == "kvm" and i['hvparams']['serial_console']:
763
        raise Exception("hv parameter serial_console cannot be true")
764
    console['host'] = i['pnode']
765
    console['port'] = i['network_port']
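    # The returned dict looks like, for example (illustrative values):
    #   {'kind': 'vnc', 'host': 'node1.example.org', 'port': 11000}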
766

    
767
    return console
768

    
769

    
770
def get_instance_info(vm):
771
    with pooled_rapi_client(vm) as client:
772
        return client.GetInstance(vm.backend_vm_id)
773

    
774

    
775
def vm_exists_in_backend(vm):
776
    try:
777
        get_instance_info(vm)
778
        return True
779
    except rapi.GanetiApiError as e:
780
        if e.code == 404:
781
            return False
782
        raise e
783

    
784

    
785
def get_network_info(backend_network):
786
    with pooled_rapi_client(backend_network) as client:
787
        return client.GetNetwork(backend_network.network.backend_id)
788

    
789

    
790
def network_exists_in_backend(backend_network):
791
    try:
792
        get_network_info(backend_network)
793
        return True
794
    except rapi.GanetiApiError as e:
795
        if e.code == 404:
796
            return False
        raise e
797

    
798

    
799
def job_is_still_running(vm, job_id=None):
800
    with pooled_rapi_client(vm) as c:
801
        try:
802
            if job_id is None:
803
                job_id = vm.backendjobid
804
            job_info = c.GetJobStatus(job_id)
805
            return not (job_info["status"] in rapi.JOB_STATUS_FINALIZED)
806
        except rapi.GanetiApiError:
807
            return False
808

    
809

    
810
def nic_is_stale(vm, nic, timeout=60):
811
    """Check if a NIC is stale or exists in the Ganeti backend."""
812
    # First check the state of the NIC and if there is a pending CONNECT
813
    if nic.state == "BUILD" and vm.task == "CONNECT":
814
        if datetime.now() < nic.created + timedelta(seconds=timeout):
815
            # Do not check for too recent NICs to avoid the time overhead
816
            return False
817
        if job_is_still_running(vm, job_id=vm.task_job_id):
818
            return False
819
        else:
820
            # If job has finished, check that the NIC exists, because the
821
            # message may have been lost or stuck in the queue.
822
            vm_info = get_instance_info(vm)
823
            if nic.backend_uuid in vm_info["nic.names"]:
824
                return False
825
    return True
826

    
827

    
828
def ensure_network_is_active(backend, network_id):
829
    """Ensure that a network is active in the specified backend
830

831
    Check that a network exists and is active in the specified backend. If not
832
    (re-)create the network. Return the corresponding BackendNetwork object
833
    and the IDs of the Ganeti job to create the network.
834

835
    """
836
    job_ids = []
837
    try:
838
        bnet = BackendNetwork.objects.select_related("network")\
839
                                     .get(backend=backend, network=network_id)
840
        if bnet.operstate != "ACTIVE":
841
            job_ids = create_network(bnet.network, backend, connect=True)
842
    except BackendNetwork.DoesNotExist:
843
        network = Network.objects.select_for_update().get(id=network_id)
844
        bnet = BackendNetwork.objects.create(backend=backend, network=network)
845
        job_ids = create_network(network, backend, connect=True)
846

    
847
    return bnet, job_ids
848

    
849

    
850
def create_network(network, backend, connect=True):
851
    """Create a network in a Ganeti backend"""
852
    log.debug("Creating network %s in backend %s", network, backend)
853

    
854
    job_id = _create_network(network, backend)
855

    
856
    if connect:
857
        job_ids = connect_network(network, backend, depends=[job_id])
858
        return job_ids
859
    else:
860
        return [job_id]
861

    
862

    
863
def _create_network(network, backend):
864
    """Create a network."""
865

    
866
    tags = network.backend_tag
867
    subnet = None
868
    subnet6 = None
869
    gateway = None
870
    gateway6 = None
871
    for _subnet in network.subnets.all():
872
        if _subnet.dhcp and "nfdhcpd" not in tags:
873
            tags.append("nfdhcpd")
874
        if _subnet.ipversion == 4:
875
            subnet = _subnet.cidr
876
            gateway = _subnet.gateway
877
        elif _subnet.ipversion == 6:
878
            subnet6 = _subnet.cidr
879
            gateway6 = _subnet.gateway
880

    
881
    conflicts_check = False
882
    if network.public:
883
        tags.append('public')
884
        if subnet is not None:
885
            conflicts_check = True
886
    else:
887
        tags.append('private')
888

    
889
    # Use a dummy network subnet for IPv6 only networks. Currently Ganeti does
890
    # not support IPv6 only networks. To bypass this limitation, we create the
891
    # network with a dummy network subnet, and make Cyclades connect instances
892
    # to such networks, with address=None.
893
    if subnet is None:
894
        subnet = "10.0.0.0/29"
895

    
896
    try:
897
        bn = BackendNetwork.objects.get(network=network, backend=backend)
898
        mac_prefix = bn.mac_prefix
899
    except BackendNetwork.DoesNotExist:
900
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
901
                        " does not exist" % (network.id, backend.id))
902

    
903
    with pooled_rapi_client(backend) as client:
904
        return client.CreateNetwork(network_name=network.backend_id,
905
                                    network=subnet,
906
                                    network6=subnet6,
907
                                    gateway=gateway,
908
                                    gateway6=gateway6,
909
                                    mac_prefix=mac_prefix,
910
                                    conflicts_check=conflicts_check,
911
                                    tags=tags)
912

    
913

    
914
def connect_network(network, backend, depends=[], group=None):
915
    """Connect a network to nodegroups."""
916
    log.debug("Connecting network %s to backend %s", network, backend)
917

    
918
    conflicts_check = False
919
    if network.public and (network.subnet4 is not None):
920
        conflicts_check = True
921

    
922
    depends = create_job_dependencies(depends)
923
    with pooled_rapi_client(backend) as client:
924
        groups = [group] if group is not None else client.GetGroups()
925
        job_ids = []
926
        for group in groups:
927
            job_id = client.ConnectNetwork(network.backend_id, group,
928
                                           network.mode, network.link,
929
                                           conflicts_check,
930
                                           depends=depends)
931
            job_ids.append(job_id)
932
    return job_ids
933

    
934

    
935
def delete_network(network, backend, disconnect=True):
936
    log.debug("Deleting network %s from backend %s", network, backend)
937

    
938
    depends = []
939
    if disconnect:
940
        depends = disconnect_network(network, backend)
941
    _delete_network(network, backend, depends=depends)
942

    
943

    
944
def _delete_network(network, backend, depends=[]):
945
    depends = create_job_dependencies(depends)
946
    with pooled_rapi_client(backend) as client:
947
        return client.DeleteNetwork(network.backend_id, depends)
948

    
949

    
950
def disconnect_network(network, backend, group=None):
951
    log.debug("Disconnecting network %s to backend %s", network, backend)
952

    
953
    with pooled_rapi_client(backend) as client:
954
        groups = [group] if group is not None else client.GetGroups()
955
        job_ids = []
956
        for group in groups:
957
            job_id = client.DisconnectNetwork(network.backend_id, group)
958
            job_ids.append(job_id)
959
    return job_ids
960

    
961

    
962
def connect_to_network(vm, nic):
963
    network = nic.network
964
    backend = vm.backend
965
    bnet, depend_jobs = ensure_network_is_active(backend, network.id)
966

    
967
    depends = create_job_dependencies(depend_jobs)
968

    
969
    nic = {'name': nic.backend_uuid,
970
           'network': network.backend_id,
971
           'ip': nic.ipv4_address}
972

    
973
    log.debug("Adding NIC %s to VM %s", nic, vm)
974

    
975
    kwargs = {
976
        "instance": vm.backend_vm_id,
977
        "nics": [("add", "-1", nic)],
978
        "depends": depends,
979
    }
980
    if vm.backend.use_hotplug():
981
        kwargs["hotplug_if_possible"] = True
982
    if settings.TEST:
983
        kwargs["dry_run"] = True
984

    
985
    with pooled_rapi_client(vm) as client:
986
        return client.ModifyInstance(**kwargs)
987

    
988

    
989
def disconnect_from_network(vm, nic):
990
    log.debug("Removing NIC %s of VM %s", nic, vm)
991

    
992
    kwargs = {
993
        "instance": vm.backend_vm_id,
994
        "nics": [("remove", nic.backend_uuid, {})],
995
    }
996
    if vm.backend.use_hotplug():
997
        kwargs["hotplug_if_possible"] = True
998
    if settings.TEST:
999
        kwargs["dry_run"] = True
1000

    
1001
    with pooled_rapi_client(vm) as client:
1002
        jobID = client.ModifyInstance(**kwargs)
1003
        firewall_profile = nic.firewall_profile
1004
        if firewall_profile and firewall_profile != "DISABLED":
1005
            tag = _firewall_tags[firewall_profile] % nic.backend_uuid
1006
            client.DeleteInstanceTags(vm.backend_vm_id, [tag],
1007
                                      dry_run=settings.TEST)
1008

    
1009
        return jobID
1010

    
1011

    
1012
def set_firewall_profile(vm, profile, nic):
1013
    uuid = nic.backend_uuid
1014
    try:
1015
        tag = _firewall_tags[profile] % uuid
1016
    except KeyError:
1017
        raise ValueError("Unsopported Firewall Profile: %s" % profile)
1018

    
1019
    log.debug("Setting tag of VM %s, NIC %s, to %s", vm, nic, profile)
1020

    
1021
    with pooled_rapi_client(vm) as client:
1022
        # Delete previous firewall tags
1023
        old_tags = client.GetInstanceTags(vm.backend_vm_id)
1024
        delete_tags = [(t % uuid) for t in _firewall_tags.values()
1025
                       if (t % uuid) in old_tags]
1026
        if delete_tags:
1027
            client.DeleteInstanceTags(vm.backend_vm_id, delete_tags,
1028
                                      dry_run=settings.TEST)
1029

    
1030
        if profile != "DISABLED":
1031
            client.AddInstanceTags(vm.backend_vm_id, [tag],
1032
                                   dry_run=settings.TEST)
1033

    
1034
        # XXX NOP ModifyInstance call to force process_net_status to run
1035
        # on the dispatcher
1036
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
1037
        client.ModifyInstance(vm.backend_vm_id,
1038
                              os_name=os_name)
1039
    return None
1040

    
1041

    
1042
def attach_volume(vm, volume, depends=[]):
1043
    log.debug("Attaching volume %s to vm %s", vm, volume)
1044

    
1045
    disk = {"size": int(volume.size) << 10,
1046
            "name": volume.backend_volume_uuid,
1047
            "volume_name": volume.backend_volume_uuid}
1048

    
1049
    disk_provider = volume.disk_provider
1050
    if disk_provider is not None:
1051
        disk["provider"] = disk_provider
1052

    
1053
    if volume.origin is not None:
1054
        disk["origin"] = volume.origin
1055

    
1056
    kwargs = {
1057
        "instance": vm.backend_vm_id,
1058
        "disks": [("add", "-1", disk)],
1059
        "depends": depends,
1060
    }
1061
    if vm.backend.use_hotplug():
1062
        kwargs["hotplug_if_possible"] = True
1063
    if settings.TEST:
1064
        kwargs["dry_run"] = True
1065

    
1066
    with pooled_rapi_client(vm) as client:
1067
        return client.ModifyInstance(**kwargs)
1068

    
1069

    
1070
def detach_volume(vm, volume, depends=[]):
1071
    log.debug("Removing volume %s from vm %s", volume, vm)
1072
    kwargs = {
1073
        "instance": vm.backend_vm_id,
1074
        "disks": [("remove", volume.backend_volume_uuid, {})],
1075
        "depends": depends,
1076
    }
1077
    if vm.backend.use_hotplug():
1078
        kwargs["hotplug_if_possible"] = True
1079
    if settings.TEST:
1080
        kwargs["dry_run"] = True
1081

    
1082
    with pooled_rapi_client(vm) as client:
1083
        return client.ModifyInstance(**kwargs)
1084

    
1085

    
1086
def snapshot_instance(vm, snapshot_name):
1087
    #volume = instance.volumes.all()[0]
1088
    with pooled_rapi_client(vm) as client:
1089
        return client.SnapshotInstance(instance=vm.backend_vm_id,
1090
                                       snapshot_name=snapshot_name)
1091

    
1092

    
1093
def get_instances(backend, bulk=True):
1094
    with pooled_rapi_client(backend) as c:
1095
        return c.GetInstances(bulk=bulk)
1096

    
1097

    
1098
def get_nodes(backend, bulk=True):
1099
    with pooled_rapi_client(backend) as c:
1100
        return c.GetNodes(bulk=bulk)
1101

    
1102

    
1103
def get_jobs(backend, bulk=True):
1104
    with pooled_rapi_client(backend) as c:
1105
        return c.GetJobs(bulk=bulk)
1106

    
1107

    
1108
def get_physical_resources(backend):
1109
    """ Get the physical resources of a backend.
1110

1111
    Get the resources of a backend as reported by the backend (not the db).
1112

1113
    """
1114
    nodes = get_nodes(backend, bulk=True)
1115
    attr = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
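    # mfree/mtotal: free/total memory, dfree/dtotal: free/total disk,
    # pinst_cnt: number of primary instances, ctotal: total number of logical
    # processors, as reported by Ganeti for each node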
1116
    res = {}
1117
    for a in attr:
1118
        res[a] = 0
1119
    for n in nodes:
1120
        # Filter out drained, offline and not vm_capable nodes since they will
1121
        # not take part in the vm allocation process
1122
        can_host_vms = n['vm_capable'] and not (n['drained'] or n['offline'])
1123
        if can_host_vms and n['cnodes']:
1124
            for a in attr:
1125
                res[a] += int(n[a] or 0)
1126
    return res
1127

    
1128

    
1129
def update_backend_resources(backend, resources=None):
1130
    """ Update the state of the backend resources in db.
1131

1132
    """
1133

    
1134
    if not resources:
1135
        resources = get_physical_resources(backend)
1136

    
1137
    backend.mfree = resources['mfree']
1138
    backend.mtotal = resources['mtotal']
1139
    backend.dfree = resources['dfree']
1140
    backend.dtotal = resources['dtotal']
1141
    backend.pinst_cnt = resources['pinst_cnt']
1142
    backend.ctotal = resources['ctotal']
1143
    backend.updated = datetime.now()
1144
    backend.save()
1145

    
1146

    
1147
def get_memory_from_instances(backend):
1148
    """ Get the memory that is used from instances.
1149

1150
    Get the used memory of a backend. Note: this differs from the real memory
1151
    used, due to KVM's memory de-duplication.
1152

1153
    """
1154
    with pooled_rapi_client(backend) as client:
1155
        instances = client.GetInstances(bulk=True)
1156
    mem = 0
1157
    for i in instances:
1158
        mem += i['oper_ram']
1159
    return mem
1160

    
1161

    
1162
def get_available_disk_templates(backend):
1163
    """Get the list of available disk templates of a Ganeti backend.
1164

1165
    The list contains the disk templates that are enabled in the Ganeti backend
1166
    and also included in ipolicy-disk-templates.
1167

1168
    """
1169
    with pooled_rapi_client(backend) as c:
1170
        info = c.GetInfo()
1171
    ipolicy_disk_templates = info["ipolicy"]["disk-templates"]
1172
    try:
1173
        enabled_disk_templates = info["enabled_disk_templates"]
1174
        return [dp for dp in enabled_disk_templates
1175
                if dp in ipolicy_disk_templates]
1176
    except KeyError:
1177
        # Ganeti < 2.8 does not have 'enabled_disk_templates'
1178
        return ipolicy_disk_templates
1179

    
1180

    
1181
def update_backend_disk_templates(backend):
1182
    disk_templates = get_available_disk_templates(backend)
1183
    backend.disk_templates = disk_templates
1184
    backend.save()
1185

    
1186

    
1187
##
1188
## Synchronized operations for reconciliation
1189
##
1190

    
1191

    
1192
def create_network_synced(network, backend):
1193
    result = _create_network_synced(network, backend)
1194
    if result[0] != rapi.JOB_STATUS_SUCCESS:
1195
        return result
1196
    result = connect_network_synced(network, backend)
1197
    return result
1198

    
1199

    
1200
def _create_network_synced(network, backend):
1201
    with pooled_rapi_client(backend) as client:
1202
        job = _create_network(network, backend)
1203
        result = wait_for_job(client, job)
1204
    return result
1205

    
1206

    
1207
def connect_network_synced(network, backend):
1208
    with pooled_rapi_client(backend) as client:
1209
        for group in client.GetGroups():
1210
            job = client.ConnectNetwork(network.backend_id, group,
1211
                                        network.mode, network.link)
1212
            result = wait_for_job(client, job)
1213
            if result[0] != rapi.JOB_STATUS_SUCCESS:
1214
                return result
1215

    
1216
    return result
1217

    
1218

    
1219
def wait_for_job(client, jobid):
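    """Block until a Ganeti job reaches a finalized status.

    Return a (status, error) tuple, where error is None on success and the
    job's 'opresult' otherwise.

    """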
1220
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
1221
    status = result['job_info'][0]
1222
    while status not in rapi.JOB_STATUS_FINALIZED:
1223
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
1224
                                         [result], None)
1225
        status = result['job_info'][0]
1226

    
1227
    if status == rapi.JOB_STATUS_SUCCESS:
1228
        return (status, None)
1229
    else:
1230
        error = result['job_info'][1]
1231
        return (status, error)
1232

    
1233

    
1234
def create_job_dependencies(job_ids=[], job_states=None):
1235
    """Transform a list of job IDs to Ganeti 'depends' attribute."""
1236
    if job_states is None:
1237
        job_states = list(rapi.JOB_STATUS_FINALIZED)
1238
    assert(type(job_states) == list)
1239
    return [[job_id, job_states] for job_id in job_ids]
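

# Illustrative example (values depend on the finalized job states defined in
# synnefo.logic.rapi): create_job_dependencies([10, 11]) returns a list of the
# form [[10, [<finalized states>]], [11, [<finalized states>]]], meaning that a
# job submitted with this 'depends' value waits for jobs 10 and 11 to reach a
# finalized status.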