snf-cyclades-app/synnefo/logic/backend.py @ 1316db51

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
from django.conf import settings
34
from django.db import transaction
35
from datetime import datetime, timedelta
36

    
37
from synnefo.db.models import (Backend, VirtualMachine, Network,
38
                               BackendNetwork, BACKEND_STATUSES,
39
                               pooled_rapi_client, VirtualMachineDiagnostic,
40
                               Flavor, IPAddress, IPAddressLog)
41
from synnefo.logic import utils, ips
42
from synnefo import quotas
43
from synnefo.api.util import release_resource
44
from synnefo.util.mac2eui64 import mac2eui64
45
from synnefo.logic import rapi
46

    
47
from logging import getLogger
48
log = getLogger(__name__)
49

    
50

    
51
_firewall_tags = {
52
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
53
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
54
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}
55

    
56
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
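
# Illustrative note on the mapping above (the concrete tag value is an
# assumption, not the deployment's actual setting): if, say,
# GANETI_FIREWALL_ENABLED_TAG were "synnefo:network:%s:enabled", then
# "synnefo:network:%s:enabled".split(':')[3] == "enabled", so _reverse_tags
# maps the last tag component back to the profile name, e.g.
# {"enabled": "ENABLED", "disabled": "DISABLED", "protected": "PROTECTED"}.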
57

    
58
SIMPLE_NIC_FIELDS = ["state", "mac", "network", "firewall_profile", "index"]
59
COMPLEX_NIC_FIELDS = ["ipv4_address", "ipv6_address"]
60
NIC_FIELDS = SIMPLE_NIC_FIELDS + COMPLEX_NIC_FIELDS
61
UNKNOWN_NIC_PREFIX = "unknown-"
62

    
63

    
64
def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
65
    """Handle quotas for updated VirtualMachine.
66

67
    Update quotas for the updated VirtualMachine based on the job that ran on
68
    the Ganeti backend. If a commission has already been issued for this job,
69
    then this commission is just accepted or rejected based on the job status.
70
    Otherwise, a new commission for the given change is issued in force and
71
    auto-accept mode. In this case, previous commissions are
72
    rejected, since they reflect a previous state of the VM.
73

74
    """
75
    if job_status not in rapi.JOB_STATUS_FINALIZED:
76
        return vm
77

    
78
    # Check whether the successful completion of the job triggers any quotable
79
    # change in the VM state.
80
    action = utils.get_action_from_opcode(job_opcode, job_fields)
81
    if action == "BUILD":
82
        # Quotas for new VMs are automatically accepted by the API
83
        return vm
84

    
85
    if vm.task_job_id == job_id and vm.serial is not None:
86
        # Commission for this change has already been issued. So just
87
        # accept/reject it. Special case is OP_INSTANCE_CREATE, which must be
88
        # accepted even if it fails, as the user must manually remove the
89
        # failed server.
90
        serial = vm.serial
91
        if job_status == rapi.JOB_STATUS_SUCCESS:
92
            quotas.accept_resource_serial(vm)
93
        elif job_status in [rapi.JOB_STATUS_ERROR, rapi.JOB_STATUS_CANCELED]:
94
            log.debug("Job %s failed. Rejecting related serial %s", job_id,
95
                      serial)
96
            quotas.reject_resource_serial(vm)
97
    elif job_status == rapi.JOB_STATUS_SUCCESS:
98
        commission_info = quotas.get_commission_info(resource=vm,
99
                                                     action=action,
100
                                                     action_fields=job_fields)
101
        if commission_info is not None:
102
            # Commission for this change has not been issued, or the issued
103
            # commission was unaware of the current change. Reject all previous
104
            # commissions and create a new one in forced mode!
105
            log.debug("Expected job was %s. Processing job %s. "
106
                      "Attached serial %s",
107
                      vm.task_job_id, job_id, vm.serial)
108
            reason = ("client: dispatcher, resource: %s, ganeti_job: %s"
109
                      % (vm, job_id))
110
            serial = quotas.handle_resource_commission(
111
                vm, action,
112
                action_fields=job_fields,
113
                commission_name=reason,
114
                force=True,
115
                auto_accept=True)
116
            log.debug("Issued new commission: %s", serial)
117
    return vm
118

    
119

    
120
@transaction.commit_on_success
121
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None,
122
                      job_fields=None):
123
    """Process a job progress notification from the backend
124

125
    Process an incoming message from the backend (currently Ganeti).
126
    Job notifications with a terminating status (success, error, or canceled),
127
    also update the operating state of the VM.
128

129
    """
130
    # See #1492, #1031, #1111 for why this line has been removed
131
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
132
    if status not in [x[0] for x in BACKEND_STATUSES]:
133
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)
134

    
135
    vm.backendjobid = jobid
136
    vm.backendjobstatus = status
137
    vm.backendopcode = opcode
138
    vm.backendlogmsg = logmsg
139

    
140
    if status not in rapi.JOB_STATUS_FINALIZED:
141
        vm.save()
142
        return
143

    
144
    if job_fields is None:
145
        job_fields = {}
146

    
147
    new_operstate = None
148
    new_flavor = None
149
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode)
150

    
151
    if status == rapi.JOB_STATUS_SUCCESS:
152
        # If job succeeds, change operating state if needed
153
        if state_for_success is not None:
154
            new_operstate = state_for_success
155

    
156
        beparams = job_fields.get("beparams", None)
157
        if beparams:
158
            # Change the flavor of the VM
159
            new_flavor = _process_resize(vm, beparams)
160

    
161
        # Update backendtime only for jobs that have been successfully
162
        # completed, since only these jobs update the state of the VM. Else a
163
        # "race condition" may occur when a successful job (e.g.
164
        # OP_INSTANCE_REMOVE) completes before an error job and messages arrive
165
        # in reverse order.
166
        vm.backendtime = etime
167

    
168
    if status in rapi.JOB_STATUS_FINALIZED and nics is not None:
169
        # Update the NICs of the VM
170
        _process_net_status(vm, etime, nics)
171

    
172
    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
173
    if opcode == 'OP_INSTANCE_CREATE' and status in (rapi.JOB_STATUS_CANCELED,
174
                                                     rapi.JOB_STATUS_ERROR):
175
        new_operstate = "ERROR"
176
        vm.backendtime = etime
177
        # Update state of associated NICs
178
        vm.nics.all().update(state="ERROR")
179
    elif opcode == 'OP_INSTANCE_REMOVE':
180
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
181
        # when no instance exists at the Ganeti backend.
182
        # See ticket #799 for all the details.
183
        if (status == rapi.JOB_STATUS_SUCCESS or
184
           (status == rapi.JOB_STATUS_ERROR and not vm_exists_in_backend(vm))):
185
            # VM has been deleted
186
            for nic in vm.nics.all():
187
                # Release the IP
188
                remove_nic_ips(nic)
189
                # And delete the NIC.
190
                nic.delete()
191
            vm.deleted = True
192
            new_operstate = state_for_success
193
            vm.backendtime = etime
194
            status = rapi.JOB_STATUS_SUCCESS
195

    
196
    if status in rapi.JOB_STATUS_FINALIZED:
197
        # Job is finalized: Handle quotas/commissioning
198
        vm = handle_vm_quotas(vm, job_id=jobid, job_opcode=opcode,
199
                              job_status=status, job_fields=job_fields)
200
        # and clear task fields
201
        if vm.task_job_id == jobid:
202
            vm.task = None
203
            vm.task_job_id = None
204

    
205
    if new_operstate is not None:
206
        vm.operstate = new_operstate
207
    if new_flavor is not None:
208
        vm.flavor = new_flavor
209

    
210
    vm.save()
211

    
212

    
213
def _process_resize(vm, beparams):
214
    """Change flavor of a VirtualMachine based on new beparams."""
215
    old_flavor = vm.flavor
216
    vcpus = beparams.get("vcpus", old_flavor.cpu)
217
    ram = beparams.get("maxmem", old_flavor.ram)
218
    if vcpus == old_flavor.cpu and ram == old_flavor.ram:
219
        return
220
    try:
221
        new_flavor = Flavor.objects.get(cpu=vcpus, ram=ram,
222
                                        disk=old_flavor.disk,
223
                                        disk_template=old_flavor.disk_template)
224
    except Flavor.DoesNotExist:
225
        raise Exception("Cannot find flavor for VM")
226
    return new_flavor
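
# Illustrative usage of _process_resize (values are hypothetical): a resize
# job carrying beparams={"vcpus": 2, "maxmem": 2048} looks up a Flavor with
# cpu=2, ram=2048 and the old flavor's disk and disk_template, e.g.
#
#     new_flavor = _process_resize(vm, {"vcpus": 2, "maxmem": 2048})
#
# while beparams matching the current flavor make it return None.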
227

    
228

    
229
@transaction.commit_on_success
230
def process_net_status(vm, etime, nics):
231
    """Wrap _process_net_status inside transaction."""
232
    _process_net_status(vm, etime, nics)
233

    
234

    
235
def _process_net_status(vm, etime, nics):
236
    """Process a net status notification from the backend
237

238
    Process an incoming message from the Ganeti backend,
239
    detailing the NIC configuration of a VM instance.
240

241
    Update the state of the VM in the DB accordingly.
242

243
    """
244
    ganeti_nics = process_ganeti_nics(nics)
245
    db_nics = dict([(nic.id, nic)
246
                    for nic in vm.nics.select_related("network")
247
                                      .prefetch_related("ips")])
248

    
249
    for nic_name in set(db_nics.keys()) | set(ganeti_nics.keys()):
250
        db_nic = db_nics.get(nic_name)
251
        ganeti_nic = ganeti_nics.get(nic_name)
252
        if ganeti_nic is None:
253
            if nic_is_stale(vm, db_nic):
254
                log.debug("Removing stale NIC '%s'" % db_nic)
255
                remove_nic_ips(db_nic)
256
                db_nic.delete()
257
            else:
258
                log.info("NIC '%s' is still being created" % db_nic)
259
        elif db_nic is None:
260
            msg = ("NIC/%s of VM %s does not exist in DB! Cannot automatically"
261
                   " fix this issue!" % (nic_name, vm))
262
            log.error(msg)
263
            continue
264
        elif not nics_are_equal(db_nic, ganeti_nic):
265
            for f in SIMPLE_NIC_FIELDS:
266
                # Update the NIC in DB with the values from Ganeti NIC
267
                setattr(db_nic, f, ganeti_nic[f])
268
                db_nic.save()
269

    
270
            # Special case where the IPv4 address has changed, because you
271
            # need to release the old IPv4 address and reserve the new one
272
            gnt_ipv4_address = ganeti_nic["ipv4_address"]
273
            db_ipv4_address = db_nic.ipv4_address
274
            if db_ipv4_address != gnt_ipv4_address:
275
                change_address_of_port(db_nic, vm.userid,
276
                                       old_address=db_ipv4_address,
277
                                       new_address=gnt_ipv4_address,
278
                                       version=4)
279

    
280
            gnt_ipv6_address = ganeti_nic["ipv6_address"]
281
            db_ipv6_address = db_nic.ipv6_address
282
            if db_ipv6_address != gnt_ipv6_address:
283
                change_address_of_port(db_nic, vm.userid,
284
                                       old_address=db_ipv6_address,
285
                                       new_address=gnt_ipv6_address,
286
                                       version=6)
287

    
288
    vm.backendtime = etime
289
    vm.save()
290

    
291

    
292
def change_address_of_port(port, userid, old_address, new_address, version):
293
    """Change."""
294
    if old_address is not None:
295
        msg = ("IPv%s Address of server '%s' changed from '%s' to '%s'"
296
               % (version, port.machine_id, old_address, new_address))
297
        log.error(msg)
298

    
299
    # Remove the old IP address
300
    remove_nic_ips(port, version=version)
301

    
302
    if version == 4:
303
        ipaddress = ips.allocate_ip(port.network, userid, address=new_address)
304
        ipaddress.nic = port
305
        ipaddress.save()
306
    elif version == 6:
307
        subnet6 = port.network.subnet6
308
        ipaddress = IPAddress.objects.create(userid=userid,
309
                                             network=port.network,
310
                                             subnet=subnet6,
311
                                             nic=port,
312
                                             address=new_address,
313
                                             ipversion=6)
314
    else:
315
        raise ValueError("Unknown version: %s" % version)
316

    
317
    # New address log
318
    ip_log = IPAddressLog.objects.create(server_id=port.machine_id,
319
                                         network_id=port.network_id,
320
                                         address=new_address,
321
                                         active=True)
322
    log.info("Created IP log entry '%s' for address '%s' to server '%s'",
323
             ip_log.id, new_address, port.machine_id)
324

    
325
    return ipaddress
326

    
327

    
328
def nics_are_equal(db_nic, gnt_nic):
329
    for field in NIC_FIELDS:
330
        if getattr(db_nic, field) != gnt_nic[field]:
331
            return False
332
    return True
333

    
334

    
335
def process_ganeti_nics(ganeti_nics):
336
    """Process NIC dict from ganeti"""
337
    new_nics = []
338
    for index, gnic in enumerate(ganeti_nics):
339
        nic_name = gnic.get("name", None)
340
        if nic_name is not None:
341
            nic_id = utils.id_from_nic_name(nic_name)
342
        else:
343
            # Default to the index. If the NIC is unknown to Synnefo, it will
344
            # be created automatically.
345
            nic_id = UNKNOWN_NIC_PREFIX + str(index)
346
        network_name = gnic.get('network', '')
347
        network_id = utils.id_from_network_name(network_name)
348
        network = Network.objects.get(id=network_id)
349

    
350
        # Get the new nic info
351
        mac = gnic.get('mac')
352
        ipv4 = gnic.get('ip')
353
        subnet6 = network.subnet6
354
        ipv6 = mac2eui64(mac, subnet6.cidr) if subnet6 else None
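        # Illustrative example (hypothetical values): mac2eui64 derives the
        # IPv6 address from the MAC via EUI-64, e.g. for mac
        # "aa:00:00:00:00:01" in subnet "2001:db8::/64" it would yield an
        # address like "2001:db8::a800:ff:fe00:1".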
355

    
356
        firewall = gnic.get('firewall')
357
        firewall_profile = _reverse_tags.get(firewall)
358
        if not firewall_profile and network.public:
359
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE
360

    
361
        nic_info = {
362
            'index': index,
363
            'network': network,
364
            'mac': mac,
365
            'ipv4_address': ipv4,
366
            'ipv6_address': ipv6,
367
            'firewall_profile': firewall_profile,
368
            'state': 'ACTIVE'}
369

    
370
        new_nics.append((nic_id, nic_info))
371
    return dict(new_nics)
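
# Illustrative input/output sketch for process_ganeti_nics (the "snf-" name
# prefix below is an assumption about the deployment's BACKEND_PREFIX_ID):
#
#     process_ganeti_nics([{"name": "snf-nic-7", "network": "snf-net-42",
#                           "mac": "aa:00:00:00:00:01", "ip": "10.0.0.2",
#                           "firewall": ""}])
#
# would return a dict keyed by the NIC id parsed from the name, whose value
# holds index, network, mac, the IPv4/IPv6 addresses, the firewall profile
# and an 'ACTIVE' state.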
372

    
373

    
374
def remove_nic_ips(nic, version=None):
375
    """Remove IP addresses associated with a NetworkInterface.
376

377
    Remove all IP addresses that are associated with the NetworkInterface
378
    object, by returning them to the pool and deleting the IPAddress object. If
379
    the IP is a floating IP, then it is just disassociated from the NIC.
380
    If version is specified, then only IP addresses of that version will be
381
    removed.
382

383
    """
384
    for ip in nic.ips.all():
385
        if version and ip.ipversion != version:
386
            continue
387

    
388
        # Update the DB table holding the logging of all IP addresses
389
        terminate_active_ipaddress_log(nic, ip)
390

    
391
        if ip.floating_ip:
392
            ip.nic = None
393
            ip.save()
394
        else:
395
            # Release the IPv4 address
396
            ip.release_address()
397
            ip.delete()
398

    
399

    
400
def terminate_active_ipaddress_log(nic, ip):
401
    """Update DB logging entry for this IP address."""
402
    if not ip.network.public or nic.machine is None:
403
        return
404
    try:
405
        ip_log, created = \
406
            IPAddressLog.objects.get_or_create(server_id=nic.machine_id,
407
                                               network_id=ip.network_id,
408
                                               address=ip.address,
409
                                               active=True)
410
    except IPAddressLog.MultipleObjectsReturned:
411
        logmsg = ("Multiple active log entries for IP %s, Network %s,"
412
                  "Server %s. Cannot proceed!"
413
                  % (ip.address, ip.network, nic.machine))
414
        log.error(logmsg)
415
        raise
416

    
417
    if created:
418
        logmsg = ("No log entry for IP %s, Network %s, Server %s. Created new"
419
                  " but with wrong creation timestamp."
420
                  % (ip.address, ip.network, nic.machine))
421
        log.error(logmsg)
422
    ip_log.released_at = datetime.now()
423
    ip_log.active = False
424
    ip_log.save()
425

    
426

    
427
@transaction.commit_on_success
428
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
429
    if status not in [x[0] for x in BACKEND_STATUSES]:
430
        raise Network.InvalidBackendMsgError(opcode, status)
431

    
432
    back_network.backendjobid = jobid
433
    back_network.backendjobstatus = status
434
    back_network.backendopcode = opcode
435
    back_network.backendlogmsg = logmsg
436

    
437
    # Note: Network is already locked!
438
    network = back_network.network
439

    
440
    # Notifications of success change the operating state
441
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
442
    if status == rapi.JOB_STATUS_SUCCESS and state_for_success is not None:
443
        back_network.operstate = state_for_success
444

    
445
    if (status in (rapi.JOB_STATUS_CANCELED, rapi.JOB_STATUS_ERROR)
446
       and opcode == 'OP_NETWORK_ADD'):
447
        back_network.operstate = 'ERROR'
448
        back_network.backendtime = etime
449

    
450
    if opcode == 'OP_NETWORK_REMOVE':
451
        network_is_deleted = (status == rapi.JOB_STATUS_SUCCESS)
452
        if network_is_deleted or (status == rapi.JOB_STATUS_ERROR and not
453
                                  network_exists_in_backend(back_network)):
454
            back_network.operstate = state_for_success
455
            back_network.deleted = True
456
            back_network.backendtime = etime
457

    
458
    if status == rapi.JOB_STATUS_SUCCESS:
459
        back_network.backendtime = etime
460
    back_network.save()
461
    # Also update the state of the Network.
462
    update_network_state(network)
463

    
464

    
465
def update_network_state(network):
466
    """Update the state of a Network based on BackendNetwork states.
467

468
    Update the state of a Network based on the operstate of the networks in the
469
    backends in which the network exists.
470

471
    The state of the network is:
472
    * ACTIVE: If it is 'ACTIVE' in at least one backend.
473
    * DELETED: If it is 'DELETED' in all backends where it has been created.
474

475
    This function also releases the resources (MAC prefix or Bridge) and the
476
    quotas for the network.
477

478
    """
479
    if network.deleted:
480
        # Network has already been deleted. Just assert that state is also
481
        # DELETED
482
        if network.state != "DELETED":
483
            network.state = "DELETED"
484
            network.save()
485
        return
486

    
487
    backend_states = [s.operstate for s in network.backend_networks.all()]
488
    if not backend_states and network.action != "DESTROY":
489
        if network.state != "ACTIVE":
490
            network.state = "ACTIVE"
491
            network.save()
492
        return
493

    
494
    # Network is deleted when all BackendNetworks go to "DELETED" operstate
495
    deleted = reduce(lambda x, y: x == y and "DELETED", backend_states,
496
                     "DELETED")
497

    
498
    # Release the resources on the deletion of the Network
499
    if deleted:
500
        if network.ips.filter(deleted=False, floating_ip=True).exists():
501
            msg = "Cannot delete network %s! Floating IPs still in use!"
502
            log.error(msg % network)
503
            raise Exception(msg % network)
504
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
505
                 network.id, network.mac_prefix, network.link)
506
        network.deleted = True
507
        network.state = "DELETED"
508
        # Undrain the network, otherwise the network state will remain
509
        # as 'SNF:DRAINED'
510
        network.drained = False
511
        if network.mac_prefix:
512
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
513
                release_resource(res_type="mac_prefix",
514
                                 value=network.mac_prefix)
515
        if network.link:
516
            if network.FLAVORS[network.flavor]["link"] == "pool":
517
                release_resource(res_type="bridge", value=network.link)
518

    
519
        # Set all subnets as deleted
520
        network.subnets.update(deleted=True)
521
        # And delete the IP pools
522
        for subnet in network.subnets.all():
523
            if subnet.ipversion == 4:
524
                subnet.ip_pools.all().delete()
525
        # And all the backend networks since they are useless
526
        network.backend_networks.all().delete()
527

    
528
        # Issue commission
529
        if network.userid:
530
            quotas.issue_and_accept_commission(network, action="DESTROY")
531
            # the above has already saved the object and committed;
532
            # a second save would override others' changes, since the
533
            # object is now unlocked
534
            return
535
        elif not network.public:
536
            log.warning("Network %s does not have an owner!", network.id)
537
    network.save()
538

    
539

    
540
@transaction.commit_on_success
541
def process_network_modify(back_network, etime, jobid, opcode, status,
542
                           job_fields):
543
    assert (opcode == "OP_NETWORK_SET_PARAMS")
544
    if status not in [x[0] for x in BACKEND_STATUSES]:
545
        raise Network.InvalidBackendMsgError(opcode, status)
546

    
547
    back_network.backendjobid = jobid
548
    back_network.backendjobstatus = status
549
    back_network.backendopcode = opcode
550

    
551
    add_reserved_ips = job_fields.get("add_reserved_ips")
552
    if add_reserved_ips:
553
        network = back_network.network
554
        for ip in add_reserved_ips:
555
            network.reserve_address(ip, external=True)
556

    
557
    if status == rapi.JOB_STATUS_SUCCESS:
558
        back_network.backendtime = etime
559
    back_network.save()
560

    
561

    
562
@transaction.commit_on_success
563
def process_create_progress(vm, etime, progress):
564

    
565
    percentage = int(progress)
566

    
567
    # The percentage may exceed 100%, due to the way
568
    # snf-image:copy-progress tracks bytes read by image handling processes
569
    percentage = 100 if percentage > 100 else percentage
570
    if percentage < 0:
571
        raise ValueError("Percentage cannot be negative")
572

    
573
    # FIXME: log a warning here, see #1033
574
#   if last_update > percentage:
575
#       raise ValueError("Build percentage should increase monotonically " \
576
#                        "(old = %d, new = %d)" % (last_update, percentage))
577

    
578
    # This assumes that no message of type 'ganeti-create-progress' is going to
579
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
580
    # the instance is STARTED.  What if the two messages are processed by two
581
    # separate dispatcher threads, and the 'ganeti-op-status' message for
582
    # successful creation gets processed before the 'ganeti-create-progress'
583
    # message? [vkoukis]
584
    #
585
    #if not vm.operstate == 'BUILD':
586
    #    raise VirtualMachine.IllegalState("VM is not in building state")
587

    
588
    vm.buildpercentage = percentage
589
    vm.backendtime = etime
590
    vm.save()
591

    
592

    
593
@transaction.commit_on_success
594
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
595
                               details=None):
596
    """
597
    Create virtual machine instance diagnostic entry.
598

599
    :param vm: VirtualMachine instance to create diagnostic for.
600
    :param message: Diagnostic message.
601
    :param source: Diagnostic source identifier (e.g. image-helper).
602
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
603
    :param etime: The time the message occurred (if available).
604
    :param details: Additional details or debug information.
605
    """
606
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level, source=source,
607
                                                   source_date=etime,
608
                                                   message=message,
609
                                                   details=details)
610

    
611

    
612
def create_instance(vm, nics, volumes, flavor, image):
613
    """`image` is a dictionary which should contain the keys:
614
            'backend_id', 'format' and 'metadata'
615

616
        metadata value should be a dictionary.
617
    """
618

    
619
    # Handle arguments to CreateInstance() as a dictionary,
620
    # initialize it based on a deployment-specific value.
621
    # This enables the administrator to override deployment-specific
622
    # arguments, such as the disk template to use, name of os provider
623
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
624
    #
625
    kw = vm.backend.get_create_params()
626
    kw['mode'] = 'create'
627
    kw['name'] = vm.backend_vm_id
628
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
629

    
630
    kw['disk_template'] = flavor.disk_template
631
    disks = []
632
    for volume in volumes:
633
        disk = {}
634
        disk["size"] = volume.size * 1024
635
        provider = flavor.disk_provider
636
        if provider is not None:
637
            disk["provider"] = provider
638
            disk["origin"] = volume.source_image["checksum"]
639
            extra_disk_params = settings.GANETI_DISK_PROVIDER_KWARGS\
640
                                        .get(provider)
641
            if extra_disk_params is not None:
642
                disk.update(extra_disk_params)
643
        disks.append(disk)
644

    
645
    kw["disks"] = disks
646

    
647
    kw['nics'] = [{"name": nic.backend_uuid,
648
                   "network": nic.network.backend_id,
649
                   "ip": nic.ipv4_address}
650
                  for nic in nics]
651

    
652
    backend = vm.backend
653
    depend_jobs = []
654
    for nic in nics:
655
        bnet, job_ids = ensure_network_is_active(backend, nic.network_id)
656
        depend_jobs.extend(job_ids)
657

    
658
    kw["depends"] = create_job_dependencies(depend_jobs)
659

    
660
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
661
    # kw['os'] = settings.GANETI_OS_PROVIDER
662
    kw['ip_check'] = False
663
    kw['name_check'] = False
664

    
665
    # Do not specify a node explicitly; have
666
    # Ganeti use an iallocator instead
667
    #kw['pnode'] = rapi.GetNodes()[0]
668

    
669
    kw['dry_run'] = settings.TEST
670

    
671
    kw['beparams'] = {
672
        'auto_balance': True,
673
        'vcpus': flavor.cpu,
674
        'memory': flavor.ram}
675

    
676
    kw['osparams'] = {
677
        'config_url': vm.config_url,
678
        # Store image id and format to Ganeti
679
        'img_id': image['backend_id'],
680
        'img_format': image['format']}
681

    
682
    # Use opportunistic locking
683
    kw['opportunistic_locking'] = settings.GANETI_USE_OPPORTUNISTIC_LOCKING
684

    
685
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
686
    # kw['hvparams'] = dict(serial_console=False)
687

    
688
    log.debug("Creating instance %s", utils.hide_pass(kw))
689
    with pooled_rapi_client(vm) as client:
690
        return client.CreateInstance(**kw)
691

    
692

    
693
def delete_instance(vm, shutdown_timeout=None):
694
    with pooled_rapi_client(vm) as client:
695
        return client.DeleteInstance(vm.backend_vm_id,
696
                                     shutdown_timeout=shutdown_timeout,
697
                                     dry_run=settings.TEST)
698

    
699

    
700
def reboot_instance(vm, reboot_type, shutdown_timeout=None):
701
    assert reboot_type in ('soft', 'hard')
702
    # Note that the reboot type of the Ganeti job must always be 'hard'. The
703
    # 'soft' and 'hard' types of the OS API differ from Ganeti's and map to
704
    # 'shutdown_timeout'.
705
    kwargs = {"instance": vm.backend_vm_id,
706
              "reboot_type": "hard"}
707
    # The 'shutdown_timeout' parameter is only supported by snf-ganeti>=2.8.2
708
    # and Ganeti > 2.10. In other versions this parameter will be ignored and
709
    # we will fall back to Ganeti's default timeout (120s).
710
    if shutdown_timeout is not None:
711
        kwargs["shutdown_timeout"] = shutdown_timeout
712
    if reboot_type == "hard":
713
        kwargs["shutdown_timeout"] = 0
714
    if settings.TEST:
715
        kwargs["dry_run"] = True
716
    with pooled_rapi_client(vm) as client:
717
        return client.RebootInstance(**kwargs)
718

    
719

    
720
def startup_instance(vm):
721
    with pooled_rapi_client(vm) as client:
722
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
723

    
724

    
725
def shutdown_instance(vm, shutdown_timeout=None):
726
    with pooled_rapi_client(vm) as client:
727
        return client.ShutdownInstance(vm.backend_vm_id,
728
                                       timeout=shutdown_timeout,
729
                                       dry_run=settings.TEST)
730

    
731

    
732
def resize_instance(vm, vcpus, memory):
733
    beparams = {"vcpus": int(vcpus),
734
                "minmem": int(memory),
735
                "maxmem": int(memory)}
736
    with pooled_rapi_client(vm) as client:
737
        return client.ModifyInstance(vm.backend_vm_id, beparams=beparams)
738

    
739

    
740
def get_instance_console(vm):
741
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
742
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
743
    # useless (see #783).
744
    #
745
    # Until this is fixed on the Ganeti side, construct a console info reply
746
    # directly.
747
    #
748
    # WARNING: This assumes that VNC runs on port network_port on
749
    #          the instance's primary node, and is probably
750
    #          hypervisor-specific.
751
    #
752
    log.debug("Getting console for vm %s", vm)
753

    
754
    console = {}
755
    console['kind'] = 'vnc'
756

    
757
    with pooled_rapi_client(vm) as client:
758
        i = client.GetInstance(vm.backend_vm_id)
759

    
760
    if vm.backend.hypervisor == "kvm" and i['hvparams']['serial_console']:
761
        raise Exception("hv parameter serial_console cannot be true")
762
    console['host'] = i['pnode']
763
    console['port'] = i['network_port']
764

    
765
    return console
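
# Illustrative shape of the console reply built above (host and port values
# are hypothetical):
#
#     {'kind': 'vnc', 'host': 'node1.example.org', 'port': 11000}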
766

    
767

    
768
def get_instance_info(vm):
769
    with pooled_rapi_client(vm) as client:
770
        return client.GetInstance(vm.backend_vm_id)
771

    
772

    
773
def vm_exists_in_backend(vm):
774
    try:
775
        get_instance_info(vm)
776
        return True
777
    except rapi.GanetiApiError as e:
778
        if e.code == 404:
779
            return False
780
        raise e
781

    
782

    
783
def get_network_info(backend_network):
784
    with pooled_rapi_client(backend_network) as client:
785
        return client.GetNetwork(backend_network.network.backend_id)
786

    
787

    
788
def network_exists_in_backend(backend_network):
789
    try:
790
        get_network_info(backend_network)
791
        return True
792
    except rapi.GanetiApiError as e:
793
        if e.code == 404:
794
            return False
        raise e
795

    
796

    
797
def job_is_still_running(vm, job_id=None):
798
    with pooled_rapi_client(vm) as c:
799
        try:
800
            if job_id is None:
801
                job_id = vm.backendjobid
802
            job_info = c.GetJobStatus(job_id)
803
            return not (job_info["status"] in rapi.JOB_STATUS_FINALIZED)
804
        except rapi.GanetiApiError:
805
            return False
806

    
807

    
808
def nic_is_stale(vm, nic, timeout=60):
809
    """Check if a NIC is stale or exists in the Ganeti backend."""
810
    # First check the state of the NIC and if there is a pending CONNECT
811
    if nic.state == "BUILD" and vm.task == "CONNECT":
812
        if datetime.now() < nic.created + timedelta(seconds=timeout):
813
            # Do not check for too recent NICs to avoid the time overhead
814
            return False
815
        if job_is_still_running(vm, job_id=vm.task_job_id):
816
            return False
817
        else:
818
            # If job has finished, check that the NIC exists, because the
819
            # message may have been lost or stuck in the queue.
820
            vm_info = get_instance_info(vm)
821
            if nic.backend_uuid in vm_info["nic.names"]:
822
                return False
823
    return True
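
# Descriptive note on the check above: a NIC in "BUILD" state on a VM with a
# pending "CONNECT" task is considered fresh if it was created less than
# `timeout` seconds ago, if the connect job is still running, or if Ganeti
# already reports it; in every other case the NIC is treated as stale.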
824

    
825

    
826
def ensure_network_is_active(backend, network_id):
827
    """Ensure that a network is active in the specified backend
828

829
    Check that a network exists and is active in the specified backend. If not
830
    (re-)create the network. Return the corresponding BackendNetwork object
831
    and the IDs of the Ganeti jobs issued to create the network.
832

833
    """
834
    job_ids = []
835
    try:
836
        bnet = BackendNetwork.objects.select_related("network")\
837
                                     .get(backend=backend, network=network_id)
838
        if bnet.operstate != "ACTIVE":
839
            job_ids = create_network(bnet.network, backend, connect=True)
840
    except BackendNetwork.DoesNotExist:
841
        network = Network.objects.select_for_update().get(id=network_id)
842
        bnet = BackendNetwork.objects.create(backend=backend, network=network)
843
        job_ids = create_network(network, backend, connect=True)
844

    
845
    return bnet, job_ids
846

    
847

    
848
def create_network(network, backend, connect=True):
849
    """Create a network in a Ganeti backend"""
850
    log.debug("Creating network %s in backend %s", network, backend)
851

    
852
    job_id = _create_network(network, backend)
853

    
854
    if connect:
855
        job_ids = connect_network(network, backend, depends=[job_id])
856
        return job_ids
857
    else:
858
        return [job_id]
859

    
860

    
861
def _create_network(network, backend):
862
    """Create a network."""
863

    
864
    tags = network.backend_tag
865
    subnet = None
866
    subnet6 = None
867
    gateway = None
868
    gateway6 = None
869
    for _subnet in network.subnets.all():
870
        if _subnet.dhcp and "nfdhcpd" not in tags:
871
            tags.append("nfdhcpd")
872
        if _subnet.ipversion == 4:
873
            subnet = _subnet.cidr
874
            gateway = _subnet.gateway
875
        elif _subnet.ipversion == 6:
876
            subnet6 = _subnet.cidr
877
            gateway6 = _subnet.gateway
878

    
879
    conflicts_check = False
880
    if network.public:
881
        tags.append('public')
882
        if subnet is not None:
883
            conflicts_check = True
884
    else:
885
        tags.append('private')
886

    
887
    # Use a dummy network subnet for IPv6 only networks. Currently Ganeti does
888
    # not support IPv6 only networks. To bypass this limitation, we create the
889
    # network with a dummy network subnet, and make Cyclades connect instances
890
    # to such networks, with address=None.
891
    if subnet is None:
892
        subnet = "10.0.0.0/29"
893

    
894
    try:
895
        bn = BackendNetwork.objects.get(network=network, backend=backend)
896
        mac_prefix = bn.mac_prefix
897
    except BackendNetwork.DoesNotExist:
898
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
899
                        " does not exist" % (network.id, backend.id))
900

    
901
    with pooled_rapi_client(backend) as client:
902
        return client.CreateNetwork(network_name=network.backend_id,
903
                                    network=subnet,
904
                                    network6=subnet6,
905
                                    gateway=gateway,
906
                                    gateway6=gateway6,
907
                                    mac_prefix=mac_prefix,
908
                                    conflicts_check=conflicts_check,
909
                                    tags=tags)
910

    
911

    
912
def connect_network(network, backend, depends=[], group=None):
913
    """Connect a network to nodegroups."""
914
    log.debug("Connecting network %s to backend %s", network, backend)
915

    
916
    conflicts_check = False
917
    if network.public and (network.subnet4 is not None):
918
        conflicts_check = True
919

    
920
    depends = create_job_dependencies(depends)
921
    with pooled_rapi_client(backend) as client:
922
        groups = [group] if group is not None else client.GetGroups()
923
        job_ids = []
924
        for group in groups:
925
            job_id = client.ConnectNetwork(network.backend_id, group,
926
                                           network.mode, network.link,
927
                                           conflicts_check,
928
                                           depends=depends)
929
            job_ids.append(job_id)
930
    return job_ids
931

    
932

    
933
def delete_network(network, backend, disconnect=True):
934
    log.debug("Deleting network %s from backend %s", network, backend)
935

    
936
    depends = []
937
    if disconnect:
938
        depends = disconnect_network(network, backend)
939
    _delete_network(network, backend, depends=depends)
940

    
941

    
942
def _delete_network(network, backend, depends=[]):
943
    depends = create_job_dependencies(depends)
944
    with pooled_rapi_client(backend) as client:
945
        return client.DeleteNetwork(network.backend_id, depends)
946

    
947

    
948
def disconnect_network(network, backend, group=None):
949
    log.debug("Disconnecting network %s to backend %s", network, backend)
950

    
951
    with pooled_rapi_client(backend) as client:
952
        groups = [group] if group is not None else client.GetGroups()
953
        job_ids = []
954
        for group in groups:
955
            job_id = client.DisconnectNetwork(network.backend_id, group)
956
            job_ids.append(job_id)
957
    return job_ids
958

    
959

    
960
def connect_to_network(vm, nic):
961
    network = nic.network
962
    backend = vm.backend
963
    bnet, depend_jobs = ensure_network_is_active(backend, network.id)
964

    
965
    depends = create_job_dependencies(depend_jobs)
966

    
967
    nic = {'name': nic.backend_uuid,
968
           'network': network.backend_id,
969
           'ip': nic.ipv4_address}
970

    
971
    log.debug("Adding NIC %s to VM %s", nic, vm)
972

    
973
    kwargs = {
974
        "instance": vm.backend_vm_id,
975
        "nics": [("add", "-1", nic)],
976
        "depends": depends,
977
    }
978
    if vm.backend.use_hotplug():
979
        kwargs["hotplug_if_possible"] = True
980
    if settings.TEST:
981
        kwargs["dry_run"] = True
982

    
983
    with pooled_rapi_client(vm) as client:
984
        return client.ModifyInstance(**kwargs)
985

    
986

    
987
def disconnect_from_network(vm, nic):
988
    log.debug("Removing NIC %s of VM %s", nic, vm)
989

    
990
    kwargs = {
991
        "instance": vm.backend_vm_id,
992
        "nics": [("remove", nic.backend_uuid, {})],
993
    }
994
    if vm.backend.use_hotplug():
995
        kwargs["hotplug_if_possible"] = True
996
    if settings.TEST:
997
        kwargs["dry_run"] = True
998

    
999
    with pooled_rapi_client(vm) as client:
1000
        jobID = client.ModifyInstance(**kwargs)
1001
        firewall_profile = nic.firewall_profile
1002
        if firewall_profile and firewall_profile != "DISABLED":
1003
            tag = _firewall_tags[firewall_profile] % nic.backend_uuid
1004
            client.DeleteInstanceTags(vm.backend_vm_id, [tag],
1005
                                      dry_run=settings.TEST)
1006

    
1007
        return jobID
1008

    
1009

    
1010
def set_firewall_profile(vm, profile, nic):
1011
    uuid = nic.backend_uuid
1012
    try:
1013
        tag = _firewall_tags[profile] % uuid
1014
    except KeyError:
1015
        raise ValueError("Unsopported Firewall Profile: %s" % profile)
1016

    
1017
    log.debug("Setting tag of VM %s, NIC %s, to %s", vm, nic, profile)
1018

    
1019
    with pooled_rapi_client(vm) as client:
1020
        # Delete previous firewall tags
1021
        old_tags = client.GetInstanceTags(vm.backend_vm_id)
1022
        delete_tags = [(t % uuid) for t in _firewall_tags.values()
1023
                       if (t % uuid) in old_tags]
1024
        if delete_tags:
1025
            client.DeleteInstanceTags(vm.backend_vm_id, delete_tags,
1026
                                      dry_run=settings.TEST)
1027

    
1028
        if profile != "DISABLED":
1029
            client.AddInstanceTags(vm.backend_vm_id, [tag],
1030
                                   dry_run=settings.TEST)
1031

    
1032
        # XXX NOP ModifyInstance call to force process_net_status to run
1033
        # on the dispatcher
1034
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
1035
        client.ModifyInstance(vm.backend_vm_id,
1036
                              os_name=os_name)
1037
    return None
1038

    
1039

    
1040
def attach_volume(vm, volume, depends=[]):
1041
    log.debug("Attaching volume %s to vm %s", vm, volume)
1042

    
1043
    disk = {"size": volume.size,
1044
            "name": volume.backend_volume_uuid,
1045
            "volume_name": volume.backend_volume_uuid}
1046
    if volume.source_volume_id is not None:
1047
        disk["origin"] = volume.source_volume.backend_volume_uuid
1048
    elif volume.source_snapshot is not None:
1049
        disk["origin"] = volume.source_snapshot["checksum"]
1050
    elif volume.source_image is not None:
1051
        disk["origin"] = volume.source_image["checksum"]
1052

    
1053
    kwargs = {
1054
        "instance": vm.backend_vm_id,
1055
        "disks": [("add", "-1", disk)],
1056
        "depends": depends,
1057
    }
1058
    if vm.backend.use_hotplug():
1059
        kwargs["hotplug"] = True
1060
    if settings.TEST:
1061
        kwargs["dry_run"] = True
1062

    
1063
    with pooled_rapi_client(vm) as client:
1064
        return client.ModifyInstance(**kwargs)
1065

    
1066

    
1067
def detach_volume(vm, volume):
1068
    log.debug("Removing volume %s from vm %s", volume, vm)
1069
    kwargs = {
1070
        "instance": vm.backend_vm_id,
1071
        "disks": [("remove", volume.backend_volume_uuid, {})],
1072
    }
1073
    if vm.backend.use_hotplug():
1074
        kwargs["hotplug"] = True
1075
    if settings.TEST:
1076
        kwargs["dry_run"] = True
1077

    
1078
    with pooled_rapi_client(vm) as client:
1079
        return client.ModifyInstance(**kwargs)
1080

    
1081

    
1082
def snapshot_instance(vm, snapshot_name):
1083
    #volume = instance.volumes.all()[0]
1084
    with pooled_rapi_client(vm) as client:
1085
        return client.SnapshotInstance(instance=vm.backend_vm_id,
1086
                                       snapshot_name=snapshot_name)
1087

    
1088

    
1089
def get_instances(backend, bulk=True):
1090
    with pooled_rapi_client(backend) as c:
1091
        return c.GetInstances(bulk=bulk)
1092

    
1093

    
1094
def get_nodes(backend, bulk=True):
1095
    with pooled_rapi_client(backend) as c:
1096
        return c.GetNodes(bulk=bulk)
1097

    
1098

    
1099
def get_jobs(backend, bulk=True):
1100
    with pooled_rapi_client(backend) as c:
1101
        return c.GetJobs(bulk=bulk)
1102

    
1103

    
1104
def get_physical_resources(backend):
1105
    """ Get the physical resources of a backend.
1106

1107
    Get the resources of a backend as reported by the backend (not the db).
1108

1109
    """
1110
    nodes = get_nodes(backend, bulk=True)
1111
    attr = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
1112
    res = {}
1113
    for a in attr:
1114
        res[a] = 0
1115
    for n in nodes:
1116
        # Filter out drained, offline and not vm_capable nodes since they will
1117
        # not take part in the vm allocation process
1118
        can_host_vms = n['vm_capable'] and not (n['drained'] or n['offline'])
1119
        if can_host_vms and n['cnodes']:
1120
            for a in attr:
1121
                res[a] += int(n[a] or 0)
1122
    return res
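
# Illustrative return value (numbers are made up), aggregating the attributes
# over all vm_capable, non-drained, online nodes:
#
#     {'mfree': 10240, 'mtotal': 65536, 'dfree': 500000, 'dtotal': 1000000,
#      'pinst_cnt': 12, 'ctotal': 64}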
1123

    
1124

    
1125
def update_backend_resources(backend, resources=None):
1126
    """ Update the state of the backend resources in db.
1127

1128
    """
1129

    
1130
    if not resources:
1131
        resources = get_physical_resources(backend)
1132

    
1133
    backend.mfree = resources['mfree']
1134
    backend.mtotal = resources['mtotal']
1135
    backend.dfree = resources['dfree']
1136
    backend.dtotal = resources['dtotal']
1137
    backend.pinst_cnt = resources['pinst_cnt']
1138
    backend.ctotal = resources['ctotal']
1139
    backend.updated = datetime.now()
1140
    backend.save()
1141

    
1142

    
1143
def get_memory_from_instances(backend):
1144
    """ Get the memory that is used from instances.
1145

1146
    Get the used memory of a backend. Note: This is different from
1147
    the real memory used, due to kvm's memory de-duplication.
1148

1149
    """
1150
    with pooled_rapi_client(backend) as client:
1151
        instances = client.GetInstances(bulk=True)
1152
    mem = 0
1153
    for i in instances:
1154
        mem += i['oper_ram']
1155
    return mem
1156

    
1157

    
1158
def get_available_disk_templates(backend):
1159
    """Get the list of available disk templates of a Ganeti backend.
1160

1161
    The list contains the disk templates that are enabled in the Ganeti backend
1162
    and also included in ipolicy-disk-templates.
1163

1164
    """
1165
    with pooled_rapi_client(backend) as c:
1166
        info = c.GetInfo()
1167
    ipolicy_disk_templates = info["ipolicy"]["disk-templates"]
1168
    try:
1169
        enabled_disk_templates = info["enabled_disk_templates"]
1170
        return [dp for dp in enabled_disk_templates
1171
                if dp in ipolicy_disk_templates]
1172
    except KeyError:
1173
        # Ganeti < 2.8 does not have 'enabled_disk_templates'
1174
        return ipolicy_disk_templates
1175

    
1176

    
1177
def update_backend_disk_templates(backend):
1178
    disk_templates = get_available_disk_templates(backend)
1179
    backend.disk_templates = disk_templates
1180
    backend.save()
1181

    
1182

    
1183
##
1184
## Synchronized operations for reconciliation
1185
##
1186

    
1187

    
1188
def create_network_synced(network, backend):
1189
    result = _create_network_synced(network, backend)
1190
    if result[0] != rapi.JOB_STATUS_SUCCESS:
1191
        return result
1192
    result = connect_network_synced(network, backend)
1193
    return result
1194

    
1195

    
1196
def _create_network_synced(network, backend):
1197
    with pooled_rapi_client(backend) as client:
1198
        job = _create_network(network, backend)
1199
        result = wait_for_job(client, job)
1200
    return result
1201

    
1202

    
1203
def connect_network_synced(network, backend):
1204
    with pooled_rapi_client(backend) as client:
1205
        for group in client.GetGroups():
1206
            job = client.ConnectNetwork(network.backend_id, group,
1207
                                        network.mode, network.link)
1208
            result = wait_for_job(client, job)
1209
            if result[0] != rapi.JOB_STATUS_SUCCESS:
1210
                return result
1211

    
1212
    return result
1213

    
1214

    
1215
def wait_for_job(client, jobid):
1216
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
1217
    status = result['job_info'][0]
1218
    while status not in rapi.JOB_STATUS_FINALIZED:
1219
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
1220
                                         [result], None)
1221
        status = result['job_info'][0]
1222

    
1223
    if status == rapi.JOB_STATUS_SUCCESS:
1224
        return (status, None)
1225
    else:
1226
        error = result['job_info'][1]
1227
        return (status, error)
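
# Illustrative note: wait_for_job blocks on WaitForJobChange until the job
# reaches a finalized status and returns a (status, error) tuple, e.g.
# ("success", None) on success or ("error", <opresult>) on failure.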
1228

    
1229

    
1230
def create_job_dependencies(job_ids=[], job_states=None):
1231
    """Transform a list of job IDs to Ganeti 'depends' attribute."""
1232
    if job_states is None:
1233
        job_states = list(rapi.JOB_STATUS_FINALIZED)
1234
    assert(type(job_states) == list)
1235
    return [[job_id, job_states] for job_id in job_ids]
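
# Illustrative example of the 'depends' format produced above (assuming
# rapi.JOB_STATUS_FINALIZED contains the "success", "error" and "canceled"
# states):
#
#     create_job_dependencies([101, 102])
#     # -> [[101, ["success", "error", "canceled"]],
#     #     [102, ["success", "error", "canceled"]]]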