Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ 7b438672

History | View | Annotate | Download (41.6 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
from django.conf import settings
34
from django.db import transaction
35
from datetime import datetime, timedelta
36

    
37
from synnefo.db.models import (Backend, VirtualMachine, Network,
38
                               BackendNetwork, BACKEND_STATUSES,
39
                               pooled_rapi_client, VirtualMachineDiagnostic,
40
                               Flavor, IPAddress, IPAddressLog)
41
from synnefo.logic import utils, ips
42
from synnefo import quotas
43
from synnefo.api.util import release_resource
44
from synnefo.util.mac2eui64 import mac2eui64
45
from synnefo.logic import rapi
46

    
47
from logging import getLogger
48
log = getLogger(__name__)
49

    
50

    
51
_firewall_tags = {
52
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
53
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
54
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}
55

    
56
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
57

    
58
SIMPLE_NIC_FIELDS = ["state", "mac", "network", "firewall_profile", "index"]
59
COMPLEX_NIC_FIELDS = ["ipv4_address", "ipv6_address"]
60
NIC_FIELDS = SIMPLE_NIC_FIELDS + COMPLEX_NIC_FIELDS
61
UNKNOWN_NIC_PREFIX = "unknown-"
62

    
63

    
64
def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
    """Handle quotas for updated VirtualMachine.

    Update quotas for the updated VirtualMachine based on the job that run on
    the Ganeti backend. If a commission has been already issued for this job,
    then this commission is just accepted or rejected based on the job status.
    Otherwise, a new commission for the given change is issued, that is also in
    force and auto-accept mode. In this case, previous commissions are
    rejected, since they reflect a previous state of the VM.

    :param vm: the VirtualMachine the Ganeti job refers to.
    :param job_id: ID of the Ganeti job.
    :param job_opcode: Ganeti opcode of the job (e.g. OP_INSTANCE_REMOVE).
    :param job_status: current Ganeti job status.
    :param job_fields: dict of job parameters; used to derive the action.
    :returns: the (possibly modified) VirtualMachine object; the caller is
        responsible for saving it.

    """
    # Quotas are only resolved once the job reaches a final status.
    if job_status not in rapi.JOB_STATUS_FINALIZED:
        return vm

    # Check successful completion of a job will trigger any quotable change in
    # the VM state.
    action = utils.get_action_from_opcode(job_opcode, job_fields)
    if action == "BUILD":
        # Quotas for new VMs are automatically accepted by the API
        return vm

    if vm.task_job_id == job_id and vm.serial is not None:
        # Commission for this change has already been issued. So just
        # accept/reject it. Special case is OP_INSTANCE_CREATE, which even
        # if fails, must be accepted, as the user must manually remove the
        # failed server
        serial = vm.serial
        if job_status == rapi.JOB_STATUS_SUCCESS:
            quotas.accept_serial(serial)
        elif job_status in [rapi.JOB_STATUS_ERROR, rapi.JOB_STATUS_CANCELED]:
            log.debug("Job %s failed. Rejecting related serial %s", job_id,
                      serial)
            quotas.reject_serial(serial)
        # The pending commission has been resolved either way; clear it.
        vm.serial = None
    elif job_status == rapi.JOB_STATUS_SUCCESS:
        commission_info = quotas.get_commission_info(resource=vm,
                                                     action=action,
                                                     action_fields=job_fields)
        if commission_info is not None:
            # Commission for this change has not been issued, or the issued
            # commission was unaware of the current change. Reject all previous
            # commissions and create a new one in forced mode!
            log.debug("Expected job was %s. Processing job %s.",
                      vm.task_job_id, job_id)
            reason = ("client: dispatcher, resource: %s, ganeti_job: %s"
                      % (vm, job_id))
            quotas.handle_resource_commission(vm, action,
                                              action_fields=job_fields,
                                              commission_name=reason,
                                              force=True,
                                              auto_accept=True)
            log.debug("Issued new commission: %s", vm.serial)

    return vm
118

    
119

    
120
@transaction.commit_on_success
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None,
                      job_fields=None):
    """Process a job progress notification from the backend

    Process an incoming message from the backend (currently Ganeti).
    Job notifications with a terminating status (success, error, or canceled),
    also update the operating state of the VM.

    :param vm: the VirtualMachine the notification refers to.
    :param etime: time the event occurred at the backend.
    :param jobid: ID of the Ganeti job.
    :param opcode: Ganeti opcode of the job (e.g. OP_INSTANCE_CREATE).
    :param status: job status; must be one of BACKEND_STATUSES.
    :param logmsg: log message carried by the notification.
    :param nics: optional list of NIC dicts used to sync the VM's NICs.
    :param job_fields: optional dict of job parameters (e.g. beparams).
    :raises VirtualMachine.InvalidBackendMsgError: on unknown status values.

    """
    # See #1492, #1031, #1111 why this line has been removed
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)

    # Record the raw job bookkeeping information on the VM.
    vm.backendjobid = jobid
    vm.backendjobstatus = status
    vm.backendopcode = opcode
    vm.backendlogmsg = logmsg

    # Non-final notifications only update the bookkeeping fields above.
    if status not in rapi.JOB_STATUS_FINALIZED:
        vm.save()
        return

    if job_fields is None:
        job_fields = {}

    # Operating state to apply at the end, if any transition is triggered.
    new_operstate = None
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode)

    if status == rapi.JOB_STATUS_SUCCESS:
        # If job succeeds, change operating state if needed
        if state_for_success is not None:
            new_operstate = state_for_success

        beparams = job_fields.get("beparams", None)
        if beparams:
            # Change the flavor of the VM
            _process_resize(vm, beparams)

        # Update backendtime only for jobs that have been successfully
        # completed, since only these jobs update the state of the VM. Else a
        # "race condition" may occur when a successful job (e.g.
        # OP_INSTANCE_REMOVE) completes before an error job and messages arrive
        # in reversed order.
        vm.backendtime = etime

    if status in rapi.JOB_STATUS_FINALIZED and nics is not None:
        # Update the NICs of the VM
        _process_net_status(vm, etime, nics)

    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
    if opcode == 'OP_INSTANCE_CREATE' and status in (rapi.JOB_STATUS_CANCELED,
                                                     rapi.JOB_STATUS_ERROR):
        new_operstate = "ERROR"
        vm.backendtime = etime
        # Update state of associated NICs
        vm.nics.all().update(state="ERROR")
    elif opcode == 'OP_INSTANCE_REMOVE':
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
        # when no instance exists at the Ganeti backend.
        # See ticket #799 for all the details.
        if (status == rapi.JOB_STATUS_SUCCESS or
           (status == rapi.JOB_STATUS_ERROR and not vm_exists_in_backend(vm))):
            # VM has been deleted
            for nic in vm.nics.all():
                # Release the IP
                remove_nic_ips(nic)
                # And delete the NIC.
                nic.delete()
            vm.deleted = True
            new_operstate = state_for_success
            vm.backendtime = etime
            # Treat the removal as successful so quotas are handled below.
            status = rapi.JOB_STATUS_SUCCESS

    if status in rapi.JOB_STATUS_FINALIZED:
        # Job is finalized: Handle quotas/commissioning
        vm = handle_vm_quotas(vm, job_id=jobid, job_opcode=opcode,
                              job_status=status, job_fields=job_fields)
        # and clear task fields
        if vm.task_job_id == jobid:
            vm.task = None
            vm.task_job_id = None

    if new_operstate is not None:
        vm.operstate = new_operstate

    vm.save()
