Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ 1b3f1792

History | View | Annotate | Download (42.7 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
from django.conf import settings
34
from django.db import transaction
35
from datetime import datetime, timedelta
36

    
37
from synnefo.db.models import (Backend, VirtualMachine, Network,
38
                               BackendNetwork, BACKEND_STATUSES,
39
                               pooled_rapi_client, VirtualMachineDiagnostic,
40
                               Flavor, IPAddress, IPAddressLog)
41
from synnefo.logic import utils, ips
42
from synnefo import quotas
43
from synnefo.api.util import release_resource
44
from synnefo.util.mac2eui64 import mac2eui64
45
from synnefo.logic import rapi
46

    
47
from logging import getLogger
48
log = getLogger(__name__)
49

    
50

    
51
_firewall_tags = {
52
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
53
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
54
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}
55

    
56
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
57

    
58
SIMPLE_NIC_FIELDS = ["state", "mac", "network", "firewall_profile", "index"]
59
COMPLEX_NIC_FIELDS = ["ipv4_address", "ipv6_address"]
60
NIC_FIELDS = SIMPLE_NIC_FIELDS + COMPLEX_NIC_FIELDS
61
UNKNOWN_NIC_PREFIX = "unknown-"
62

    
63

    
64
def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
    """Handle quotas for updated VirtualMachine.

    Update quotas for the updated VirtualMachine based on the job that run on
    the Ganeti backend. If a commission has been already issued for this job,
    then this commission is just accepted or rejected based on the job status.
    Otherwise, a new commission for the given change is issued, that is also in
    force and auto-accept mode. In this case, previous commissions are
    rejected, since they reflect a previous state of the VM.

    :param vm: the VirtualMachine the job refers to.
    :param job_id: id of the Ganeti job.
    :param job_opcode: opcode of the Ganeti job.
    :param job_status: status of the Ganeti job.
    :param job_fields: dict of fields modified by the job.
    :returns: the (possibly updated) VirtualMachine object.

    """
    # Only finalized jobs (success/error/canceled) can affect quotas.
    if job_status not in rapi.JOB_STATUS_FINALIZED:
        return vm

    # Check successful completion of a job will trigger any quotable change in
    # the VM state.
    action = utils.get_action_from_opcode(job_opcode, job_fields)
    if action == "BUILD":
        # Quotas for new VMs are automatically accepted by the API
        return vm

    if vm.task_job_id == job_id and vm.serial is not None:
        # Commission for this change has already been issued. So just
        # accept/reject it. Special case is OP_INSTANCE_CREATE, which even
        # if fails, must be accepted, as the user must manually remove the
        # failed server
        serial = vm.serial
        if job_status == rapi.JOB_STATUS_SUCCESS:
            quotas.accept_serial(serial)
        elif job_status in [rapi.JOB_STATUS_ERROR, rapi.JOB_STATUS_CANCELED]:
            log.debug("Job %s failed. Rejecting related serial %s", job_id,
                      serial)
            quotas.reject_serial(serial)
        # The serial has been resolved; detach it from the VM.
        vm.serial = None
    elif job_status == rapi.JOB_STATUS_SUCCESS:
        commission_info = quotas.get_commission_info(resource=vm,
                                                     action=action,
                                                     action_fields=job_fields)
        if commission_info is not None:
            # Commission for this change has not been issued, or the issued
            # commission was unaware of the current change. Reject all previous
            # commissions and create a new one in forced mode!
            log.debug("Expected job was %s. Processing job %s.",
                      vm.task_job_id, job_id)
            reason = ("client: dispatcher, resource: %s, ganeti_job: %s"
                      % (vm, job_id))
            quotas.handle_resource_commission(vm, action,
                                              action_fields=job_fields,
                                              commission_name=reason,
                                              force=True,
                                              auto_accept=True)
            log.debug("Issued new commission: %s", vm.serial)
            # NOTE: Since we rejected the serial that was associated with the
            # 'vm.task_job_id' job, we must also clear the 'vm.serial' field.
            # If not, there will be no new commission for the 'vm.task_job_id'
            # job!
            vm.serial = None

    return vm
123

    
124

    
125
@transaction.commit_on_success
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None,
                      job_fields=None):
    """Process a job progress notification from the backend

    Process an incoming message from the backend (currently Ganeti).
    Job notifications with a terminating status (sucess, error, or canceled),
    also update the operating state of the VM.

    :param vm: the VirtualMachine the job refers to.
    :param etime: time the event occurred at the backend.
    :param jobid: id of the Ganeti job.
    :param opcode: opcode of the Ganeti job.
    :param status: status of the Ganeti job.
    :param logmsg: log message reported by the backend.
    :param nics: NIC configuration reported by the backend, if any.
    :param job_fields: dict of fields modified by the job.
    :raises VirtualMachine.InvalidBackendMsgError: if 'status' is not a
        known Ganeti job status.

    """
    # See #1492, #1031, #1111 why this line has been removed
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)

    # Record the raw job information on the VM.
    vm.backendjobid = jobid
    vm.backendjobstatus = status
    vm.backendopcode = opcode
    vm.backendlogmsg = logmsg

    # Non-final job states only update the bookkeeping fields above.
    if status not in rapi.JOB_STATUS_FINALIZED:
        vm.save()
        return

    if job_fields is None:
        job_fields = {}

    # Deferred state changes, applied only at the end so that quota handling
    # sees the pre-change VM.
    new_operstate = None
    new_flavor = None
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode)

    if status == rapi.JOB_STATUS_SUCCESS:
        # If job succeeds, change operating state if needed
        if state_for_success is not None:
            new_operstate = state_for_success

        beparams = job_fields.get("beparams", None)
        if beparams:
            # Change the flavor of the VM
            new_flavor = _process_resize(vm, beparams)

        # Update backendtime only for jobs that have been successfully
        # completed, since only these jobs update the state of the VM. Else a
        # "race condition" may occur when a successful job (e.g.
        # OP_INSTANCE_REMOVE) completes before an error job and messages arrive
        # in reversed order.
        vm.backendtime = etime

    if status in rapi.JOB_STATUS_FINALIZED and nics is not None:
        # Update the NICs of the VM
        _process_net_status(vm, etime, nics)

    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
    if opcode == 'OP_INSTANCE_CREATE' and status in (rapi.JOB_STATUS_CANCELED,
                                                     rapi.JOB_STATUS_ERROR):
        new_operstate = "ERROR"
        vm.backendtime = etime
        # Update state of associated NICs
        vm.nics.all().update(state="ERROR")
    elif opcode == 'OP_INSTANCE_REMOVE':
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
        # when no instance exists at the Ganeti backend.
        # See ticket #799 for all the details.
        if (status == rapi.JOB_STATUS_SUCCESS or
           (status == rapi.JOB_STATUS_ERROR and not vm_exists_in_backend(vm))):
            # VM has been deleted
            for nic in vm.nics.all():
                # Release the IP
                remove_nic_ips(nic)
                # And delete the NIC.
                nic.delete()
            vm.deleted = True
            new_operstate = state_for_success
            vm.backendtime = etime
            # Treat the removal as successful from here on, so quotas are
            # handled as for a successful deletion.
            status = rapi.JOB_STATUS_SUCCESS

    if status in rapi.JOB_STATUS_FINALIZED:
        # Job is finalized: Handle quotas/commissioning
        vm = handle_vm_quotas(vm, job_id=jobid, job_opcode=opcode,
                              job_status=status, job_fields=job_fields)
        # and clear task fields
        if vm.task_job_id == jobid:
            vm.task = None
            vm.task_job_id = None

    if new_operstate is not None:
        vm.operstate = new_operstate
    if new_flavor is not None:
        vm.flavor = new_flavor

    vm.save()
