Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ a3acfc5b

History | View | Annotate | Download (41.8 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
from django.conf import settings
34
from django.db import transaction
35
from datetime import datetime, timedelta
36

    
37
from synnefo.db.models import (Backend, VirtualMachine, Network,
38
                               BackendNetwork, BACKEND_STATUSES,
39
                               pooled_rapi_client, VirtualMachineDiagnostic,
40
                               Flavor, IPAddress, IPAddressLog)
41
from synnefo.logic import utils, ips
42
from synnefo import quotas
43
from synnefo.api.util import release_resource
44
from synnefo.util.mac2eui64 import mac2eui64
45
from synnefo.logic import rapi
46

    
47
from logging import getLogger
48
log = getLogger(__name__)
49

    
50

    
51
# Map Synnefo firewall profile names to the Ganeti instance tags that
# encode them (tag values come from deployment settings).
_firewall_tags = {
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}

# Reverse map: 4th ':'-separated field of the tag -> profile name.
# Assumes each configured tag has at least four ':'-separated fields.
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())

# NIC attributes copied verbatim from the Ganeti NIC info dict into the DB.
SIMPLE_NIC_FIELDS = ["state", "mac", "network", "firewall_profile", "index"]
# NIC attributes that need special handling (IP allocation/release).
COMPLEX_NIC_FIELDS = ["ipv4_address", "ipv6_address"]
NIC_FIELDS = SIMPLE_NIC_FIELDS + COMPLEX_NIC_FIELDS
# Key prefix for Ganeti NICs that carry no Synnefo name; they are keyed by
# their index instead (see process_ganeti_nics).
UNKNOWN_NIC_PREFIX = "unknown-"
62

    
63

    
64
def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
    """Handle quotas for updated VirtualMachine.

    Update quotas for the updated VirtualMachine based on the job that run on
    the Ganeti backend. If a commission has been already issued for this job,
    then this commission is just accepted or rejected based on the job status.
    Otherwise, a new commission for the given change is issued, that is also in
    force and auto-accept mode. In this case, previous commissions are
    rejected, since they reflect a previous state of the VM.

    Returns the (possibly modified) `vm` object; `vm.serial` is cleared
    once the pending commission has been resolved.

    """
    # Pending (non-finalized) jobs cannot resolve any commission yet.
    if job_status not in rapi.JOB_STATUS_FINALIZED:
        return vm

    # Check successful completion of a job will trigger any quotable change in
    # the VM state.
    action = utils.get_action_from_opcode(job_opcode, job_fields)
    if action == "BUILD":
        # Quotas for new VMs are automatically accepted by the API
        return vm

    if vm.task_job_id == job_id and vm.serial is not None:
        # Commission for this change has already been issued. So just
        # accept/reject it. Special case is OP_INSTANCE_CREATE, which even
        # if fails, must be accepted, as the user must manually remove the
        # failed server
        serial = vm.serial
        if job_status == rapi.JOB_STATUS_SUCCESS:
            quotas.accept_serial(serial)
        elif job_status in [rapi.JOB_STATUS_ERROR, rapi.JOB_STATUS_CANCELED]:
            log.debug("Job %s failed. Rejecting related serial %s", job_id,
                      serial)
            quotas.reject_serial(serial)
        # The pending serial has been resolved either way.
        vm.serial = None
    elif job_status == rapi.JOB_STATUS_SUCCESS:
        # Job succeeded but no commission was issued for it beforehand.
        commission_info = quotas.get_commission_info(resource=vm,
                                                     action=action,
                                                     action_fields=job_fields)
        if commission_info is not None:
            # Commission for this change has not been issued, or the issued
            # commission was unaware of the current change. Reject all previous
            # commissions and create a new one in forced mode!
            log.debug("Expected job was %s. Processing job %s.",
                      vm.task_job_id, job_id)
            reason = ("client: dispatcher, resource: %s, ganeti_job: %s"
                      % (vm, job_id))
            quotas.handle_resource_commission(vm, action,
                                              action_fields=job_fields,
                                              commission_name=reason,
                                              force=True,
                                              auto_accept=True)
            log.debug("Issued new commission: %s", vm.serial)

    return vm