208

    
209

    
210
def _process_resize(vm, beparams):
    """Update the VM's flavor to match new backend parameters (beparams)."""
    current = vm.flavor
    # Parameters missing from beparams fall back to the current flavor.
    new_cpu = beparams.get("vcpus", current.cpu)
    new_ram = beparams.get("maxmem", current.ram)
    if new_cpu == current.cpu and new_ram == current.ram:
        # Nothing to do; skip the DB lookup and save.
        return
    try:
        vm.flavor = Flavor.objects.get(cpu=new_cpu, ram=new_ram,
                                       disk=current.disk,
                                       disk_template=current.disk_template)
    except Flavor.DoesNotExist:
        raise Exception("Cannot find flavor for VM")
    vm.save()
225

    
226

    
227
@transaction.commit_on_success
def process_net_status(vm, etime, nics):
    """Run _process_net_status inside a database transaction."""
    _process_net_status(vm, etime, nics)
231

    
232

    
233
def _process_net_status(vm, etime, nics):
    """Process a net status notification from the backend

    Process an incoming message from the Ganeti backend,
    detailing the NIC configuration of a VM instance.

    Update the state of the VM in the DB accordingly.

    :param vm: the VirtualMachine whose NICs are synchronized.
    :param etime: event time, recorded as the VM's backendtime.
    :param nics: list of NIC dicts as reported by Ganeti.

    """
    ganeti_nics = process_ganeti_nics(nics)
    db_nics = dict([(nic.id, nic)
                    for nic in vm.nics.prefetch_related("ips__subnet")])

    # Walk the union of NIC ids known to the DB and reported by Ganeti.
    for nic_name in set(db_nics.keys()) | set(ganeti_nics.keys()):
        db_nic = db_nics.get(nic_name)
        ganeti_nic = ganeti_nics.get(nic_name)
        if ganeti_nic is None:
            # NIC exists in the DB but not in Ganeti.
            # Fixed: the original passed the undefined name 'nic' to
            # nic_is_stale(), raising NameError; 'db_nic' is intended.
            if nic_is_stale(vm, db_nic):
                log.debug("Removing stale NIC '%s'" % db_nic)
                remove_nic_ips(db_nic)
                db_nic.delete()
            else:
                log.info("NIC '%s' is still being created" % db_nic)
        elif db_nic is None:
            msg = ("NIC/%s of VM %s does not exist in DB! Cannot automatically"
                   " fix this issue!" % (nic_name, vm))
            log.error(msg)
            continue
        elif not nics_are_equal(db_nic, ganeti_nic):
            # Update the NIC in DB with the values from Ganeti NIC
            for f in SIMPLE_NIC_FIELDS:
                setattr(db_nic, f, ganeti_nic[f])
            # Save once after all simple fields are updated, instead of
            # issuing one UPDATE per field as the original did.
            db_nic.save()

            # Special case where the IPv4 address has changed, because you
            # need to release the old IPv4 address and reserve the new one
            ipv4_address = ganeti_nic["ipv4_address"]
            if db_nic.ipv4_address != ipv4_address:
                change_address_of_port(db_nic, vm.userid,
                                       old_address=db_nic.ipv4_address,
                                       new_address=ipv4_address,
                                       version=4)

            ipv6_address = ganeti_nic["ipv6_address"]
            if db_nic.ipv6_address != ipv6_address:
                change_address_of_port(db_nic, vm.userid,
                                       old_address=db_nic.ipv6_address,
                                       new_address=ipv6_address,
                                       version=6)

    vm.backendtime = etime
    vm.save()