216

    
217

    
218
def _process_resize(vm, beparams):
219
    """Change flavor of a VirtualMachine based on new beparams."""
220
    old_flavor = vm.flavor
221
    vcpus = beparams.get("vcpus", old_flavor.cpu)
222
    ram = beparams.get("maxmem", old_flavor.ram)
223
    if vcpus == old_flavor.cpu and ram == old_flavor.ram:
224
        return
225
    try:
226
        new_flavor = Flavor.objects.get(cpu=vcpus, ram=ram,
227
                                        disk=old_flavor.disk,
228
                                        disk_template=old_flavor.disk_template)
229
    except Flavor.DoesNotExist:
230
        raise Exception("Cannot find flavor for VM")
231
    return new_flavor
232

    
233

    
234
@transaction.commit_on_success
def process_net_status(vm, etime, nics):
    """Wrap _process_net_status inside transaction.

    Public entry point that runs the NIC synchronization logic of
    '_process_net_status' inside a single DB transaction, so NIC and IP
    changes are committed atomically.

    :param vm: the VirtualMachine whose NICs are synchronized.
    :param etime: event time reported by the backend.
    :param nics: NIC configuration reported by the backend.
    """
    _process_net_status(vm, etime, nics)
238

    
239

    
240
def _process_net_status(vm, etime, nics):
    """Process a net status notification from the backend

    Process an incoming message from the Ganeti backend,
    detailing the NIC configuration of a VM instance.

    Update the state of the VM in the DB accordingly.

    :param vm: the VirtualMachine the notification refers to.
    :param etime: event time reported by the backend.
    :param nics: list of NIC dicts as reported by Ganeti.

    """
    ganeti_nics = process_ganeti_nics(nics)
    db_nics = dict([(nic.id, nic)
                    for nic in vm.nics.select_related("network")
                                      .prefetch_related("ips")])

    # Walk the union of DB NICs and Ganeti NICs so that missing entries on
    # either side are detected.
    for nic_name in set(db_nics.keys()) | set(ganeti_nics.keys()):
        db_nic = db_nics.get(nic_name)
        ganeti_nic = ganeti_nics.get(nic_name)
        if ganeti_nic is None:
            # NIC exists in the DB but not in Ganeti.
            # BUGFIX: this previously called nic_is_stale(vm, nic), but
            # 'nic' is undefined in this scope (NameError); the DB-side NIC
            # is 'db_nic'.
            if nic_is_stale(vm, db_nic):
                log.debug("Removing stale NIC '%s'" % db_nic)
                remove_nic_ips(db_nic)
                db_nic.delete()
            else:
                log.info("NIC '%s' is still being created" % db_nic)
        elif db_nic is None:
            msg = ("NIC/%s of VM %s does not exist in DB! Cannot automatically"
                   " fix this issue!" % (nic_name, vm))
            log.error(msg)
            continue
        elif not nics_are_equal(db_nic, ganeti_nic):
            for f in SIMPLE_NIC_FIELDS:
                # Update the NIC in DB with the values from Ganeti NIC
                setattr(db_nic, f, ganeti_nic[f])
            # Save once after all fields are updated (previously saved once
            # per field, issuing redundant DB writes).
            db_nic.save()

            # Special case where the IPv4 address has changed, because you
            # need to release the old IPv4 address and reserve the new one
            gnt_ipv4_address = ganeti_nic["ipv4_address"]
            db_ipv4_address = db_nic.ipv4_address
            if db_ipv4_address != gnt_ipv4_address:
                change_address_of_port(db_nic, vm.userid,
                                       old_address=db_ipv4_address,
                                       new_address=gnt_ipv4_address,
                                       version=4)

            gnt_ipv6_address = ganeti_nic["ipv6_address"]
            db_ipv6_address = db_nic.ipv6_address
            if db_ipv6_address != gnt_ipv6_address:
                change_address_of_port(db_nic, vm.userid,
                                       old_address=db_ipv6_address,
                                       new_address=gnt_ipv6_address,
                                       version=6)

    vm.backendtime = etime
    vm.save()