118

    
119

    
120
@transaction.commit_on_success
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None,
                      job_fields=None):
    """Process a job progress notification from the backend

    Process an incoming message from the backend (currently Ganeti).
    Job notifications with a terminating status (sucess, error, or canceled),
    also update the operating state of the VM.

    :param vm: the VirtualMachine the job refers to.
    :param etime: event time reported by the backend, stored as backendtime.
    :param jobid: Ganeti job id.
    :param opcode: Ganeti opcode (e.g. 'OP_INSTANCE_CREATE').
    :param status: Ganeti job status; must be one of BACKEND_STATUSES.
    :param logmsg: raw log message from the backend.
    :param nics: optional NIC configuration accompanying the notification.
    :param job_fields: optional dict of job parameters (e.g. 'beparams').

    """
    # See #1492, #1031, #1111 why this line has been removed
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)

    # Always record the latest backend job info on the VM.
    vm.backendjobid = jobid
    vm.backendjobstatus = status
    vm.backendopcode = opcode
    vm.backendlogmsg = logmsg

    # Non-final notifications only update the job bookkeeping above.
    if status not in rapi.JOB_STATUS_FINALIZED:
        vm.save()
        return

    if job_fields is None:
        job_fields = {}

    new_operstate = None
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode)

    if status == rapi.JOB_STATUS_SUCCESS:
        # If job succeeds, change operating state if needed
        if state_for_success is not None:
            new_operstate = state_for_success

        beparams = job_fields.get("beparams", None)
        if beparams:
            # Change the flavor of the VM
            _process_resize(vm, beparams)

        # Update backendtime only for jobs that have been successfully
        # completed, since only these jobs update the state of the VM. Else a
        # "race condition" may occur when a successful job (e.g.
        # OP_INSTANCE_REMOVE) completes before an error job and messages arrive
        # in reversed order.
        vm.backendtime = etime

    if status in rapi.JOB_STATUS_FINALIZED and nics is not None:
        # Update the NICs of the VM
        _process_net_status(vm, etime, nics)

    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
    if opcode == 'OP_INSTANCE_CREATE' and status in (rapi.JOB_STATUS_CANCELED,
                                                     rapi.JOB_STATUS_ERROR):
        new_operstate = "ERROR"
        vm.backendtime = etime
        # Update state of associated NICs
        vm.nics.all().update(state="ERROR")
    elif opcode == 'OP_INSTANCE_REMOVE':
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
        # when no instance exists at the Ganeti backend.
        # See ticket #799 for all the details.
        if (status == rapi.JOB_STATUS_SUCCESS or
           (status == rapi.JOB_STATUS_ERROR and not vm_exists_in_backend(vm))):
            # VM has been deleted
            for nic in vm.nics.all():
                # Release the IP
                remove_nic_ips(nic)
                # And delete the NIC.
                nic.delete()
            vm.deleted = True
            new_operstate = state_for_success
            vm.backendtime = etime
            # Force SUCCESS so the quota handling below treats the removal
            # of a ghost instance as a successful deletion.
            status = rapi.JOB_STATUS_SUCCESS

    if status in rapi.JOB_STATUS_FINALIZED:
        # Job is finalized: Handle quotas/commissioning
        vm = handle_vm_quotas(vm, job_id=jobid, job_opcode=opcode,
                              job_status=status, job_fields=job_fields)
        # and clear task fields
        if vm.task_job_id == jobid:
            vm.task = None
            vm.task_job_id = None

    if new_operstate is not None:
        vm.operstate = new_operstate

    vm.save()
208

    
209

    
210
def _process_resize(vm, beparams):
211
    """Change flavor of a VirtualMachine based on new beparams."""
212
    old_flavor = vm.flavor
213
    vcpus = beparams.get("vcpus", old_flavor.cpu)
214
    ram = beparams.get("maxmem", old_flavor.ram)
215
    if vcpus == old_flavor.cpu and ram == old_flavor.ram:
216
        return
217
    try:
218
        new_flavor = Flavor.objects.get(cpu=vcpus, ram=ram,
219
                                        disk=old_flavor.disk,
220
                                        disk_template=old_flavor.disk_template)
221
    except Flavor.DoesNotExist:
222
        raise Exception("Cannot find flavor for VM")
223
    vm.flavor = new_flavor
224
    vm.save()
225

    
226

    
227
@transaction.commit_on_success
def process_net_status(vm, etime, nics):
    """Wrap _process_net_status inside transaction."""
    _process_net_status(vm, etime, nics)
231

    
232

    
233
def _process_net_status(vm, etime, nics):
    """Process a net status notification from the backend

    Process an incoming message from the Ganeti backend,
    detailing the NIC configuration of a VM instance.

    Update the state of the VM in the DB accordingly.

    :param vm: the VirtualMachine whose NICs are being reconciled.
    :param etime: event time from the backend, stored as vm.backendtime.
    :param nics: list of NIC info dicts as reported by Ganeti.

    """
    ganeti_nics = process_ganeti_nics(nics)
    db_nics = dict([(db_nic.id, db_nic)
                    for db_nic in vm.nics.prefetch_related("ips__subnet")])

    # Reconcile the union of NIC ids known to the DB and to Ganeti.
    for nic_name in set(db_nics.keys()) | set(ganeti_nics.keys()):
        db_nic = db_nics.get(nic_name)
        ganeti_nic = ganeti_nics.get(nic_name)
        if ganeti_nic is None:
            # NIC exists in the DB but not in Ganeti.
            # BUGFIX: this used to call nic_is_stale(vm, nic), where 'nic'
            # was the variable leaked from the comprehension above (last DB
            # NIC), not the NIC currently being examined.
            if nic_is_stale(vm, db_nic):
                log.debug("Removing stale NIC '%s'" % db_nic)
                remove_nic_ips(db_nic)
                db_nic.delete()
            else:
                log.info("NIC '%s' is still being created" % db_nic)
        elif db_nic is None:
            msg = ("NIC/%s of VM %s does not exist in DB! Cannot automatically"
                   " fix this issue!" % (nic_name, vm))
            log.error(msg)
            continue
        elif not nics_are_equal(db_nic, ganeti_nic):
            # Update the NIC in DB with the values from Ganeti NIC.
            # Save once after all simple fields are set, instead of once
            # per field.
            for f in SIMPLE_NIC_FIELDS:
                setattr(db_nic, f, ganeti_nic[f])
            db_nic.save()

            # Special case where the IPv4 address has changed, because you
            # need to release the old IPv4 address and reserve the new one
            ipv4_address = ganeti_nic["ipv4_address"]
            if db_nic.ipv4_address != ipv4_address:
                change_address_of_port(db_nic, vm.userid,
                                       old_address=db_nic.ipv4_address,
                                       new_address=ipv4_address,
                                       version=4)

            ipv6_address = ganeti_nic["ipv6_address"]
            if db_nic.ipv6_address != ipv6_address:
                change_address_of_port(db_nic, vm.userid,
                                       old_address=db_nic.ipv6_address,
                                       new_address=ipv6_address,
                                       version=6)

    vm.backendtime = etime
    vm.save()