285

    
286

    
287
def change_address_of_port(port, userid, old_address, new_address, version):
    """Swap the IP address of a port for a new one of the same version.

    Releases the port's existing address of the given IP version, allocates
    (v4) or creates (v6) the new address, attaches it to the port, and
    records the change in the IP address log.

    :param port: the NetworkInterface whose address changes.
    :param userid: owner of the new IPAddress object.
    :param old_address: previous address string, or None.
    :param new_address: address string to attach.
    :param version: IP version, 4 or 6.
    :returns: the new IPAddress object.
    :raises ValueError: if version is neither 4 nor 6.

    """
    if old_address is not None:
        msg = ("IPv%s Address of server '%s' changed from '%s' to '%s'"
               % (version, port.machine_id, old_address, new_address))
        # NOTE(review): an address change is logged at ERROR level — it is
        # unexpected enough to warrant operator attention.
        log.error(msg)

    # Remove the old IP address
    remove_nic_ips(port, version=version)

    if version == 4:
        # IPv4: allocate from the network's pool (or reserve the exact
        # requested address).
        ipaddress = ips.allocate_ip(port.network, userid, address=new_address)
        ipaddress.nic = port
        ipaddress.save()
    elif version == 6:
        # IPv6: no pool involved; create the DB object directly.
        subnet6 = port.network.subnet6
        ipaddress = IPAddress.objects.create(userid=userid,
                                             network=port.network,
                                             subnet=subnet6,
                                             nic=port,
                                             address=new_address)
    else:
        raise ValueError("Unknown version: %s" % version)

    # New address log
    ip_log = IPAddressLog.objects.create(server_id=port.machine_id,
                                         network_id=port.network_id,
                                         address=new_address,
                                         active=True)
    log.info("Created IP log entry '%s' for address '%s' to server '%s'",
             ip_log.id, new_address, port.machine_id)

    return ipaddress
320

    
321

    
322
def nics_are_equal(db_nic, gnt_nic):
    """Return True if the DB NIC and the Ganeti NIC agree on all NIC_FIELDS."""
    return all(getattr(db_nic, field) == gnt_nic[field]
               for field in NIC_FIELDS)
327

    
328

    
329
def process_ganeti_nics(ganeti_nics):
    """Turn the NIC list reported by Ganeti into a {nic_id: info} dict."""
    nics = {}
    for index, gnic in enumerate(ganeti_nics):
        nic_name = gnic.get("name", None)
        if nic_name is None:
            # Put as default value the index. If it is an unknown NIC to
            # synnefo it will be created automaticaly.
            nic_id = UNKNOWN_NIC_PREFIX + str(index)
        else:
            nic_id = utils.id_from_nic_name(nic_name)

        network_name = gnic.get('network', '')
        network = Network.objects.get(
            id=utils.id_from_network_name(network_name))

        # Collect the new NIC's attributes.
        mac = gnic.get('mac')
        subnet6 = network.subnet6
        # Derive the IPv6 address from the MAC (EUI-64) when the network
        # has an IPv6 subnet.
        ipv6 = mac2eui64(mac, subnet6.cidr) if subnet6 else None

        firewall_profile = _reverse_tags.get(gnic.get('firewall'))
        if not firewall_profile and network.public:
            # NICs on public networks always get a firewall profile.
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE

        nics[nic_id] = {
            'index': index,
            'network': network,
            'mac': mac,
            'ipv4_address': gnic.get('ip'),
            'ipv6_address': ipv6,
            'firewall_profile': firewall_profile,
            'state': 'ACTIVE'}
    return nics
366

    
367

    
368
def remove_nic_ips(nic, version=None):
    """Remove IP addresses associated with a NetworkInterface.

    Every address attached to the NIC is returned to its pool and its
    IPAddress object deleted; floating IPs are only detached from the NIC,
    never deleted. When a version (4 or 6) is given, only addresses of that
    IP version are affected.

    """
    for address in nic.ips.all():
        if version and address.ipversion != version:
            # Caller restricted the operation to one IP version.
            continue

        # Close the active entry in the IP address log for this address.
        terminate_active_ipaddress_log(nic, address)

        if address.floating_ip:
            # Floating IPs outlive the NIC; just detach them.
            address.nic = None
            address.save()
        else:
            # Return the address to the pool and drop the DB object.
            address.release_address()
            address.delete()
392

    
393

    
394
def terminate_active_ipaddress_log(nic, ip):
    """Mark the active IPAddressLog entry for this address as released.

    Only addresses on public networks attached to a server are logged.
    Multiple active entries are a fatal inconsistency; a missing entry is
    created on the spot (with an inaccurate creation timestamp) so the
    release can still be recorded.

    """
    if not ip.network.public or nic.machine is None:
        # Private networks and detached NICs are not logged.
        return

    lookup = dict(server_id=nic.machine_id,
                  network_id=ip.network_id,
                  address=ip.address,
                  active=True)
    try:
        entry, created = IPAddressLog.objects.get_or_create(**lookup)
    except IPAddressLog.MultipleObjectsReturned:
        logmsg = ("Multiple active log entries for IP %s, Network %s,"
                  "Server %s. Cannot proceed!"
                  % (ip.address, ip.network, nic.machine))
        log.error(logmsg)
        raise

    if created:
        logmsg = ("No log entry for IP %s, Network %s, Server %s. Created new"
                  " but with wrong creation timestamp."
                  % (ip.address, ip.network, nic.machine))
        log.error(logmsg)
    entry.released_at = datetime.now()
    entry.active = False
    entry.save()