295

    
296

    
297
def change_address_of_port(port, userid, old_address, new_address, version):
    """Rebind a port (NIC) to a new IPv4 or IPv6 address.

    Releases the port's current address of the given IP version, attaches
    the new one, and records the change in the IP address log.

    :returns: the newly attached IPAddress object.
    :raises ValueError: if 'version' is neither 4 nor 6.
    """
    if old_address is not None:
        msg = ("IPv%s Address of server '%s' changed from '%s' to '%s'"
               % (version, port.machine_id, old_address, new_address))
        log.error(msg)

    # Drop the currently attached address of this IP version.
    remove_nic_ips(port, version=version)

    if version == 4:
        # IPv4 addresses come from the network's allocation pools.
        new_ip = ips.allocate_ip(port.network, userid, address=new_address)
        new_ip.nic = port
        new_ip.save()
    elif version == 6:
        # IPv6 addresses are not pooled; just create the DB object.
        new_ip = IPAddress.objects.create(userid=userid,
                                          network=port.network,
                                          subnet=port.network.subnet6,
                                          nic=port,
                                          address=new_address,
                                          ipversion=6)
    else:
        raise ValueError("Unknown version: %s" % version)

    # Open a fresh, active logging entry for the new address.
    entry = IPAddressLog.objects.create(server_id=port.machine_id,
                                        network_id=port.network_id,
                                        address=new_address,
                                        active=True)
    log.info("Created IP log entry '%s' for address '%s' to server '%s'",
             entry.id, new_address, port.machine_id)

    return new_ip
331

    
332

    
333
def nics_are_equal(db_nic, gnt_nic):
    """Return True when the DB NIC and the Ganeti NIC agree on every field
    in NIC_FIELDS."""
    return all(getattr(db_nic, field) == gnt_nic[field]
               for field in NIC_FIELDS)
338

    
339

    
340
def process_ganeti_nics(ganeti_nics):
    """Translate a Ganeti NIC list into a dict of NIC-info dicts keyed by
    NIC id."""
    processed = {}
    for idx, gnic in enumerate(ganeti_nics):
        name = gnic.get("name", None)
        if name is not None:
            nic_id = utils.id_from_nic_name(name)
        else:
            # No name: key by index. If it is an unknown NIC to synnefo it
            # will be created automatically.
            nic_id = UNKNOWN_NIC_PREFIX + str(idx)

        network_id = utils.id_from_network_name(gnic.get('network', ''))
        net = Network.objects.get(id=network_id)

        mac = gnic.get('mac')
        subnet6 = net.subnet6
        # The IPv6 address is derived from the MAC when the network has a
        # v6 subnet.
        ipv6 = mac2eui64(mac, subnet6.cidr) if subnet6 else None

        profile = _reverse_tags.get(gnic.get('firewall'))
        if not profile and net.public:
            profile = settings.DEFAULT_FIREWALL_PROFILE

        processed[nic_id] = {
            'index': idx,
            'network': net,
            'mac': mac,
            'ipv4_address': gnic.get('ip'),
            'ipv6_address': ipv6,
            'firewall_profile': profile,
            'state': 'ACTIVE'}
    return processed
377

    
378

    
379
def remove_nic_ips(nic, version=None):
    """Remove IP addresses associated with a NetworkInterface.

    Remove all IP addresses that are associated with the NetworkInterface
    object, by returning them to the pool and deleting the IPAddress object.
    If the IP is a floating IP, then it is just disassociated from the NIC.
    If version is specified, then only IP addressses of that version will be
    removed.
    """
    for addr in nic.ips.all():
        if version and addr.ipversion != version:
            # Caller asked for a single IP version; skip the others.
            continue

        # Close the active logging entry for this address first.
        terminate_active_ipaddress_log(nic, addr)

        if addr.floating_ip:
            # Floating IPs outlive the NIC; only detach them.
            addr.nic = None
            addr.save()
        else:
            # Return the address to the pool and drop the DB object.
            addr.release_address()
            addr.delete()
403

    
404

    
405
def terminate_active_ipaddress_log(nic, ip):
    """Close the active DB logging entry for this IP address."""
    # Only public addresses attached to a server are logged.
    if not ip.network.public or nic.machine is None:
        return
    try:
        entry, created = IPAddressLog.objects.get_or_create(
            server_id=nic.machine_id,
            network_id=ip.network_id,
            address=ip.address,
            active=True)
    except IPAddressLog.MultipleObjectsReturned:
        logmsg = ("Multiple active log entries for IP %s, Network %s,"
                  "Server %s. Cannot proceed!"
                  % (ip.address, ip.network, nic.machine))
        log.error(logmsg)
        raise

    if created:
        logmsg = ("No log entry for IP %s, Network %s, Server %s. Created new"
                  " but with wrong creation timestamp."
                  % (ip.address, ip.network, nic.machine))
        log.error(logmsg)
    # Mark the entry as released right now.
    entry.released_at = datetime.now()
    entry.active = False
    entry.save()