285

    
286

    
287
def change_address_of_port(port, userid, old_address, new_address, version):
    """Change the IPv4 or IPv6 address that is attached to a port.

    Release the old address of the port (if any) and allocate the new one:
    IPv4 addresses are drawn from the network's pool, IPv6 addresses are
    created directly on the network's IPv6 subnet. An IPAddressLog entry is
    created for the new address. Returns the new IPAddress object.

    """
    if old_address is not None:
        # Logged at ERROR level so that unexpected address changes from the
        # backend are flagged for the operators.
        msg = ("IPv%s Address of server '%s' changed from '%s' to '%s'"
               % (version, port.machine_id, old_address, new_address))
        log.error(msg)

    # Remove the old IP address
    remove_nic_ips(port, version=version)

    if version == 4:
        ipaddress = ips.allocate_ip(port.network, userid, address=new_address)
        ipaddress.nic = port
        ipaddress.save()
    elif version == 6:
        subnet6 = port.network.subnet6
        ipaddress = IPAddress.objects.create(userid=userid,
                                             network=port.network,
                                             subnet=subnet6,
                                             nic=port,
                                             address=new_address)
    else:
        raise ValueError("Unknown version: %s" % version)

    # New address log
    ip_log = IPAddressLog.objects.create(server_id=port.machine_id,
                                         network_id=port.network_id,
                                         address=new_address,
                                         active=True)
    log.info("Created IP log entry '%s' for address '%s' to server '%s'",
             ip_log.id, new_address, port.machine_id)

    return ipaddress
320

    
321

    
322
def nics_are_equal(db_nic, gnt_nic):
    """Return True if the DB NIC matches the Ganeti NIC on all NIC_FIELDS."""
    return all(getattr(db_nic, field) == gnt_nic[field]
               for field in NIC_FIELDS)
327

    
328

    
329
def process_ganeti_nics(ganeti_nics):
    """Convert a Ganeti NIC list into a {nic_id: nic_info} dict."""
    nics = {}
    for index, gnic in enumerate(ganeti_nics):
        name = gnic.get("name", None)
        if name is None:
            # NIC is unknown to Synnefo: key it by its index so it can be
            # created automatically later.
            nic_id = UNKNOWN_NIC_PREFIX + str(index)
        else:
            nic_id = utils.id_from_nic_name(name)

        network = Network.objects.get(
            id=utils.id_from_network_name(gnic.get('network', '')))

        # Collect the new NIC attributes.
        mac = gnic.get('mac')
        subnet6 = network.subnet6
        firewall_profile = _reverse_tags.get(gnic.get('firewall'))
        if not firewall_profile and network.public:
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE

        nics[nic_id] = {
            'index': index,
            'network': network,
            'mac': mac,
            'ipv4_address': gnic.get('ip'),
            # Derive the IPv6 address from the MAC when the network has an
            # IPv6 subnet.
            'ipv6_address': mac2eui64(mac, subnet6.cidr) if subnet6 else None,
            'firewall_profile': firewall_profile,
            'state': 'ACTIVE'}
    return nics
366

    
367

    
368
def remove_nic_ips(nic, version=None):
    """Remove IP addresses associated with a NetworkInterface.

    Remove all IP addresses that are associated with the NetworkInterface
    object, by returning them to the pool and deleting the IPAddress object.
    If the IP is a floating IP, then it is just disassociated from the NIC.
    If version is specified, then only IP addressses of that version will be
    removed.

    """
    for ip in nic.ips.all():
        if not version or ip.ipversion == version:
            # Close the active entry in the IP address log table.
            terminate_active_ipaddress_log(nic, ip)

            if ip.floating_ip:
                # Floating IPs survive the NIC; just detach them.
                ip.nic = None
                ip.save()
            else:
                # Fixed IPs go back to the pool and are deleted.
                ip.release_address()
                ip.delete()
392

    
393

    
394
def terminate_active_ipaddress_log(nic, ip):
    """Update DB logging entry for this IP address.

    Mark the active IPAddressLog entry for (server, network, address) as
    released now. Only public-network IPs of an attached server are logged.

    """
    if not ip.network.public or nic.machine is None:
        return
    try:
        ip_log, created = \
            IPAddressLog.objects.get_or_create(server_id=nic.machine_id,
                                               network_id=ip.network_id,
                                               address=ip.address,
                                               active=True)
    except IPAddressLog.MultipleObjectsReturned:
        # BUGFIX: added the missing space before "Server" (the two string
        # literals used to concatenate into "Network %s,Server %s").
        logmsg = ("Multiple active log entries for IP %s, Network %s,"
                  " Server %s. Cannot proceed!"
                  % (ip.address, ip.network, nic.machine))
        log.error(logmsg)
        raise

    if created:
        # No active entry existed; we create one only so it can be closed,
        # but its creation timestamp is necessarily wrong.
        logmsg = ("No log entry for IP %s, Network %s, Server %s. Created new"
                  " but with wrong creation timestamp."
                  % (ip.address, ip.network, nic.machine))
        log.error(logmsg)
    ip_log.released_at = datetime.now()
    ip_log.active = False
    ip_log.save()