419

    
420

    
421
@transaction.commit_on_success
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
    """Process a Ganeti job notification for a BackendNetwork.

    Records the job information on the BackendNetwork, updates its operating
    state according to the opcode/status, and finally re-derives the state of
    the parent Network.

    :param back_network: the BackendNetwork the notification refers to.
    :param etime: time the event occurred at the backend.
    :param jobid: ID of the Ganeti job.
    :param opcode: Ganeti opcode of the job (e.g. OP_NETWORK_ADD).
    :param status: job status; must be one of BACKEND_STATUSES.
    :param logmsg: log message carried by the notification.
    :raises Network.InvalidBackendMsgError: on unknown status values.

    """
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    # Record the raw job bookkeeping information.
    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode
    back_network.backendlogmsg = logmsg

    # Note: Network is already locked!
    network = back_network.network

    # Notifications of success change the operating state
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == rapi.JOB_STATUS_SUCCESS and state_for_success is not None:
        back_network.operstate = state_for_success

    # A failed/canceled OP_NETWORK_ADD leaves the backend network in ERROR.
    if (status in (rapi.JOB_STATUS_CANCELED, rapi.JOB_STATUS_ERROR)
       and opcode == 'OP_NETWORK_ADD'):
        back_network.operstate = 'ERROR'
        back_network.backendtime = etime

    if opcode == 'OP_NETWORK_REMOVE':
        # A failed removal still counts as deleted when the network no
        # longer exists at the Ganeti backend.
        network_is_deleted = (status == rapi.JOB_STATUS_SUCCESS)
        if network_is_deleted or (status == rapi.JOB_STATUS_ERROR and not
                                  network_exists_in_backend(back_network)):
            back_network.operstate = state_for_success
            back_network.deleted = True
            back_network.backendtime = etime

    if status == rapi.JOB_STATUS_SUCCESS:
        back_network.backendtime = etime
    back_network.save()
    # Also you must update the state of the Network!!
    update_network_state(network)
457

    
458

    
459
def update_network_state(network):
    """Update the state of a Network based on BackendNetwork states.

    Update the state of a Network based on the operstate of the networks in the
    backends that network exists.

    The state of the network is:
    * ACTIVE: If it is 'ACTIVE' in at least one backend.
    * DELETED: If it is is 'DELETED' in all backends that have been created.

    This function also releases the resources (MAC prefix or Bridge) and the
    quotas for the network.

    """
    if network.deleted:
        # Network has already been deleted. Just assert that state is also
        # DELETED
        if not network.state == "DELETED":
            network.state = "DELETED"
            network.save()
        return

    backend_states = [s.operstate for s in network.backend_networks.all()]
    if not backend_states and network.action != "DESTROY":
        # No backend networks and not being destroyed: ensure ACTIVE and
        # stop. Fixed: the 'return' was mis-indented inside the inner 'if',
        # so a network with no backend networks that was already ACTIVE fell
        # through to the deletion logic below (reduce over an empty list
        # yields the initializer "DELETED") and was wrongly deleted.
        if network.state != "ACTIVE":
            network.state = "ACTIVE"
            network.save()
        return

    # Network is deleted when all BackendNetworks go to "DELETED" operstate
    deleted = reduce(lambda x, y: x == y and "DELETED", backend_states,
                     "DELETED")

    # Release the resources on the deletion of the Network
    if deleted:
        if network.ips.filter(deleted=False, floating_ip=True).exists():
            msg = "Cannot delete network %s! Floating IPs still in use!"
            log.error(msg % network)
            raise Exception(msg % network)
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
                 network.id, network.mac_prefix, network.link)
        network.deleted = True
        network.state = "DELETED"
        # Undrain the network, otherwise the network state will remain
        # as 'SNF:DRAINED'
        network.drained = False
        # Return pooled resources (MAC prefix / bridge) to their pools.
        if network.mac_prefix:
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
                release_resource(res_type="mac_prefix",
                                 value=network.mac_prefix)
        if network.link:
            if network.FLAVORS[network.flavor]["link"] == "pool":
                release_resource(res_type="bridge", value=network.link)

        # Set all subnets as deleted
        network.subnets.update(deleted=True)
        # And delete the IP pools
        for subnet in network.subnets.all():
            if subnet.ipversion == 4:
                subnet.ip_pools.all().delete()
        # And all the backend networks since there are useless
        network.backend_networks.all().delete()

        # Issue commission
        if network.userid:
            quotas.issue_and_accept_commission(network, action="DESTROY")
            # the above has already saved the object and committed;
            # a second save would override others' changes, since the
            # object is now unlocked
            return
        elif not network.public:
            log.warning("Network %s does not have an owner!", network.id)
    network.save()
532

    
533

    
534
@transaction.commit_on_success
def process_network_modify(back_network, etime, jobid, opcode, status,
                           job_fields):
    """Process an OP_NETWORK_SET_PARAMS job notification.

    Records the job information on the BackendNetwork and reserves any
    externally reserved IP addresses the job added to the network.

    :param back_network: the BackendNetwork the notification refers to.
    :param etime: time the event occurred at the backend.
    :param jobid: ID of the Ganeti job.
    :param opcode: must be "OP_NETWORK_SET_PARAMS".
    :param status: job status; must be one of BACKEND_STATUSES.
    :param job_fields: dict of job parameters; "add_reserved_ips" is honored.
    :raises Network.InvalidBackendMsgError: on unknown status values.

    """
    assert (opcode == "OP_NETWORK_SET_PARAMS")
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    # Fixed: record the opcode on the 'backendopcode' model field, as
    # process_network_status and process_op_status do. The original
    # assigned to a nonexistent 'opcode' attribute, which was never
    # persisted by save().
    back_network.backendopcode = opcode

    add_reserved_ips = job_fields.get("add_reserved_ips")
    if add_reserved_ips:
        network = back_network.network
        for ip in add_reserved_ips:
            # Mark externally reserved addresses in the network's pool.
            network.reserve_address(ip, external=True)

    if status == rapi.JOB_STATUS_SUCCESS:
        back_network.backendtime = etime
    back_network.save()
554

    
555

    
556
@transaction.commit_on_success
def process_create_progress(vm, etime, progress):
    """Update a VM's build progress from a snf-image notification.

    :param vm: the VirtualMachine being built.
    :param etime: time the progress event occurred at the backend.
    :param progress: reported progress; coerced to int.
    :raises ValueError: if the reported percentage is negative.

    """
    percentage = int(progress)
    if percentage < 0:
        raise ValueError("Percentage cannot be negative")

    # snf-image:copy-progress tracks bytes read by the image handling
    # processes and may therefore report more than 100%; cap it.
    if percentage > 100:
        percentage = 100

    # FIXME: log a warning here, see #1033 — progress is not required to
    # increase monotonically.

    # A 'ganeti-create-progress' message may be processed after the
    # 'ganeti-op-status' message of a successful OP_INSTANCE_CREATE (they
    # can be handled by separate dispatcher threads), so the VM is
    # deliberately NOT required to still be in the BUILD state here.

    vm.buildpercentage = percentage
    vm.backendtime = etime
    vm.save()