430

    
431

    
432
@transaction.commit_on_success
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
    """Process a job notification for a BackendNetwork.

    Record the Ganeti job on the BackendNetwork, adjust its operstate
    according to the job opcode/status, and finally recompute the state of
    the associated Network.

    :raises Network.InvalidBackendMsgError: if 'status' is not a known
        Ganeti job status.
    """
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    # Record the raw job information on the BackendNetwork.
    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode
    back_network.backendlogmsg = logmsg

    # Note: Network is already locked!
    network = back_network.network

    # Notifications of success change the operating state
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == rapi.JOB_STATUS_SUCCESS and state_for_success is not None:
        back_network.operstate = state_for_success

    # A failed or canceled creation leaves the backend network in ERROR.
    if (status in (rapi.JOB_STATUS_CANCELED, rapi.JOB_STATUS_ERROR)
       and opcode == 'OP_NETWORK_ADD'):
        back_network.operstate = 'ERROR'
        back_network.backendtime = etime

    if opcode == 'OP_NETWORK_REMOVE':
        # The network is also considered deleted when removal failed but the
        # network no longer exists in the Ganeti backend.
        network_is_deleted = (status == rapi.JOB_STATUS_SUCCESS)
        if network_is_deleted or (status == rapi.JOB_STATUS_ERROR and not
                                  network_exists_in_backend(back_network)):
            back_network.operstate = state_for_success
            back_network.deleted = True
            back_network.backendtime = etime

    # Update backendtime only for successfully completed jobs (mirrors the
    # reasoning in process_op_status).
    if status == rapi.JOB_STATUS_SUCCESS:
        back_network.backendtime = etime
    back_network.save()
    # Also you must update the state of the Network!!
    update_network_state(network)
468

    
469

    
470
def update_network_state(network):
    """Update the state of a Network based on BackendNetwork states.

    Update the state of a Network based on the operstate of the networks in
    the backends that network exists.

    The state of the network is:
    * ACTIVE: If it is 'ACTIVE' in at least one backend.
    * DELETED: If it is is 'DELETED' in all backends that have been created.

    This function also releases the resources (MAC prefix or Bridge) and the
    quotas for the network.

    """
    if network.deleted:
        # Network has already been deleted. Just assert that state is also
        # DELETED
        if not network.state == "DELETED":
            network.state = "DELETED"
            network.save()
        return

    backend_states = [s.operstate for s in network.backend_networks.all()]
    if not backend_states and network.action != "DESTROY":
        # No backend networks exist (and we are not destroying): the network
        # is considered ACTIVE.
        if network.state != "ACTIVE":
            network.state = "ACTIVE"
            network.save()
        # BUGFIX: return unconditionally here. Previously 'return' was only
        # reached when the state changed; with no backend states the reduce()
        # below evaluates to "DELETED" and an already-ACTIVE network would be
        # wrongly destroyed.
        return

    # Network is deleted when all BackendNetworks go to "DELETED" operstate
    deleted = reduce(lambda x, y: x == y and "DELETED", backend_states,
                     "DELETED")

    # Release the resources on the deletion of the Network
    if deleted:
        if network.ips.filter(deleted=False, floating_ip=True).exists():
            msg = "Cannot delete network %s! Floating IPs still in use!"
            log.error(msg % network)
            raise Exception(msg % network)
        # BUGFIX: the link/mac_prefix arguments were swapped with respect to
        # the message placeholders.
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
                 network.id, network.link, network.mac_prefix)
        network.deleted = True
        network.state = "DELETED"
        # Undrain the network, otherwise the network state will remain
        # as 'SNF:DRAINED'
        network.drained = False
        if network.mac_prefix:
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
                release_resource(res_type="mac_prefix",
                                 value=network.mac_prefix)
        if network.link:
            if network.FLAVORS[network.flavor]["link"] == "pool":
                release_resource(res_type="bridge", value=network.link)

        # Set all subnets as deleted
        network.subnets.update(deleted=True)
        # And delete the IP pools
        for subnet in network.subnets.all():
            if subnet.ipversion == 4:
                subnet.ip_pools.all().delete()
        # And all the backend networks since there are useless
        network.backend_networks.all().delete()

        # Issue commission
        if network.userid:
            quotas.issue_and_accept_commission(network, action="DESTROY")
            # the above has already saved the object and committed;
            # a second save would override others' changes, since the
            # object is now unlocked
            return
        elif not network.public:
            log.warning("Network %s does not have an owner!", network.id)
    network.save()
543

    
544

    
545
@transaction.commit_on_success
def process_network_modify(back_network, etime, jobid, opcode, status,
                           job_fields):
    """Handle an OP_NETWORK_SET_PARAMS job notification.

    Records the job on the BackendNetwork and externally reserves any IP
    addresses that the job marked as reserved.
    """
    assert (opcode == "OP_NETWORK_SET_PARAMS")
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    # Bookkeeping of the Ganeti job on the backend network.
    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.opcode = opcode

    reserved = job_fields.get("add_reserved_ips")
    if reserved:
        net = back_network.network
        for address in reserved:
            # Mark each address as externally reserved in the pool.
            net.reserve_address(address, external=True)

    if status == rapi.JOB_STATUS_SUCCESS:
        back_network.backendtime = etime
    back_network.save()
565

    
566

    
567
@transaction.commit_on_success
def process_create_progress(vm, etime, progress):
    """Update the build progress of a VM from an image-copy notification."""
    percentage = int(progress)

    # snf-image:copy-progress tracks bytes read by image handling processes,
    # so the reported percentage may exceed 100%; clamp it.
    percentage = min(percentage, 100)
    if percentage < 0:
        raise ValueError("Percentage cannot be negative")

    # FIXME: log a warning here, see #1033
#   if last_update > percentage:
#       raise ValueError("Build percentage should increase monotonically " \
#                        "(old = %d, new = %d)" % (last_update, percentage))

    # This assumes that no message of type 'ganeti-create-progress' is going to
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
    # the instance is STARTED.  What if the two messages are processed by two
    # separate dispatcher threads, and the 'ganeti-op-status' message for
    # successful creation gets processed before the 'ganeti-create-progress'
    # message? [vkoukis]
    #
    #if not vm.operstate == 'BUILD':
    #    raise VirtualMachine.IllegalState("VM is not in building state")

    vm.buildpercentage = percentage
    vm.backendtime = etime
    vm.save()