419

    
420

    
421
@transaction.commit_on_success
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
    """Process a Ganeti job notification for a BackendNetwork.

    Update the backend job bookkeeping and operating state of the
    BackendNetwork and then recompute the state of the parent Network.

    """
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode
    back_network.backendlogmsg = logmsg

    # Note: Network is already locked!
    network = back_network.network

    # Notifications of success change the operating state
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == rapi.JOB_STATUS_SUCCESS and state_for_success is not None:
        back_network.operstate = state_for_success

    # A failed/canceled OP_NETWORK_ADD means the network never materialized.
    if (status in (rapi.JOB_STATUS_CANCELED, rapi.JOB_STATUS_ERROR)
       and opcode == 'OP_NETWORK_ADD'):
        back_network.operstate = 'ERROR'
        back_network.backendtime = etime

    if opcode == 'OP_NETWORK_REMOVE':
        # A removal also counts as done if it failed because the network no
        # longer exists in the backend.
        network_is_deleted = (status == rapi.JOB_STATUS_SUCCESS)
        if network_is_deleted or (status == rapi.JOB_STATUS_ERROR and not
                                  network_exists_in_backend(back_network)):
            back_network.operstate = state_for_success
            back_network.deleted = True
            back_network.backendtime = etime

    if status == rapi.JOB_STATUS_SUCCESS:
        back_network.backendtime = etime
    back_network.save()
    # Also you must update the state of the Network!!
    update_network_state(network)
457

    
458

    
459
def update_network_state(network):
    """Update the state of a Network based on BackendNetwork states.

    Update the state of a Network based on the operstate of the networks in
    the backends that network exists.

    The state of the network is:
    * ACTIVE: If it is 'ACTIVE' in at least one backend.
    * DELETED: If it is is 'DELETED' in all backends that have been created.

    This function also releases the resources (MAC prefix or Bridge) and the
    quotas for the network.

    """
    if network.deleted:
        # Network has already been deleted. Just assert that state is also
        # DELETED
        if not network.state == "DELETED":
            network.state = "DELETED"
            network.save()
        return

    backend_states = [s.operstate for s in network.backend_networks.all()]
    if not backend_states and network.action != "DESTROY":
        if network.state != "ACTIVE":
            network.state = "ACTIVE"
            network.save()
        # BUGFIX: this return was nested inside the inner 'if', so a network
        # with no BackendNetworks whose state was already ACTIVE fell through
        # to the reduce() below, which evaluates to "DELETED" for an empty
        # list and wrongly deleted the network. Return unconditionally here.
        return

    # Network is deleted when all BackendNetworks go to "DELETED" operstate
    deleted = reduce(lambda x, y: x == y and "DELETED", backend_states,
                     "DELETED")

    # Release the resources on the deletion of the Network
    if deleted:
        if network.ips.filter(deleted=False, floating_ip=True).exists():
            msg = "Cannot delete network %s! Floating IPs still in use!"
            log.error(msg % network)
            raise Exception(msg % network)
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
                 network.id, network.mac_prefix, network.link)
        network.deleted = True
        network.state = "DELETED"
        # Undrain the network, otherwise the network state will remain
        # as 'SNF:DRAINED'
        network.drained = False
        if network.mac_prefix:
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
                release_resource(res_type="mac_prefix",
                                 value=network.mac_prefix)
        if network.link:
            if network.FLAVORS[network.flavor]["link"] == "pool":
                release_resource(res_type="bridge", value=network.link)

        # Set all subnets as deleted
        network.subnets.update(deleted=True)
        # And delete the IP pools
        for subnet in network.subnets.all():
            if subnet.ipversion == 4:
                subnet.ip_pools.all().delete()
        # And all the backend networks since there are useless
        network.backend_networks.all().delete()

        # Issue commission
        if network.userid:
            quotas.issue_and_accept_commission(network, action="DESTROY")
            # the above has already saved the object and committed;
            # a second save would override others' changes, since the
            # object is now unlocked
            return
        elif not network.public:
            log.warning("Network %s does not have an owner!", network.id)
    network.save()
532

    
533

    
534
@transaction.commit_on_success
def process_network_modify(back_network, etime, jobid, opcode, status,
                           job_fields):
    """Process a Ganeti OP_NETWORK_SET_PARAMS job notification.

    Update job bookkeeping on the BackendNetwork and reserve any externally
    reserved IPs reported by the job.

    """
    assert (opcode == "OP_NETWORK_SET_PARAMS")
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    # BUGFIX: this used to assign to 'back_network.opcode', a non-model
    # attribute that was never persisted; use 'backendopcode' like
    # process_network_status() does.
    back_network.backendopcode = opcode

    # Reserve addresses that the job marked as externally reserved.
    add_reserved_ips = job_fields.get("add_reserved_ips")
    if add_reserved_ips:
        network = back_network.network
        for ip in add_reserved_ips:
            network.reserve_address(ip, external=True)

    if status == rapi.JOB_STATUS_SUCCESS:
        back_network.backendtime = etime
    back_network.save()
554

    
555

    
556
@transaction.commit_on_success
def process_create_progress(vm, etime, progress):
    """Record the build progress of a VM reported by snf-image."""
    percentage = int(progress)

    # snf-image's copy-progress counts bytes read by all image handling
    # processes, so the reported value may exceed 100%: clamp it.
    if percentage > 100:
        percentage = 100
    if percentage < 0:
        raise ValueError("Percentage cannot be negative")

    # FIXME: log a warning here, see #1033
#   if last_update > percentage:
#       raise ValueError("Build percentage should increase monotonically " \
#                        "(old = %d, new = %d)" % (last_update, percentage))

    # This assumes that no message of type 'ganeti-create-progress' is going
    # to arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance
    # and the instance is STARTED.  What if the two messages are processed by
    # two separate dispatcher threads, and the 'ganeti-op-status' message for
    # successful creation gets processed before the 'ganeti-create-progress'
    # message? [vkoukis]
    #
    #if not vm.operstate == 'BUILD':
    #    raise VirtualMachine.IllegalState("VM is not in building state")

    vm.buildpercentage = percentage
    vm.backendtime = etime
    vm.save()