585

    
586

    
587
@transaction.commit_on_success
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
                               details=None):
    """Record a diagnostic entry for a virtual machine.

    :param vm: VirtualMachine instance to create diagnostic for.
    :param message: Diagnostic message.
    :param source: Diagnostic source identifier (e.g. image-helper).
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
    :param etime: The time the message occured (if available).
    :param details: Additional details or debug information.
    """
    VirtualMachineDiagnostic.objects.create_for_vm(
        vm, level, source=source, source_date=etime, message=message,
        details=details)
604

    
605

    
606
def create_instance(vm, nics, flavor, image):
    """Submit an OP_INSTANCE_CREATE job for a VM to its Ganeti backend.

    `image` is a dictionary which should contain the keys:
        'backend_id', 'format' and 'metadata'

    metadata value should be a dictionary.

    :param vm: the VirtualMachine to create at the backend.
    :param nics: NICs to attach to the instance.
    :param flavor: flavor providing CPU, RAM and disk parameters.
    :param image: image dict as described above.
    :returns: the Ganeti job ID of the submitted creation job.

    """
    # Handle arguments to CreateInstance() as a dictionary,
    # initialize it based on a deployment-specific value.
    # This enables the administrator to override deployment-specific
    # arguments, such as the disk template to use, name of os provider
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
    #
    kw = vm.backend.get_create_params()
    kw['mode'] = 'create'
    kw['name'] = vm.backend_vm_id
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS

    kw['disk_template'] = flavor.disk_template
    kw['disks'] = [{"size": flavor.disk * 1024}]
    provider = flavor.disk_provider
    if provider:
        kw['disks'][0]['provider'] = provider
        kw['disks'][0]['origin'] = flavor.disk_origin

    kw['nics'] = [{"name": nic.backend_uuid,
                   "network": nic.network.backend_id,
                   "ip": nic.ipv4_address}
                  for nic in nics]

    backend = vm.backend
    depend_jobs = []
    for nic in nics:
        # Activate the NIC's network in this backend and collect the job
        # IDs instance creation must wait on. (Fixed: the unused 'bnet'
        # local is now discarded explicitly.)
        _, job_ids = ensure_network_is_active(backend, nic.network_id)
        depend_jobs.extend(job_ids)

    kw["depends"] = create_job_dependencies(depend_jobs)

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['os'] = settings.GANETI_OS_PROVIDER
    kw['ip_check'] = False
    kw['name_check'] = False

    # Do not specific a node explicitly, have
    # Ganeti use an iallocator instead
    #kw['pnode'] = rapi.GetNodes()[0]

    kw['dry_run'] = settings.TEST

    kw['beparams'] = {
        'auto_balance': True,
        'vcpus': flavor.cpu,
        'memory': flavor.ram}

    kw['osparams'] = {
        'config_url': vm.config_url,
        # Store image id and format to Ganeti
        'img_id': image['backend_id'],
        'img_format': image['format']}

    # Use opportunistic locking
    kw['opportunistic_locking'] = settings.GANETI_USE_OPPORTUNISTIC_LOCKING

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['hvparams'] = dict(serial_console=False)

    log.debug("Creating instance %s", utils.hide_pass(kw))
    with pooled_rapi_client(vm) as client:
        return client.CreateInstance(**kw)
675

    
676

    
677
def delete_instance(vm):
    """Submit a Ganeti job to remove the instance backing this VM."""
    dry_run = settings.TEST
    with pooled_rapi_client(vm) as client:
        return client.DeleteInstance(vm.backend_vm_id, dry_run=dry_run)
680

    
681

    
682
def reboot_instance(vm, reboot_type):
    """Submit a Ganeti job to reboot the instance backing this VM.

    Note that reboot type of Ganeti job must be always hard. The 'soft' and
    'hard' type of OS API is different from the one in Ganeti, and maps to
    'shutdown_timeout'.

    """
    assert reboot_type in ('soft', 'hard')
    args = {"instance": vm.backend_vm_id,
            "reboot_type": "hard"}
    if reboot_type == "hard":
        # 'shutdown_timeout' parameter is only support from snf-ganeti>=2.8.2
        # and Ganeti > 2.10. In other versions this parameter will be ignored
        # and we will fallback to default timeout of Ganeti (120s).
        args["shutdown_timeout"] = 0
    if settings.TEST:
        args["dry_run"] = True
    with pooled_rapi_client(vm) as client:
        return client.RebootInstance(**args)
698

    
699

    
700
def startup_instance(vm):
    """Submit a Ganeti job to start the instance backing this VM."""
    dry_run = settings.TEST
    with pooled_rapi_client(vm) as client:
        return client.StartupInstance(vm.backend_vm_id, dry_run=dry_run)
703

    
704

    
705
def shutdown_instance(vm):
    """Submit a Ganeti job to shut down the instance backing this VM."""
    dry_run = settings.TEST
    with pooled_rapi_client(vm) as client:
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=dry_run)
708

    
709

    
710
def resize_instance(vm, vcpus, memory):
    """Submit a Ganeti job changing the instance's CPU/RAM beparams."""
    mem = int(memory)
    # min and max memory are pinned to the same value.
    beparams = dict(vcpus=int(vcpus), minmem=mem, maxmem=mem)
    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id, beparams=beparams)