596

    
597

    
598
@transaction.commit_on_success
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
                               details=None):
    """Record a diagnostic entry for a virtual machine.

    :param vm: VirtualMachine instance to create diagnostic for.
    :param message: Diagnostic message.
    :param source: Diagnostic source identifier (e.g. image-helper).
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
    :param etime: The time the message occured (if available).
    :param details: Additional details or debug information.
    """
    VirtualMachineDiagnostic.objects.create_for_vm(
        vm, level, source=source, source_date=etime, message=message,
        details=details)
615

    
616

    
617
def create_instance(vm, nics, flavor, image):
    """`image` is a dictionary which should contain the keys:
            'backend_id', 'format' and 'metadata'

        metadata value should be a dictionary.

    Submit an OP_INSTANCE_CREATE job for 'vm' to its Ganeti backend and
    return the Ganeti job id.
    """

    # Handle arguments to CreateInstance() as a dictionary,
    # initialize it based on a deployment-specific value.
    # This enables the administrator to override deployment-specific
    # arguments, such as the disk template to use, name of os provider
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
    #
    kw = vm.backend.get_create_params()
    kw['mode'] = 'create'
    kw['name'] = vm.backend_vm_id
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS

    kw['disk_template'] = flavor.disk_template
    # Flavor disk size is in GB; Ganeti expects MB.
    kw['disks'] = [{"size": flavor.disk * 1024}]
    provider = flavor.disk_provider
    if provider:
        kw['disks'][0]['provider'] = provider
        kw['disks'][0]['origin'] = flavor.disk_origin
        # Per-provider extra disk parameters from deployment settings.
        extra_disk_params = settings.GANETI_DISK_PROVIDER_KWARGS.get(provider)
        if extra_disk_params is not None:
            kw["disks"][0].update(extra_disk_params)

    kw['nics'] = [{"name": nic.backend_uuid,
                   "network": nic.network.backend_id,
                   "ip": nic.ipv4_address}
                  for nic in nics]

    backend = vm.backend
    # Creation must wait for the networks of all NICs to become active in
    # this backend; collect those job ids as dependencies.
    depend_jobs = []
    for nic in nics:
        bnet, job_ids = ensure_network_is_active(backend, nic.network_id)
        depend_jobs.extend(job_ids)

    kw["depends"] = create_job_dependencies(depend_jobs)

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['os'] = settings.GANETI_OS_PROVIDER
    kw['ip_check'] = False
    kw['name_check'] = False

    # Do not specify a node explicitly, have
    # Ganeti use an iallocator instead
    #kw['pnode'] = rapi.GetNodes()[0]

    kw['dry_run'] = settings.TEST

    kw['beparams'] = {
        'auto_balance': True,
        'vcpus': flavor.cpu,
        'memory': flavor.ram}

    kw['osparams'] = {
        'config_url': vm.config_url,
        # Store image id and format to Ganeti
        'img_id': image['backend_id'],
        'img_format': image['format']}

    # Use opportunistic locking
    kw['opportunistic_locking'] = settings.GANETI_USE_OPPORTUNISTIC_LOCKING

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['hvparams'] = dict(serial_console=False)

    # hide_pass strips sensitive values before logging the parameters.
    log.debug("Creating instance %s", utils.hide_pass(kw))
    with pooled_rapi_client(vm) as client:
        return client.CreateInstance(**kw)
689

    
690

    
691
def delete_instance(vm, shutdown_timeout=None):
    """Issue an instance-removal job for the VM; returns the Ganeti job id."""
    kwargs = {"shutdown_timeout": shutdown_timeout,
              "dry_run": settings.TEST}
    with pooled_rapi_client(vm) as client:
        return client.DeleteInstance(vm.backend_vm_id, **kwargs)
696

    
697

    
698
def reboot_instance(vm, reboot_type, shutdown_timeout=None):
    """Reboot an instance in the Ganeti backend.

    The Ganeti reboot type is always 'hard'; the OS API 'soft'/'hard'
    distinction is expressed through the shutdown timeout instead.
    """
    assert reboot_type in ('soft', 'hard')
    # An OS API 'hard' reboot means shutting down immediately.
    timeout = 0 if reboot_type == "hard" else shutdown_timeout
    kwargs = {"instance": vm.backend_vm_id,
              "reboot_type": "hard"}
    # 'shutdown_timeout' is only supported by snf-ganeti>=2.8.2 and
    # Ganeti > 2.10; older versions ignore it and fall back to the default
    # Ganeti timeout (120s).
    if timeout is not None:
        kwargs["shutdown_timeout"] = timeout
    if settings.TEST:
        kwargs["dry_run"] = True
    with pooled_rapi_client(vm) as client:
        return client.RebootInstance(**kwargs)
716

    
717

    
718
def startup_instance(vm):
    """Start a stopped instance in the Ganeti backend; returns the job id."""
    with pooled_rapi_client(vm) as rapi_client:
        return rapi_client.StartupInstance(vm.backend_vm_id,
                                           dry_run=settings.TEST)
721

    
722

    
723
def shutdown_instance(vm, shutdown_timeout=None):
    """Submit a Ganeti job that shuts down a VM's instance.

    A None timeout lets Ganeti apply its own default.
    """
    kwargs = {"timeout": shutdown_timeout,
              "dry_run": settings.TEST}
    with pooled_rapi_client(vm) as client:
        return client.ShutdownInstance(vm.backend_vm_id, **kwargs)
def resize_instance(vm, vcpus, memory):
    """Modify a VM's backend parameters (vCPUs and memory).

    'minmem' and 'maxmem' are pinned to the same value.
    """
    memory = int(memory)
    beparams = dict(vcpus=int(vcpus), minmem=memory, maxmem=memory)
    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id, beparams=beparams)