585

    
586

    
587
@transaction.commit_on_success
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
                               details=None):
    """
    Create virtual machine instance diagnostic entry.

    :param vm: VirtualMachine instance to create diagnostic for.
    :param message: Diagnostic message.
    :param source: Diagnostic source identifier (e.g. image-helper).
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
    :param etime: The time the message occured (if available).
    :param details: Additional details or debug information.
    """
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level, source=source,
                                                   source_date=etime,
                                                   message=message,
                                                   details=details)
604

    
605

    
606
def create_instance(vm, nics, flavor, image):
    """Create a Ganeti instance for a VM and return the Ganeti job id.

    `image` is a dictionary which should contain the keys:
            'backend_id', 'format' and 'metadata'

        metadata value should be a dictionary.
    """

    # Handle arguments to CreateInstance() as a dictionary,
    # initialize it based on a deployment-specific value.
    # This enables the administrator to override deployment-specific
    # arguments, such as the disk template to use, name of os provider
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
    #
    kw = vm.backend.get_create_params()
    kw['mode'] = 'create'
    kw['name'] = vm.backend_vm_id
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS

    # Disk: flavor.disk is in GB, Ganeti expects MB.
    kw['disk_template'] = flavor.disk_template
    kw['disks'] = [{"size": flavor.disk * 1024}]
    provider = flavor.disk_provider
    if provider:
        kw['disks'][0]['provider'] = provider
        kw['disks'][0]['origin'] = flavor.disk_origin
        # Deployment-specific extra parameters for this disk provider.
        extra_disk_params = settings.GANETI_DISK_PROVIDER_KWARGS.get(provider)
        if extra_disk_params is not None:
            kw["disks"][0].update(extra_disk_params)

    kw['nics'] = [{"name": nic.backend_uuid,
                   "network": nic.network.backend_id,
                   "ip": nic.ipv4_address}
                  for nic in nics]

    # The instance creation must wait for the jobs that activate its
    # networks in this backend.
    backend = vm.backend
    depend_jobs = []
    for nic in nics:
        bnet, job_ids = ensure_network_is_active(backend, nic.network_id)
        depend_jobs.extend(job_ids)

    kw["depends"] = create_job_dependencies(depend_jobs)

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['os'] = settings.GANETI_OS_PROVIDER
    kw['ip_check'] = False
    kw['name_check'] = False

    # Do not specific a node explicitly, have
    # Ganeti use an iallocator instead
    #kw['pnode'] = rapi.GetNodes()[0]

    kw['dry_run'] = settings.TEST

    kw['beparams'] = {
        'auto_balance': True,
        'vcpus': flavor.cpu,
        'memory': flavor.ram}

    kw['osparams'] = {
        'config_url': vm.config_url,
        # Store image id and format to Ganeti
        'img_id': image['backend_id'],
        'img_format': image['format']}

    # Use opportunistic locking
    kw['opportunistic_locking'] = settings.GANETI_USE_OPPORTUNISTIC_LOCKING

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['hvparams'] = dict(serial_console=False)

    # hide_pass strips sensitive values before logging the parameters.
    log.debug("Creating instance %s", utils.hide_pass(kw))
    with pooled_rapi_client(vm) as client:
        return client.CreateInstance(**kw)
678

    
679

    
680
def delete_instance(vm):
    """Issue a Ganeti job to delete the instance of a VM; return the job id."""
    with pooled_rapi_client(vm) as client:
        return client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)
683

    
684

    
685
def reboot_instance(vm, reboot_type):
    """Issue a Ganeti reboot job for a VM; return the job id.

    The Ganeti reboot type is always "hard"; the OS API's 'soft'/'hard'
    distinction is expressed via the 'shutdown_timeout' parameter instead.
    """
    assert reboot_type in ('soft', 'hard')
    kwargs = dict(instance=vm.backend_vm_id, reboot_type="hard")
    # 'shutdown_timeout' is only supported by snf-ganeti>=2.8.2 and
    # Ganeti>2.10; older versions ignore it and fall back to Ganeti's
    # default timeout (120s).
    if reboot_type == "hard":
        kwargs["shutdown_timeout"] = 0
    if settings.TEST:
        kwargs["dry_run"] = True
    with pooled_rapi_client(vm) as client:
        return client.RebootInstance(**kwargs)
701

    
702

    
703
def startup_instance(vm):
    """Issue a Ganeti job to start the instance of a VM; return the job id."""
    with pooled_rapi_client(vm) as client:
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
706

    
707

    
708
def shutdown_instance(vm):
    """Issue a Ganeti job to stop the instance of a VM; return the job id."""
    with pooled_rapi_client(vm) as client:
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)
711

    
712

    
713
def resize_instance(vm, vcpus, memory):
    """Modify the vcpus/memory backend parameters of a VM's Ganeti instance."""
    mem = int(memory)
    new_beparams = dict(vcpus=int(vcpus), minmem=mem, maxmem=mem)
    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id, beparams=new_beparams)
719

    
720

    
721
def get_instance_console(vm):
    """Build a VNC console description for a VM.

    RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
    which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
    useless (see #783). Until this is fixed on the Ganeti side, construct a
    console info reply directly.

    WARNING: This assumes that VNC runs on port network_port on the
    instance's primary node, and is probably hypervisor-specific.
    """
    log.debug("Getting console for vm %s", vm)

    with pooled_rapi_client(vm) as client:
        instance_info = client.GetInstance(vm.backend_vm_id)

    if vm.backend.hypervisor == "kvm" and \
            instance_info['hvparams']['serial_console']:
        raise Exception("hv parameter serial_console cannot be true")

    return {'kind': 'vnc',
            'host': instance_info['pnode'],
            'port': instance_info['network_port']}