716

    
717

    
718
def get_instance_console(vm):
    """Build VNC console connection info for a VM.

    RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
    which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
    useless (see #783). Until this is fixed on the Ganeti side, construct a
    console info reply directly.

    WARNING: This assumes that VNC runs on port network_port on the
    instance's primary node, and is probably hypervisor-specific.

    """
    log.debug("Getting console for vm %s", vm)

    with pooled_rapi_client(vm) as client:
        info = client.GetInstance(vm.backend_vm_id)

    if vm.backend.hypervisor == "kvm" and info['hvparams']['serial_console']:
        raise Exception("hv parameter serial_console cannot be true")

    return {'kind': 'vnc',
            'host': info['pnode'],
            'port': info['network_port']}
744

    
745

    
746
def get_instance_info(vm):
    """Fetch the instance description of `vm` from its Ganeti backend."""
    with pooled_rapi_client(vm) as c:
        return c.GetInstance(vm.backend_vm_id)

    
750

    
751
def vm_exists_in_backend(vm):
    """Return whether the instance backing `vm` exists in Ganeti.

    A 404 from the Ganeti API means "does not exist"; any other API error
    is propagated.
    """
    try:
        get_instance_info(vm)
    except rapi.GanetiApiError as err:
        if err.code == 404:
            return False
        raise err
    return True

    
760

    
761
def get_network_info(backend_network):
    """Fetch the Ganeti description of a backend network."""
    with pooled_rapi_client(backend_network) as c:
        return c.GetNetwork(backend_network.network.backend_id)

    
765

    
766
def network_exists_in_backend(backend_network):
    """Return whether a network exists in the Ganeti backend.

    Returns True when the network is found and False on a 404 from Ganeti.
    Any other Ganeti API error is re-raised (previously it was silently
    swallowed and the function fell through returning None, unlike its
    sibling vm_exists_in_backend).
    """
    try:
        get_network_info(backend_network)
        return True
    except rapi.GanetiApiError as e:
        if e.code == 404:
            return False
        # Unexpected backend error: do not mask it as "does not exist".
        raise

    
774

    
775
def job_is_still_running(vm, job_id=None):
    """Check whether a Ganeti job of `vm` has not yet reached a final state.

    Defaults to the VM's last recorded backend job. Returns False if the
    job cannot be queried at all.
    """
    with pooled_rapi_client(vm) as client:
        try:
            if job_id is None:
                job_id = vm.backendjobid
            status = client.GetJobStatus(job_id)["status"]
            return status not in rapi.JOB_STATUS_FINALIZED
        except rapi.GanetiApiError:
            return False

    
785

    
786
def nic_is_stale(vm, nic, timeout=60):
    """Check if a NIC is stale or exists in the Ganeti backend."""
    # Only a NIC still being built with a pending CONNECT task can be
    # legitimately in flight; anything else is considered stale.
    if not (nic.state == "BUILD" and vm.task == "CONNECT"):
        return True
    # Too recent: skip the backend round-trips to avoid the time overhead.
    if datetime.now() < nic.created + timedelta(seconds=timeout):
        return False
    # The CONNECT job is still running, so the NIC may yet appear.
    if job_is_still_running(vm, job_id=vm.task_job_id):
        return False
    # The job has finished: verify the NIC exists in Ganeti, because the
    # notification message may have been lost or stuck in the queue.
    return nic.backend_uuid not in get_instance_info(vm)["nic.names"]

    
803

    
804
def ensure_network_is_active(backend, network_id):
    """Ensure that a network is active in the specified backend.

    Check that a network exists and is active in the specified backend. If
    not, (re-)create the network. Return the corresponding BackendNetwork
    object and the IDs of the Ganeti jobs that create the network.
    """
    creation_jobs = []
    try:
        bnet = BackendNetwork.objects.select_related("network")\
                                     .get(backend=backend, network=network_id)
        if bnet.operstate != "ACTIVE":
            # Exists but is not active: re-create it on the backend.
            creation_jobs = create_network(bnet.network, backend, connect=True)
    except BackendNetwork.DoesNotExist:
        # Lock the network row before creating the per-backend entry.
        network = Network.objects.select_for_update().get(id=network_id)
        bnet = BackendNetwork.objects.create(backend=backend, network=network)
        creation_jobs = create_network(network, backend, connect=True)

    return bnet, creation_jobs

    
825

    
826
def create_network(network, backend, connect=True):
    """Create a network in a Ganeti backend.

    Returns the list of Ganeti job IDs involved (creation plus, optionally,
    the connect jobs that depend on it).
    """
    log.debug("Creating network %s in backend %s", network, backend)

    creation_job = _create_network(network, backend)

    if not connect:
        return [creation_job]
    return connect_network(network, backend, depends=[creation_job])

    
838

    
839
def _create_network(network, backend):
    """Create a network in a Ganeti backend.

    Translate the Cyclades Network (subnets, gateways, visibility) into a
    Ganeti CreateNetwork RAPI call and return the Ganeti job ID.

    Raises Exception if no BackendNetwork row exists for this pair.
    """
    # Work on a copy so the appends below cannot mutate a list object that
    # `network.backend_tag` may share with other callers.
    tags = list(network.backend_tag)
    subnet = None
    subnet6 = None
    gateway = None
    gateway6 = None
    for _subnet in network.subnets.all():
        if _subnet.dhcp and "nfdhcpd" not in tags:
            tags.append("nfdhcpd")
        if _subnet.ipversion == 4:
            subnet = _subnet.cidr
            gateway = _subnet.gateway
        elif _subnet.ipversion == 6:
            subnet6 = _subnet.cidr
            gateway6 = _subnet.gateway

    # Ganeti performs IP conflict checks only for public networks.
    if network.public:
        conflicts_check = True
        tags.append('public')
    else:
        conflicts_check = False
        tags.append('private')

    # Use a dummy network subnet for IPv6 only networks. Currently Ganeti does
    # not support IPv6 only networks. To bypass this limitation, we create the
    # network with a dummy network subnet, and make Cyclades connect instances
    # to such networks, with address=None.
    if subnet is None:
        subnet = "10.0.0.0/29"

    try:
        bn = BackendNetwork.objects.get(network=network, backend=backend)
        mac_prefix = bn.mac_prefix
    except BackendNetwork.DoesNotExist:
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
                        " does not exist" % (network.id, backend.id))

    with pooled_rapi_client(backend) as client:
        return client.CreateNetwork(network_name=network.backend_id,
                                    network=subnet,
                                    network6=subnet6,
                                    gateway=gateway,
                                    gateway6=gateway6,
                                    mac_prefix=mac_prefix,
                                    conflicts_check=conflicts_check,
                                    tags=tags)

    