def get_instance_console(vm):
    """Build a VNC console info reply for a VM.

    RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
    which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
    useless (see #783).

    Until this is fixed on the Ganeti side, construct a console info reply
    directly.

    WARNING: This assumes that VNC runs on port network_port on
             the instance's primary node, and is probably
             hypervisor-specific.

    """
    log.debug("Getting console for vm %s", vm)

    with pooled_rapi_client(vm) as client:
        info = client.GetInstance(vm.backend_vm_id)

    # A KVM serial console would conflict with the VNC setup assumed here.
    if vm.backend.hypervisor == "kvm" and info['hvparams']['serial_console']:
        raise Exception("hv parameter serial_console cannot be true")

    return {'kind': 'vnc',
            'host': info['pnode'],
            'port': info['network_port']}
def get_instance_info(vm):
    """Fetch a VM's instance information from its Ganeti backend."""
    with pooled_rapi_client(vm) as rapi_client:
        return rapi_client.GetInstance(vm.backend_vm_id)
def vm_exists_in_backend(vm):
    """Check whether a VM's instance exists in its Ganeti backend.

    Returns True if the instance exists, False on a 404 reply. Any other
    RAPI error is propagated.
    """
    try:
        get_instance_info(vm)
        return True
    except rapi.GanetiApiError as e:
        if e.code == 404:
            return False
        # Bare 'raise' re-raises with the original traceback intact
        # (unlike 'raise e', which resets it).
        raise
def get_network_info(backend_network):
    """Fetch a network's information from the Ganeti backend."""
    with pooled_rapi_client(backend_network) as rapi_client:
        return rapi_client.GetNetwork(backend_network.network.backend_id)
def network_exists_in_backend(backend_network):
    """Check whether a network exists in a Ganeti backend.

    Returns True if the network exists, False on a 404 reply. Any other
    RAPI error is re-raised, mirroring vm_exists_in_backend(). Previously
    such errors were silently swallowed and None was returned, which made
    genuine backend failures look like "network missing".
    """
    try:
        get_network_info(backend_network)
        return True
    except rapi.GanetiApiError as e:
        if e.code == 404:
            return False
        raise
def job_is_still_running(vm, job_id=None):
    """Return True if a Ganeti job of a VM has not yet finalized.

    Defaults to the VM's last known job when 'job_id' is omitted. A job
    that cannot be queried is reported as not running.
    """
    with pooled_rapi_client(vm) as client:
        try:
            if job_id is None:
                job_id = vm.backendjobid
            status = client.GetJobStatus(job_id)["status"]
            return status not in rapi.JOB_STATUS_FINALIZED
        except rapi.GanetiApiError:
            return False
def nic_is_stale(vm, nic, timeout=60):
    """Check if a NIC is stale or exists in the Ganeti backend.

    A NIC is considered *not* stale if any of the following hold:
    - it is still building and was created less than 'timeout' seconds ago,
    - the VM's pending CONNECT job is still running,
    - the finished job's NIC actually shows up in the instance's NIC names.
    Otherwise it is reported stale.
    """
    # First check the state of the NIC and if there is a pending CONNECT
    if nic.state == "BUILD" and vm.task == "CONNECT":
        if datetime.now() < nic.created + timedelta(seconds=timeout):
            # Do not check for too recent NICs to avoid the time overhead
            return False
        if job_is_still_running(vm, job_id=vm.task_job_id):
            return False
        else:
            # If job has finished, check that the NIC exists, because the
            # message may have been lost or stuck in the queue.
            vm_info = get_instance_info(vm)
            if nic.backend_uuid in vm_info["nic.names"]:
                return False
    return True
def ensure_network_is_active(backend, network_id):
    """Ensure that a network is active in the specified backend

    Check that a network exists and is active in the specified backend. If not
    (re-)create the network. Return the corresponding BackendNetwork object
    and the IDs of the Ganeti job to create the network.

    """
    job_ids = []
    try:
        bnet = BackendNetwork.objects.select_related("network")\
                                     .get(backend=backend, network=network_id)
        if bnet.operstate != "ACTIVE":
            # Network exists in DB but is not active in Ganeti: re-create it.
            job_ids = create_network(bnet.network, backend, connect=True)
    except BackendNetwork.DoesNotExist:
        # No BackendNetwork entry yet: lock the network row, register the
        # backend association and create the network in Ganeti.
        # NOTE(review): select_for_update assumes this runs inside a
        # transaction — presumably guaranteed by the caller; verify.
        network = Network.objects.select_for_update().get(id=network_id)
        bnet = BackendNetwork.objects.create(backend=backend, network=network)
        job_ids = create_network(network, backend, connect=True)

    return bnet, job_ids
def create_network(network, backend, connect=True):
    """Create a network in a Ganeti backend"""
    log.debug("Creating network %s in backend %s", network, backend)

    job_id = _create_network(network, backend)

    if not connect:
        return [job_id]
    # Connection jobs depend on the creation job having finalized.
    return connect_network(network, backend, depends=[job_id])
def _create_network(network, backend):
    """Create a network.

    Derive the Ganeti network parameters (subnets, gateways, tags, MAC
    prefix) from the Cyclades network and submit a CreateNetwork job.
    Returns the Ganeti job ID.
    """

    # NOTE(review): tags.append() below mutates network.backend_tag in place
    # if it is a plain list attribute — presumably it returns a fresh list
    # per call; verify.
    tags = network.backend_tag
    subnet = None
    subnet6 = None
    gateway = None
    gateway6 = None
    for _subnet in network.subnets.all():
        if _subnet.dhcp and not "nfdhcpd" in tags:
            tags.append("nfdhcpd")
        if _subnet.ipversion == 4:
            subnet = _subnet.cidr
            gateway = _subnet.gateway
        elif _subnet.ipversion == 6:
            subnet6 = _subnet.cidr
            gateway6 = _subnet.gateway

    # Only public IPv4 networks get Ganeti IP-conflict checking.
    conflicts_check = False
    if network.public:
        tags.append('public')
        if subnet is not None:
            conflicts_check = True
    else:
        tags.append('private')

    # Use a dummy network subnet for IPv6 only networks. Currently Ganeti does
    # not support IPv6 only networks. To bypass this limitation, we create the
    # network with a dummy network subnet, and make Cyclades connect instances
    # to such networks, with address=None.
    if subnet is None:
        subnet = "10.0.0.0/29"

    try:
        bn = BackendNetwork.objects.get(network=network, backend=backend)
        mac_prefix = bn.mac_prefix
    except BackendNetwork.DoesNotExist:
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
                        " does not exist" % (network.id, backend.id))

    with pooled_rapi_client(backend) as client:
        return client.CreateNetwork(network_name=network.backend_id,
                                    network=subnet,
                                    network6=subnet6,
                                    gateway=gateway,
                                    gateway6=gateway6,
                                    mac_prefix=mac_prefix,
                                    conflicts_check=conflicts_check,
                                    tags=tags)