747

    
748

    
749
def get_instance_info(vm):
    """Fetch the Ganeti-side description of the instance backing `vm`."""
    with pooled_rapi_client(vm) as rapi_client:
        return rapi_client.GetInstance(vm.backend_vm_id)
752

    
753

    
754
def vm_exists_in_backend(vm):
    """Check whether the Ganeti instance of a VM still exists.

    A 404 from the RAPI means the instance is gone; any other RAPI
    error is propagated to the caller.
    """
    try:
        get_instance_info(vm)
    except rapi.GanetiApiError as e:
        if e.code == 404:
            return False
        raise e
    return True
762

    
763

    
764
def get_network_info(backend_network):
    """Fetch the Ganeti-side description of a backend network."""
    with pooled_rapi_client(backend_network) as rapi_client:
        return rapi_client.GetNetwork(backend_network.network.backend_id)
767

    
768

    
769
def network_exists_in_backend(backend_network):
    """Check whether a network exists in the Ganeti backend.

    Return True if the network exists, False if Ganeti replies with a
    404 (not found). Any other RAPI error is re-raised: previously the
    function fell through and implicitly returned None on such errors,
    masking real backend failures (and diverging from the behavior of
    vm_exists_in_backend).
    """
    try:
        get_network_info(backend_network)
        return True
    except rapi.GanetiApiError as e:
        if e.code == 404:
            return False
        raise e
776

    
777

    
778
def job_is_still_running(vm, job_id=None):
    """Return True if a Ganeti job of `vm` has not finalized yet.

    Defaults to the VM's last backend job. Returns False if the job
    status cannot be retrieved from the RAPI.
    """
    with pooled_rapi_client(vm) as client:
        try:
            if job_id is None:
                job_id = vm.backendjobid
            status = client.GetJobStatus(job_id)["status"]
            return status not in rapi.JOB_STATUS_FINALIZED
        except rapi.GanetiApiError:
            return False
787

    
788

    
789
def nic_is_stale(vm, nic, timeout=60):
    """Check if a NIC is stale or exists in the Ganeti backend."""
    # Only a NIC in BUILD state with a pending CONNECT task may still be
    # legitimately in flight; anything else is considered stale.
    if nic.state == "BUILD" and vm.task == "CONNECT":
        grace_period = timedelta(seconds=timeout)
        if datetime.now() < nic.created + grace_period:
            # Skip very recent NICs to avoid the time overhead
            return False
        if job_is_still_running(vm, job_id=vm.task_job_id):
            return False
        # The job has finished; verify the NIC really exists, because the
        # notification message may have been lost or stuck in the queue.
        vm_info = get_instance_info(vm)
        if nic.backend_uuid in vm_info["nic.names"]:
            return False
    return True
805

    
806

    
807
def ensure_network_is_active(backend, network_id):
    """Ensure that a network is active in the specified backend

    Check that a network exists and is active in the specified backend. If not
    (re-)create the network. Return the corresponding BackendNetwork object
    and the IDs of the Ganeti job to create the network.

    """
    jobs = []
    try:
        bnet = (BackendNetwork.objects.select_related("network")
                .get(backend=backend, network=network_id))
        if bnet.operstate != "ACTIVE":
            jobs = create_network(bnet.network, backend, connect=True)
    except BackendNetwork.DoesNotExist:
        # No BackendNetwork record yet: lock the network row and create both
        # the DB record and the Ganeti network.
        network = Network.objects.select_for_update().get(id=network_id)
        bnet = BackendNetwork.objects.create(backend=backend, network=network)
        jobs = create_network(network, backend, connect=True)

    return bnet, jobs
827

    
828

    
829
def create_network(network, backend, connect=True):
    """Create a network in a Ganeti backend"""
    log.debug("Creating network %s in backend %s", network, backend)

    creation_job = _create_network(network, backend)

    if not connect:
        return [creation_job]
    # Chain the connect jobs on the creation job.
    return connect_network(network, backend, depends=[creation_job])
840

    
841

    
842
def _create_network(network, backend):
    """Create a network in a Ganeti backend.

    Compute the IPv4/IPv6 subnets, gateways and tags from the network's
    subnets and submit a CreateNetwork RAPI job. Return the Ganeti job ID.

    Raises an Exception if no BackendNetwork entry exists for the
    (network, backend) pair.
    """
    # Work on a copy: the original code appended to network.backend_tag in
    # place, mutating the model object's list as a side effect.
    tags = list(network.backend_tag)
    subnet = None
    subnet6 = None
    gateway = None
    gateway6 = None
    for _subnet in network.subnets.all():
        if _subnet.dhcp and "nfdhcpd" not in tags:
            tags.append("nfdhcpd")
        if _subnet.ipversion == 4:
            subnet = _subnet.cidr
            gateway = _subnet.gateway
        elif _subnet.ipversion == 6:
            subnet6 = _subnet.cidr
            gateway6 = _subnet.gateway

    conflicts_check = False
    if network.public:
        tags.append('public')
        if subnet is not None:
            conflicts_check = True
    else:
        tags.append('private')

    # Use a dummy network subnet for IPv6 only networks. Currently Ganeti does
    # not support IPv6 only networks. To bypass this limitation, we create the
    # network with a dummy network subnet, and make Cyclades connect instances
    # to such networks, with address=None.
    if subnet is None:
        subnet = "10.0.0.0/29"

    try:
        bn = BackendNetwork.objects.get(network=network, backend=backend)
        mac_prefix = bn.mac_prefix
    except BackendNetwork.DoesNotExist:
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
                        " does not exist" % (network.id, backend.id))

    with pooled_rapi_client(backend) as client:
        return client.CreateNetwork(network_name=network.backend_id,
                                    network=subnet,
                                    network6=subnet6,
                                    gateway=gateway,
                                    gateway6=gateway6,
                                    mac_prefix=mac_prefix,
                                    conflicts_check=conflicts_check,
                                    tags=tags)