888

    
889
def connect_network(network, backend, depends=None, group=None):
    """Connect a network to nodegroups.

    Connect the network to the given nodegroup, or to every nodegroup of
    the backend when `group` is None. `depends` is an optional list of
    Ganeti job IDs the connect jobs must wait for. Returns the list of
    submitted job IDs.
    """
    log.debug("Connecting network %s to backend %s", network, backend)

    # Avoid the mutable-default-argument pitfall; None means "no deps".
    if depends is None:
        depends = []

    # Ganeti performs IP conflict checks only for public networks.
    conflicts_check = bool(network.public)

    depends = create_job_dependencies(depends)
    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for nodegroup in groups:
            job_id = client.ConnectNetwork(network.backend_id, nodegroup,
                                           network.mode, network.link,
                                           conflicts_check,
                                           depends=depends)
            job_ids.append(job_id)
    return job_ids

    
910

    
911
def delete_network(network, backend, disconnect=True):
    """Delete a network from a Ganeti backend.

    When `disconnect` is True, first disconnect the network from all
    nodegroups and make the delete job depend on those jobs.
    """
    log.debug("Deleting network %s from backend %s", network, backend)

    disconnect_jobs = []
    if disconnect:
        disconnect_jobs = disconnect_network(network, backend)
    _delete_network(network, backend, depends=disconnect_jobs)

    
919

    
920
def _delete_network(network, backend, depends=None):
    """Submit a Ganeti DeleteNetwork job, depending on the given job IDs."""
    # Avoid the mutable-default-argument pitfall; None means "no deps".
    if depends is None:
        depends = []
    depends = create_job_dependencies(depends)
    with pooled_rapi_client(backend) as client:
        return client.DeleteNetwork(network.backend_id, depends)

    
925

    
926
def disconnect_network(network, backend, group=None):
    """Disconnect a network from nodegroups of a backend.

    Disconnect from the given nodegroup, or from every nodegroup of the
    backend when `group` is None. Returns the list of Ganeti job IDs.
    """
    # Fixed log message: the network is disconnected *from* the backend.
    log.debug("Disconnecting network %s from backend %s", network, backend)

    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for nodegroup in groups:
            job_id = client.DisconnectNetwork(network.backend_id, nodegroup)
            job_ids.append(job_id)
    return job_ids

    