def connect_network(network, backend, depends=None, group=None):
    """Connect a network to nodegroups.

    Connect the network to the given nodegroup, or to every nodegroup of
    the backend when 'group' is None. 'depends' is an optional list of job
    IDs the connection jobs must wait for. Returns the list of submitted
    Ganeti job IDs.
    """
    log.debug("Connecting network %s to backend %s", network, backend)

    # 'depends=None' instead of the mutable default 'depends=[]'.
    if depends is None:
        depends = []

    conflicts_check = False
    if network.public and (network.subnet4 is not None):
        conflicts_check = True

    depends = create_job_dependencies(depends)
    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for group in groups:
            job_id = client.ConnectNetwork(network.backend_id, group,
                                           network.mode, network.link,
                                           conflicts_check,
                                           depends=depends)
            job_ids.append(job_id)
    return job_ids
def delete_network(network, backend, disconnect=True):
    """Delete a network from a Ganeti backend.

    When 'disconnect' is True, first submit disconnection jobs and make
    the deletion job depend on them.
    """
    log.debug("Deleting network %s from backend %s", network, backend)

    depends = disconnect_network(network, backend) if disconnect else []
    _delete_network(network, backend, depends=depends)
def _delete_network(network, backend, depends=None):
    """Submit the Ganeti job that removes a network definition.

    'depends' is an optional list of job IDs to wait for. Returns the
    submitted job ID.
    """
    # 'depends=None' instead of the mutable default 'depends=[]'.
    if depends is None:
        depends = []
    depends = create_job_dependencies(depends)
    with pooled_rapi_client(backend) as client:
        return client.DeleteNetwork(network.backend_id, depends)
def disconnect_network(network, backend, group=None):
    """Disconnect a network from nodegroups.

    Disconnect from the given nodegroup, or from every nodegroup of the
    backend when 'group' is None. Returns the submitted Ganeti job IDs.
    """
    log.debug("Disconnecting network %s to backend %s", network, backend)

    with pooled_rapi_client(backend) as client:
        if group is not None:
            groups = [group]
        else:
            groups = client.GetGroups()
        job_ids = [client.DisconnectNetwork(network.backend_id, grp)
                   for grp in groups]
    return job_ids