891

    
892

    
893
def connect_network(network, backend, depends=None, group=None):
    """Connect a network to nodegroups.

    Connect the network to the given nodegroup, or to all nodegroups of
    the backend when `group` is None. `depends` is an optional list of
    Ganeti job IDs the connect jobs must wait for. Return the list of
    submitted job IDs.

    Note: the previous signature used a mutable default (depends=[]);
    replaced with None, which is backward-compatible.
    """
    log.debug("Connecting network %s to backend %s", network, backend)

    if depends is None:
        depends = []

    conflicts_check = False
    if network.public and (network.subnet4 is not None):
        conflicts_check = True

    depends = create_job_dependencies(depends)
    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for group in groups:
            job_id = client.ConnectNetwork(network.backend_id, group,
                                           network.mode, network.link,
                                           conflicts_check,
                                           depends=depends)
            job_ids.append(job_id)
    return job_ids
912

    
913

    
914
def delete_network(network, backend, disconnect=True):
    """Delete a network from a backend, optionally disconnecting it first."""
    log.debug("Deleting network %s from backend %s", network, backend)

    # Make the delete job depend on the disconnect jobs, if requested.
    depends = disconnect_network(network, backend) if disconnect else []
    _delete_network(network, backend, depends=depends)
921

    
922

    
923
def _delete_network(network, backend, depends=None):
    """Submit a Ganeti job to delete a network from a backend.

    `depends` is an optional list of Ganeti job IDs the delete job must
    wait for. Note: the previous signature used a mutable default
    (depends=[]); replaced with None, which is backward-compatible.
    """
    if depends is None:
        depends = []
    depends = create_job_dependencies(depends)
    with pooled_rapi_client(backend) as client:
        return client.DeleteNetwork(network.backend_id, depends)
927

    
928

    
929
def disconnect_network(network, backend, group=None):
    """Disconnect a network from nodegroups.

    Disconnect from the given nodegroup, or from all nodegroups of the
    backend when `group` is None. Return the list of submitted job IDs.
    """
    # Fixed log wording: the network is disconnected *from* the backend.
    log.debug("Disconnecting network %s from backend %s", network, backend)

    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for group in groups:
            job_id = client.DisconnectNetwork(network.backend_id, group)
            job_ids.append(job_id)
    return job_ids