937

    
938
def connect_to_network(vm, nic):
    """Add a NIC to a Ganeti instance, hotplugging when the backend allows."""
    network = nic.network
    backend = vm.backend
    # Make sure the network is present/active on this backend; the
    # ModifyInstance job depends on any (re-)creation jobs.
    bnet, depend_jobs = ensure_network_is_active(backend, network.id)

    depends = create_job_dependencies(depend_jobs)

    # Ganeti-side description of the NIC to add (renamed from `nic` to
    # avoid shadowing the model instance).
    nic_spec = {'name': nic.backend_uuid,
                'network': network.backend_id,
                'ip': nic.ipv4_address}

    log.debug("Adding NIC %s to VM %s", nic_spec, vm)

    kwargs = {
        "instance": vm.backend_vm_id,
        "nics": [("add", "-1", nic_spec)],
        "depends": depends,
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug_if_possible"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(**kwargs)

    
964

    
965
def disconnect_from_network(vm, nic):
    """Remove a NIC from a Ganeti instance and drop its firewall tag."""
    log.debug("Removing NIC %s of VM %s", nic, vm)

    kwargs = {
        "instance": vm.backend_vm_id,
        "nics": [("remove", nic.backend_uuid, {})],
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug_if_possible"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        job_id = client.ModifyInstance(**kwargs)

        # Clean up the firewall tag of the removed NIC, if one was set.
        profile = nic.firewall_profile
        if profile and profile != "DISABLED":
            tag = _firewall_tags[profile] % nic.backend_uuid
            client.DeleteInstanceTags(vm.backend_vm_id, [tag],
                                      dry_run=settings.TEST)

        return job_id

    
987

    
988
def set_firewall_profile(vm, profile, nic):
    """Apply a firewall profile to a NIC via Ganeti instance tags.

    Removes any previous firewall tags for the NIC, adds the tag for the
    requested profile (unless "DISABLED"), and issues a NOP ModifyInstance
    so the dispatcher re-processes the network status.

    Raises ValueError for an unknown profile.
    """
    uuid = nic.backend_uuid
    try:
        tag = _firewall_tags[profile] % uuid
    except KeyError:
        # Fixed typo in the error message ("Unsopported").
        raise ValueError("Unsupported Firewall Profile: %s" % profile)

    log.debug("Setting tag of VM %s, NIC %s, to %s", vm, nic, profile)

    with pooled_rapi_client(vm) as client:
        # Delete previous firewall tags
        old_tags = client.GetInstanceTags(vm.backend_vm_id)
        delete_tags = [(t % uuid) for t in _firewall_tags.values()
                       if (t % uuid) in old_tags]
        if delete_tags:
            client.DeleteInstanceTags(vm.backend_vm_id, delete_tags,
                                      dry_run=settings.TEST)

        if profile != "DISABLED":
            client.AddInstanceTags(vm.backend_vm_id, [tag],
                                   dry_run=settings.TEST)

        # XXX NOP ModifyInstance call to force process_net_status to run
        # on the dispatcher
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
        client.ModifyInstance(vm.backend_vm_id,
                              os_name=os_name)
    return None

    
1017

    
1018
def get_instances(backend, bulk=True):
    """List the instances of a Ganeti backend."""
    with pooled_rapi_client(backend) as client:
        return client.GetInstances(bulk=bulk)

    
1022

    
1023
def get_nodes(backend, bulk=True):
    """List the nodes of a Ganeti backend."""
    with pooled_rapi_client(backend) as client:
        return client.GetNodes(bulk=bulk)

    
1027

    
1028
def get_jobs(backend, bulk=True):
    """List the jobs of a Ganeti backend."""
    with pooled_rapi_client(backend) as client:
        return client.GetJobs(bulk=bulk)

    
1032

    
1033
def get_physical_resources(backend):
    """Get the physical resources of a backend.

    Get the resources of a backend as reported by the backend (not the db):
    memory, disk, instance count and CPU totals summed over usable nodes.
    """
    nodes = get_nodes(backend, bulk=True)
    attrs = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
    totals = dict.fromkeys(attrs, 0)
    for node in nodes:
        # Filter out drained, offline and not vm_capable nodes since they will
        # not take part in the vm allocation process
        usable = node['vm_capable'] and not (node['drained'] or
                                             node['offline'])
        if usable and node['cnodes']:
            for attr in attrs:
                totals[attr] += int(node[attr] or 0)
    return totals

    
1053

    
1054
def update_backend_resources(backend, resources=None):
    """Update the state of the backend resources in db.

    If `resources` is not given, fetch them from the backend itself.
    """
    if not resources:
        resources = get_physical_resources(backend)

    # Copy each reported resource onto the corresponding model field.
    for attr in ('mfree', 'mtotal', 'dfree', 'dtotal',
                 'pinst_cnt', 'ctotal'):
        setattr(backend, attr, resources[attr])
    backend.updated = datetime.now()
    backend.save()

    
1071

    
1072
def get_memory_from_instances(backend):
    """Get the memory that is used from instances.

    Get the used memory of a backend. Note: This is different for
    the real memory used, due to kvm's memory de-duplication.

    """
    with pooled_rapi_client(backend) as client:
        instances = client.GetInstances(bulk=True)
    return sum(instance['oper_ram'] for instance in instances)

    
1086

    
1087
def get_available_disk_templates(backend):
    """Get the list of available disk templates of a Ganeti backend.

    The list contains the disk templates that are enabled in the Ganeti
    backend and also included in ipolicy-disk-templates.
    """
    with pooled_rapi_client(backend) as client:
        info = client.GetInfo()
    ipolicy_templates = info["ipolicy"]["disk-templates"]
    if "enabled_disk_templates" not in info:
        # Ganeti < 2.8 does not have 'enabled_disk_templates'
        return ipolicy_templates
    return [template for template in info["enabled_disk_templates"]
            if template in ipolicy_templates]

    
1105

    
1106
def update_backend_disk_templates(backend):
    """Refresh and persist the backend's available disk templates."""
    backend.disk_templates = get_available_disk_templates(backend)
    backend.save()

    
1111

    
1112
##
1113
## Synchronized operations for reconciliation
1114
##
1115

    
1116

    
1117
def create_network_synced(network, backend):
    """Synchronously create and connect a network in a backend.

    Returns the (status, error) tuple of the first failed step, or of the
    final connect step on success.
    """
    creation_result = _create_network_synced(network, backend)
    if creation_result[0] != rapi.JOB_STATUS_SUCCESS:
        return creation_result
    return connect_network_synced(network, backend)

    
1124

    
1125
def _create_network_synced(network, backend):
    """Create the network and block until the Ganeti job finalizes."""
    with pooled_rapi_client(backend) as client:
        job_id = _create_network(network, backend)
        return wait_for_job(client, job_id)

    
1131

    
1132
def connect_network_synced(network, backend):
    """Synchronously connect a network to all nodegroups of a backend.

    Waits for each ConnectNetwork job and stops at the first failure,
    returning its (status, error) tuple; on success, returns the result of
    the last job. If the backend has no nodegroups, returns success
    (previously `result` was unbound and a NameError was raised).
    """
    result = (rapi.JOB_STATUS_SUCCESS, None)
    with pooled_rapi_client(backend) as client:
        for group in client.GetGroups():
            job = client.ConnectNetwork(network.backend_id, group,
                                        network.mode, network.link)
            result = wait_for_job(client, job)
            if result[0] != rapi.JOB_STATUS_SUCCESS:
                return result

    return result

    
1143

    
1144
def wait_for_job(client, jobid):
    """Block until a Ganeti job finalizes.

    Returns (status, None) on success, or (status, error) otherwise.
    """
    fields = ['status', 'opresult']
    result = client.WaitForJobChange(jobid, fields, None, None)
    while result['job_info'][0] not in rapi.JOB_STATUS_FINALIZED:
        result = client.WaitForJobChange(jobid, fields, [result], None)

    status = result['job_info'][0]
    if status == rapi.JOB_STATUS_SUCCESS:
        return (status, None)
    return (status, result['job_info'][1])

    
1158

    
1159
def create_job_dependencies(job_ids=None, job_states=None):
    """Transform a list of job IDs to Ganeti 'depends' attribute.

    Each entry makes the dependent job wait for the given job to reach one
    of `job_states` (every finalized state by default).
    """
    # Avoid the mutable-default-argument pitfall; None means "no jobs".
    if job_ids is None:
        job_ids = []
    if job_states is None:
        job_states = list(rapi.JOB_STATUS_FINALIZED)
    assert isinstance(job_states, list)
    return [[job_id, job_states] for job_id in job_ids]