def connect_to_network(vm, nic):
    """Attach a NIC to a VM's Ganeti instance (hotplug if supported).

    Ensures the NIC's network is active in the VM's backend first, and
    makes the modification job depend on any network-creation jobs.
    Returns the submitted Ganeti job ID.
    """
    network = nic.network
    backend = vm.backend
    bnet, depend_jobs = ensure_network_is_active(backend, network.id)

    depends = create_job_dependencies(depend_jobs)

    # Ganeti-side NIC definition (named local avoids shadowing the model).
    gnt_nic = {'name': nic.backend_uuid,
               'network': network.backend_id,
               'ip': nic.ipv4_address}

    log.debug("Adding NIC %s to VM %s", gnt_nic, vm)

    kwargs = {
        "instance": vm.backend_vm_id,
        "nics": [("add", "-1", gnt_nic)],
        "depends": depends,
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug_if_possible"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(**kwargs)
def disconnect_from_network(vm, nic):
    """Detach a NIC from a VM's Ganeti instance (hotplug if supported).

    Also removes the NIC's firewall tag, if any. Returns the submitted
    Ganeti job ID of the instance modification.
    """
    log.debug("Removing NIC %s of VM %s", nic, vm)

    kwargs = {
        "instance": vm.backend_vm_id,
        "nics": [("remove", nic.backend_uuid, {})],
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug_if_possible"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        job_id = client.ModifyInstance(**kwargs)
        profile = nic.firewall_profile
        if profile and profile != "DISABLED":
            # Drop the firewall tag that corresponds to this NIC.
            tag = _firewall_tags[profile] % nic.backend_uuid
            client.DeleteInstanceTags(vm.backend_vm_id, [tag],
                                      dry_run=settings.TEST)

        return job_id
def set_firewall_profile(vm, profile, nic):
    """Set the firewall profile of a NIC via Ganeti instance tags.

    Removes any existing firewall tags for the NIC and, unless the profile
    is "DISABLED", adds the tag for the requested profile. Raises
    ValueError for unknown profiles.
    """
    uuid = nic.backend_uuid
    try:
        tag = _firewall_tags[profile] % uuid
    except KeyError:
        # Fixed typo in the error message ("Unsopported" -> "Unsupported").
        raise ValueError("Unsupported Firewall Profile: %s" % profile)

    log.debug("Setting tag of VM %s, NIC %s, to %s", vm, nic, profile)

    with pooled_rapi_client(vm) as client:
        # Delete previous firewall tags
        old_tags = client.GetInstanceTags(vm.backend_vm_id)
        delete_tags = [(t % uuid) for t in _firewall_tags.values()
                       if (t % uuid) in old_tags]
        if delete_tags:
            client.DeleteInstanceTags(vm.backend_vm_id, delete_tags,
                                      dry_run=settings.TEST)

        if profile != "DISABLED":
            client.AddInstanceTags(vm.backend_vm_id, [tag],
                                   dry_run=settings.TEST)

        # XXX NOP ModifyInstance call to force process_net_status to run
        # on the dispatcher
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
        client.ModifyInstance(vm.backend_vm_id,
                              os_name=os_name)
    return None
def get_instances(backend, bulk=True):
    """List the instances of a Ganeti backend."""
    with pooled_rapi_client(backend) as client:
        return client.GetInstances(bulk=bulk)
def get_nodes(backend, bulk=True):
    """List the nodes of a Ganeti backend."""
    with pooled_rapi_client(backend) as client:
        return client.GetNodes(bulk=bulk)
def get_jobs(backend, bulk=True):
    """List the jobs of a Ganeti backend."""
    with pooled_rapi_client(backend) as client:
        return client.GetJobs(bulk=bulk)
def get_physical_resources(backend):
    """ Get the physical resources of a backend.

    Get the resources of a backend as reported by the backend (not the db).

    """
    attrs = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
    totals = dict.fromkeys(attrs, 0)
    for node in get_nodes(backend, bulk=True):
        # Filter out drained, offline and not vm_capable nodes since they will
        # not take part in the vm allocation process
        usable = node['vm_capable'] and not (node['drained'] or
                                             node['offline'])
        if usable and node['cnodes']:
            for attr in attrs:
                totals[attr] += int(node[attr] or 0)
    return totals
def update_backend_resources(backend, resources=None):
    """ Update the state of the backend resources in db.

    Uses the given resources dict, or queries the backend when omitted,
    then stores the values and a fresh 'updated' timestamp.
    """
    if not resources:
        resources = get_physical_resources(backend)

    for attr in ('mfree', 'mtotal', 'dfree', 'dtotal',
                 'pinst_cnt', 'ctotal'):
        setattr(backend, attr, resources[attr])
    backend.updated = datetime.now()
    backend.save()
def get_memory_from_instances(backend):
    """ Get the memory that is used from instances.

    Get the used memory of a backend. Note: This is different for
    the real memory used, due to kvm's memory de-duplication.

    """
    with pooled_rapi_client(backend) as client:
        instances = client.GetInstances(bulk=True)
    return sum(i['oper_ram'] for i in instances)
def get_available_disk_templates(backend):
    """Get the list of available disk templates of a Ganeti backend.

    The list contains the disk templates that are enabled in the Ganeti backend
    and also included in ipolicy-disk-templates.

    """
    with pooled_rapi_client(backend) as client:
        info = client.GetInfo()
    ipolicy_templates = info["ipolicy"]["disk-templates"]
    if "enabled_disk_templates" not in info:
        # Ganeti < 2.8 does not have 'enabled_disk_templates'
        return ipolicy_templates
    return [tmpl for tmpl in info["enabled_disk_templates"]
            if tmpl in ipolicy_templates]
def update_backend_disk_templates(backend):
    """Refresh and persist a backend's available disk templates."""
    backend.disk_templates = get_available_disk_templates(backend)
    backend.save()
##
1133
## Synchronized operations for reconciliation
1134
##
1135

    
1136

    
1137
def create_network_synced(network, backend):
    """Create and connect a network, waiting for each Ganeti job.

    Returns the (status, error) tuple of the first failing step, or of
    the final connection step on success.
    """
    result = _create_network_synced(network, backend)
    if result[0] != rapi.JOB_STATUS_SUCCESS:
        return result
    return connect_network_synced(network, backend)
def _create_network_synced(network, backend):
    """Create a network and block until its Ganeti job finalizes."""
    with pooled_rapi_client(backend) as client:
        job_id = _create_network(network, backend)
        return wait_for_job(client, job_id)
def connect_network_synced(network, backend):
    """Connect a network to all nodegroups, waiting for each Ganeti job.

    Returns the first non-successful (status, error) result, or the
    result of the last connection job when all succeed.

    NOTE(review): if GetGroups() returns an empty list, 'result' is never
    bound and the final return raises UnboundLocalError — presumably a
    cluster always has at least one nodegroup; verify.
    """
    with pooled_rapi_client(backend) as client:
        for group in client.GetGroups():
            job = client.ConnectNetwork(network.backend_id, group,
                                        network.mode, network.link)
            result = wait_for_job(client, job)
            if result[0] != rapi.JOB_STATUS_SUCCESS:
                return result

    return result
1164
def wait_for_job(client, jobid):
    """Block until a Ganeti job reaches a finalized status.

    Polls the job via WaitForJobChange on its 'status' and 'opresult'
    fields. Returns (status, None) on success, otherwise (status, error)
    where error is the job's 'opresult'.
    """
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
    status = result['job_info'][0]
    while status not in rapi.JOB_STATUS_FINALIZED:
        # Pass the previous job info so the call only returns on a change.
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
                                         [result], None)
        status = result['job_info'][0]

    if status == rapi.JOB_STATUS_SUCCESS:
        return (status, None)
    else:
        error = result['job_info'][1]
        return (status, error)
def create_job_dependencies(job_ids=None, job_states=None):
    """Transform a list of job IDs to Ganeti 'depends' attribute.

    Each job ID is paired with 'job_states', the list of states the
    dependency is satisfied in (all finalized states by default).
    Returns a list of [job_id, job_states] pairs.

    Raises TypeError if 'job_states' is not a list.
    """
    # 'job_ids=None' instead of the mutable default 'job_ids=[]'.
    if job_ids is None:
        job_ids = []
    if job_states is None:
        job_states = list(rapi.JOB_STATUS_FINALIZED)
    # Explicit validation instead of 'assert', which is stripped under -O.
    if not isinstance(job_states, list):
        raise TypeError("job_states must be a list, got %r" % (job_states,))
    return [[job_id, job_states] for job_id in job_ids]