939

    
940

    
941
def connect_to_network(vm, nic):
    """Attach a NIC of a VM to its network in the Ganeti backend.

    Make sure the network is active in the VM's backend first, and chain
    the ModifyInstance job on any network-creation jobs.
    """
    network = nic.network
    backend = vm.backend
    bnet, depend_jobs = ensure_network_is_active(backend, network.id)

    depends = create_job_dependencies(depend_jobs)

    # Avoid shadowing the `nic` model object with the RAPI NIC dict.
    nic_info = {'name': nic.backend_uuid,
                'network': network.backend_id,
                'ip': nic.ipv4_address}

    log.debug("Adding NIC %s to VM %s", nic_info, vm)

    kwargs = {
        "instance": vm.backend_vm_id,
        "nics": [("add", "-1", nic_info)],
        "depends": depends,
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug_if_possible"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(**kwargs)
966

    
967

    
968
def disconnect_from_network(vm, nic):
    """Remove a NIC from a VM and clean up its firewall tag, if any."""
    log.debug("Removing NIC %s of VM %s", nic, vm)

    kwargs = {
        "instance": vm.backend_vm_id,
        "nics": [("remove", nic.backend_uuid, {})],
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug_if_possible"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        job_id = client.ModifyInstance(**kwargs)
        profile = nic.firewall_profile
        if profile and profile != "DISABLED":
            # Also drop the firewall tag that belonged to the removed NIC.
            tag = _firewall_tags[profile] % nic.backend_uuid
            client.DeleteInstanceTags(vm.backend_vm_id, [tag],
                                      dry_run=settings.TEST)

        return job_id
989

    
990

    
991
def set_firewall_profile(vm, profile, nic):
    """Apply a firewall profile to a NIC by tagging the Ganeti instance.

    Remove any previous firewall tags for the NIC, add the tag for the
    requested profile (unless it is "DISABLED"), and issue a NOP
    ModifyInstance so that process_net_status runs on the dispatcher.

    Raises ValueError for an unknown profile. (Typo fixed in the error
    message: "Unsopported" -> "Unsupported".)
    """
    uuid = nic.backend_uuid
    try:
        tag = _firewall_tags[profile] % uuid
    except KeyError:
        raise ValueError("Unsupported Firewall Profile: %s" % profile)

    log.debug("Setting tag of VM %s, NIC %s, to %s", vm, nic, profile)

    with pooled_rapi_client(vm) as client:
        # Delete previous firewall tags
        old_tags = client.GetInstanceTags(vm.backend_vm_id)
        delete_tags = [(t % uuid) for t in _firewall_tags.values()
                       if (t % uuid) in old_tags]
        if delete_tags:
            client.DeleteInstanceTags(vm.backend_vm_id, delete_tags,
                                      dry_run=settings.TEST)

        if profile != "DISABLED":
            client.AddInstanceTags(vm.backend_vm_id, [tag],
                                   dry_run=settings.TEST)

        # XXX NOP ModifyInstance call to force process_net_status to run
        # on the dispatcher
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
        client.ModifyInstance(vm.backend_vm_id,
                              os_name=os_name)
    return None
1019

    
1020

    
1021
def get_instances(backend, bulk=True):
    """List the instances of a Ganeti backend."""
    with pooled_rapi_client(backend) as client:
        return client.GetInstances(bulk=bulk)
1024

    
1025

    
1026
def get_nodes(backend, bulk=True):
    """List the nodes of a Ganeti backend."""
    with pooled_rapi_client(backend) as client:
        return client.GetNodes(bulk=bulk)
1029

    
1030

    
1031
def get_jobs(backend, bulk=True):
    """List the jobs of a Ganeti backend."""
    with pooled_rapi_client(backend) as client:
        return client.GetJobs(bulk=bulk)
1034

    
1035

    
1036
def get_physical_resources(backend):
    """ Get the physical resources of a backend.

    Get the resources of a backend as reported by the backend (not the db).

    """
    attrs = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
    totals = dict((a, 0) for a in attrs)
    for node in get_nodes(backend, bulk=True):
        # Filter out drained, offline and not vm_capable nodes since they will
        # not take part in the vm allocation process
        usable = node['vm_capable'] and not (node['drained'] or
                                             node['offline'])
        if usable and node['cnodes']:
            for a in attrs:
                totals[a] += int(node[a] or 0)
    return totals
1055

    
1056

    
1057
def update_backend_resources(backend, resources=None):
    """ Update the state of the backend resources in db.

    If `resources` is not given, query the backend for its physical
    resources first.
    """
    if not resources:
        resources = get_physical_resources(backend)

    for attr in ('mfree', 'mtotal', 'dfree', 'dtotal',
                 'pinst_cnt', 'ctotal'):
        setattr(backend, attr, resources[attr])
    backend.updated = datetime.now()
    backend.save()
1073

    
1074

    
1075
def get_memory_from_instances(backend):
    """ Get the memory that is used from instances.

    Get the used memory of a backend. Note: This is different for
    the real memory used, due to kvm's memory de-duplication.

    """
    with pooled_rapi_client(backend) as client:
        instances = client.GetInstances(bulk=True)
    return sum(i['oper_ram'] for i in instances)
1088

    
1089

    
1090
def get_available_disk_templates(backend):
    """Get the list of available disk templates of a Ganeti backend.

    The list contains the disk templates that are enabled in the Ganeti backend
    and also included in ipolicy-disk-templates.

    """
    with pooled_rapi_client(backend) as client:
        cluster_info = client.GetInfo()
    allowed = cluster_info["ipolicy"]["disk-templates"]
    if "enabled_disk_templates" not in cluster_info:
        # Ganeti < 2.8 does not have 'enabled_disk_templates'
        return allowed
    return [template for template in cluster_info["enabled_disk_templates"]
            if template in allowed]
1107

    
1108

    
1109
def update_backend_disk_templates(backend):
    """Refresh and persist the backend's available disk templates."""
    backend.disk_templates = get_available_disk_templates(backend)
    backend.save()
1113

    
1114

    
1115
##
1116
## Synchronized operations for reconciliation
1117
##
1118

    
1119

    
1120
def create_network_synced(network, backend):
    """Create and connect a network, waiting for each Ganeti job.

    Return the (status, error) of the first failed step, or of the final
    connect step on success.
    """
    result = _create_network_synced(network, backend)
    if result[0] != rapi.JOB_STATUS_SUCCESS:
        return result
    return connect_network_synced(network, backend)
1126

    
1127

    
1128
def _create_network_synced(network, backend):
    """Create a network and block until its Ganeti job finalizes."""
    with pooled_rapi_client(backend) as client:
        job_id = _create_network(network, backend)
        return wait_for_job(client, job_id)
1133

    
1134

    
1135
def connect_network_synced(network, backend):
    """Connect a network to every nodegroup, waiting for each job.

    Return the (status, error) of the first failed connect job, or of
    the last successful one. If the backend has no nodegroups, return
    success immediately (previously this raised UnboundLocalError, as
    `result` was only assigned inside the loop).
    """
    result = (rapi.JOB_STATUS_SUCCESS, None)
    with pooled_rapi_client(backend) as client:
        for group in client.GetGroups():
            job = client.ConnectNetwork(network.backend_id, group,
                                        network.mode, network.link)
            result = wait_for_job(client, job)
            if result[0] != rapi.JOB_STATUS_SUCCESS:
                return result

    return result
1145

    
1146

    
1147
def wait_for_job(client, jobid):
    """Block until a Ganeti job finalizes and return (status, error).

    `error` is None on success, otherwise the job's opresult.
    """
    fields = ['status', 'opresult']
    result = client.WaitForJobChange(jobid, fields, None, None)
    status = result['job_info'][0]
    while status not in rapi.JOB_STATUS_FINALIZED:
        # Long-poll again, passing the previous result as 'prev_job_info'.
        result = client.WaitForJobChange(jobid, fields, [result], None)
        status = result['job_info'][0]

    if status == rapi.JOB_STATUS_SUCCESS:
        return (status, None)
    return (status, result['job_info'][1])
1160

    
1161

    
1162
def create_job_dependencies(job_ids=None, job_states=None):
    """Transform a list of job IDs to Ganeti 'depends' attribute.

    Each dependency waits for the job to reach one of `job_states`
    (defaults to all finalized states). Note: the previous signature
    used a mutable default (job_ids=[]); replaced with None, which is
    backward-compatible. Also use isinstance() instead of a type()
    equality check.
    """
    if job_ids is None:
        job_ids = []
    if job_states is None:
        job_states = list(rapi.JOB_STATUS_FINALIZED)
    assert isinstance(job_states, list)
    return [[job_id, job_states] for job_id in job_ids]