Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ 5920f82c

History | View | Annotate | Download (41.9 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
from django.conf import settings
34
from django.db import transaction
35
from datetime import datetime, timedelta
36

    
37
from synnefo.db.models import (Backend, VirtualMachine, Network,
38
                               BackendNetwork, BACKEND_STATUSES,
39
                               pooled_rapi_client, VirtualMachineDiagnostic,
40
                               Flavor, IPAddress, IPAddressLog)
41
from synnefo.logic import utils, ips
42
from synnefo import quotas
43
from synnefo.api.util import release_resource
44
from synnefo.util.mac2eui64 import mac2eui64
45
from synnefo.logic import rapi
46

    
47
from logging import getLogger
48
log = getLogger(__name__)
49

    
50

    
51
# Map of Synnefo firewall profile names to the corresponding Ganeti
# instance tags (tag values come from the deployment's settings).
_firewall_tags = {
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}

# Reverse map: 4th colon-separated component of a firewall tag -> profile
# name (assumes the configured tags have at least 4 ':'-separated parts).
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())

# NIC fields that can be copied verbatim from a Ganeti NIC dict to a DB NIC.
SIMPLE_NIC_FIELDS = ["state", "mac", "network", "firewall_profile", "index"]
# NIC fields that need special handling (IP allocation/release).
COMPLEX_NIC_FIELDS = ["ipv4_address", "ipv6_address"]
NIC_FIELDS = SIMPLE_NIC_FIELDS + COMPLEX_NIC_FIELDS
# Prefix used to key Ganeti NICs that carry no Synnefo-assigned name.
UNKNOWN_NIC_PREFIX = "unknown-"
62

    
63

    
64
def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
    """Handle quotas for updated VirtualMachine.

    Update quotas for the updated VirtualMachine based on the job that run on
    the Ganeti backend. If a commission has been already issued for this job,
    then this commission is just accepted or rejected based on the job status.
    Otherwise, a new commission for the given change is issued, that is also in
    force and auto-accept mode. In this case, previous commissions are
    rejected, since they reflect a previous state of the VM.

    Returns the (possibly updated) VirtualMachine object.

    """
    # Only jobs in a terminal state can affect quotas.
    if job_status not in rapi.JOB_STATUS_FINALIZED:
        return vm

    # Check successful completion of a job will trigger any quotable change in
    # the VM state.
    action = utils.get_action_from_opcode(job_opcode, job_fields)
    if action == "BUILD":
        # Quotas for new VMs are automatically accepted by the API
        return vm

    if vm.task_job_id == job_id and vm.serial is not None:
        # Commission for this change has already been issued. So just
        # accept/reject it. Special case is OP_INSTANCE_CREATE, which even
        # if fails, must be accepted, as the user must manually remove the
        # failed server
        serial = vm.serial
        if job_status == rapi.JOB_STATUS_SUCCESS:
            quotas.accept_serial(serial)
        elif job_status in [rapi.JOB_STATUS_ERROR, rapi.JOB_STATUS_CANCELED]:
            log.debug("Job %s failed. Rejecting related serial %s", job_id,
                      serial)
            quotas.reject_serial(serial)
        # Clear the serial: the pending commission has been resolved.
        vm.serial = None
    elif job_status == rapi.JOB_STATUS_SUCCESS:
        # No commission was issued for this job; compute what (if anything)
        # this successful job changes quota-wise.
        commission_info = quotas.get_commission_info(resource=vm,
                                                     action=action,
                                                     action_fields=job_fields)
        if commission_info is not None:
            # Commission for this change has not been issued, or the issued
            # commission was unaware of the current change. Reject all previous
            # commissions and create a new one in forced mode!
            log.debug("Expected job was %s. Processing job %s.",
                      vm.task_job_id, job_id)
            reason = ("client: dispatcher, resource: %s, ganeti_job: %s"
                      % (vm, job_id))
            quotas.handle_resource_commission(vm, action,
                                              action_fields=job_fields,
                                              commission_name=reason,
                                              force=True,
                                              auto_accept=True)
            log.debug("Issued new commission: %s", vm.serial)

    return vm
118

    
119

    
120
@transaction.commit_on_success
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None,
                      job_fields=None):
    """Process a job progress notification from the backend

    Process an incoming message from the backend (currently Ganeti).
    Job notifications with a terminating status (success, error, or canceled),
    also update the operating state of the VM; intermediate statuses only
    update the job bookkeeping fields.

    """
    # See #1492, #1031, #1111 why this line has been removed
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)

    # Record the latest job info on the VM.
    vm.backendjobid = jobid
    vm.backendjobstatus = status
    vm.backendopcode = opcode
    vm.backendlogmsg = logmsg

    # Non-terminal status: nothing else to do beyond bookkeeping.
    if status not in rapi.JOB_STATUS_FINALIZED:
        vm.save()
        return

    if job_fields is None:
        job_fields = {}

    new_operstate = None
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode)

    if status == rapi.JOB_STATUS_SUCCESS:
        # If job succeeds, change operating state if needed
        if state_for_success is not None:
            new_operstate = state_for_success

        beparams = job_fields.get("beparams", None)
        if beparams:
            # Change the flavor of the VM
            _process_resize(vm, beparams)

        # Update backendtime only for jobs that have been successfully
        # completed, since only these jobs update the state of the VM. Else a
        # "race condition" may occur when a successful job (e.g.
        # OP_INSTANCE_REMOVE) completes before an error job and messages arrive
        # in reversed order.
        vm.backendtime = etime

    if status in rapi.JOB_STATUS_FINALIZED and nics is not None:
        # Update the NICs of the VM
        _process_net_status(vm, etime, nics)

    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
    if opcode == 'OP_INSTANCE_CREATE' and status in (rapi.JOB_STATUS_CANCELED,
                                                     rapi.JOB_STATUS_ERROR):
        new_operstate = "ERROR"
        vm.backendtime = etime
        # Update state of associated NICs
        vm.nics.all().update(state="ERROR")
    elif opcode == 'OP_INSTANCE_REMOVE':
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
        # when no instance exists at the Ganeti backend.
        # See ticket #799 for all the details.
        if (status == rapi.JOB_STATUS_SUCCESS or
           (status == rapi.JOB_STATUS_ERROR and not vm_exists_in_backend(vm))):
            # VM has been deleted
            for nic in vm.nics.all():
                # Release the IP
                remove_nic_ips(nic)
                # And delete the NIC.
                nic.delete()
            vm.deleted = True
            new_operstate = state_for_success
            vm.backendtime = etime
            # Treat the removal as successful for the quota handling below.
            status = rapi.JOB_STATUS_SUCCESS

    if status in rapi.JOB_STATUS_FINALIZED:
        # Job is finalized: Handle quotas/commissioning
        vm = handle_vm_quotas(vm, job_id=jobid, job_opcode=opcode,
                              job_status=status, job_fields=job_fields)
        # and clear task fields
        if vm.task_job_id == jobid:
            vm.task = None
            vm.task_job_id = None

    if new_operstate is not None:
        vm.operstate = new_operstate

    vm.save()
208

    
209

    
210
def _process_resize(vm, beparams):
    """Change flavor of a VirtualMachine based on new beparams.

    Looks up an existing Flavor matching the new vcpus/ram while keeping the
    old disk size and disk template, and assigns it to the VM. No-op when
    neither vcpus nor ram changed.

    Raises an Exception if no matching flavor exists.
    """
    old_flavor = vm.flavor
    vcpus = beparams.get("vcpus", old_flavor.cpu)
    ram = beparams.get("maxmem", old_flavor.ram)
    if vcpus == old_flavor.cpu and ram == old_flavor.ram:
        # Nothing quotable changed; keep the current flavor.
        return
    try:
        new_flavor = Flavor.objects.get(cpu=vcpus, ram=ram,
                                        disk=old_flavor.disk,
                                        disk_template=old_flavor.disk_template)
    except Flavor.DoesNotExist:
        # Include the requested specs so the failure is debuggable.
        raise Exception("Cannot find flavor for VM: cpu=%s ram=%s disk=%s"
                        " disk_template=%s"
                        % (vcpus, ram, old_flavor.disk,
                           old_flavor.disk_template))
    vm.flavor = new_flavor
    vm.save()
225

    
226

    
227
@transaction.commit_on_success
def process_net_status(vm, etime, nics):
    """Wrap _process_net_status inside a database transaction.

    See _process_net_status for the actual processing of a Ganeti NIC
    notification.
    """
    _process_net_status(vm, etime, nics)
231

    
232

    
233
def _process_net_status(vm, etime, nics):
    """Process a net status notification from the backend

    Process an incoming message from the Ganeti backend,
    detailing the NIC configuration of a VM instance.

    Update the state of the VM in the DB accordingly.

    """
    ganeti_nics = process_ganeti_nics(nics)
    db_nics = dict([(nic.id, nic)
                    for nic in vm.nics.prefetch_related("ips__subnet")])

    for nic_name in set(db_nics.keys()) | set(ganeti_nics.keys()):
        db_nic = db_nics.get(nic_name)
        ganeti_nic = ganeti_nics.get(nic_name)
        if ganeti_nic is None:
            # NIC exists in the DB but not in Ganeti. If it is stale remove
            # it, otherwise Ganeti is still in the process of creating it.
            # FIX: was `nic_is_stale(vm, nic)` — `nic` is not defined in this
            # scope (only `nic_name`/`db_nic` are), which raised a NameError
            # whenever this branch was taken.
            if nic_is_stale(vm, db_nic):
                log.debug("Removing stale NIC '%s'" % db_nic)
                remove_nic_ips(db_nic)
                db_nic.delete()
            else:
                log.info("NIC '%s' is still being created" % db_nic)
        elif db_nic is None:
            msg = ("NIC/%s of VM %s does not exist in DB! Cannot automatically"
                   " fix this issue!" % (nic_name, vm))
            log.error(msg)
            continue
        elif not nics_are_equal(db_nic, ganeti_nic):
            # Copy the simple fields from the Ganeti NIC and save once
            # (previously the NIC was saved once per field).
            for f in SIMPLE_NIC_FIELDS:
                setattr(db_nic, f, ganeti_nic[f])
            db_nic.save()

            # Special case where the IPv4 address has changed, because you
            # need to release the old IPv4 address and reserve the new one
            ipv4_address = ganeti_nic["ipv4_address"]
            if db_nic.ipv4_address != ipv4_address:
                change_address_of_port(db_nic, vm.userid,
                                       old_address=db_nic.ipv4_address,
                                       new_address=ipv4_address,
                                       version=4)

            ipv6_address = ganeti_nic["ipv6_address"]
            if db_nic.ipv6_address != ipv6_address:
                change_address_of_port(db_nic, vm.userid,
                                       old_address=db_nic.ipv6_address,
                                       new_address=ipv6_address,
                                       version=6)

    vm.backendtime = etime
    vm.save()
285

    
286

    
287
def change_address_of_port(port, userid, old_address, new_address, version):
    """Replace the IPv4/IPv6 address of a port with a new one.

    Releases the old address (if any), binds a new IPAddress with
    `new_address` to the port, and records the new address in the
    IPAddressLog table. Returns the newly created IPAddress object.
    """
    if old_address is not None:
        # An address change coming from the backend is unexpected; log loudly.
        log.error("IPv%s Address of server '%s' changed from '%s' to '%s'"
                  % (version, port.machine_id, old_address, new_address))

    # Drop the old address of this IP version from the port.
    remove_nic_ips(port, version=version)

    if version == 4:
        # IPv4 addresses come from the network's address pool.
        ipaddress = ips.allocate_ip(port.network, userid, address=new_address)
        ipaddress.nic = port
        ipaddress.save()
    elif version == 6:
        # IPv6 addresses are not pooled; create the DB object directly.
        ipaddress = IPAddress.objects.create(userid=userid,
                                             network=port.network,
                                             subnet=port.network.subnet6,
                                             nic=port,
                                             address=new_address,
                                             ipversion=6)
    else:
        raise ValueError("Unknown version: %s" % version)

    # Open an active log entry for the new address.
    ip_log = IPAddressLog.objects.create(server_id=port.machine_id,
                                         network_id=port.network_id,
                                         address=new_address,
                                         active=True)
    log.info("Created IP log entry '%s' for address '%s' to server '%s'",
             ip_log.id, new_address, port.machine_id)

    return ipaddress
321

    
322

    
323
def nics_are_equal(db_nic, gnt_nic):
    """Return True when the DB NIC and the Ganeti NIC agree on every
    field listed in NIC_FIELDS."""
    return all(getattr(db_nic, field) == gnt_nic[field]
               for field in NIC_FIELDS)
328

    
329

    
330
def process_ganeti_nics(ganeti_nics):
    """Convert a list of Ganeti NIC dicts to a {nic_id: nic_info} mapping."""
    nics = {}
    for index, gnic in enumerate(ganeti_nics):
        nic_name = gnic.get("name", None)
        if nic_name is not None:
            nic_id = utils.id_from_nic_name(nic_name)
        else:
            # Fall back to the NIC's index as its id; a NIC unknown to
            # Synnefo will be created automatically.
            nic_id = UNKNOWN_NIC_PREFIX + str(index)

        network_name = gnic.get('network', '')
        network = Network.objects.get(id=utils.id_from_network_name(
            network_name))

        # Collect the new NIC's attributes.
        mac = gnic.get('mac')
        subnet6 = network.subnet6
        # Derive the IPv6 address from the MAC (EUI-64) when the network
        # has an IPv6 subnet.
        ipv6 = mac2eui64(mac, subnet6.cidr) if subnet6 else None

        firewall_profile = _reverse_tags.get(gnic.get('firewall'))
        if not firewall_profile and network.public:
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE

        nics[nic_id] = {'index': index,
                        'network': network,
                        'mac': mac,
                        'ipv4_address': gnic.get('ip'),
                        'ipv6_address': ipv6,
                        'firewall_profile': firewall_profile,
                        'state': 'ACTIVE'}
    return nics
367

    
368

    
369
def remove_nic_ips(nic, version=None):
    """Remove IP addresses associated with a NetworkInterface.

    Each IPAddress bound to the NIC is returned to its pool and deleted,
    except floating IPs, which are merely disassociated from the NIC.
    When `version` is given, only addresses of that IP version are
    handled; otherwise all addresses are removed.
    """
    for ip in nic.ips.all():
        # Skip addresses of a different IP version when one was requested.
        if not version or ip.ipversion == version:
            # Close the active entry in the IP address log table.
            terminate_active_ipaddress_log(nic, ip)

            if ip.floating_ip:
                # Floating IPs survive the NIC; just detach them.
                ip.nic = None
                ip.save()
            else:
                # Return the address to its pool and drop the DB object.
                ip.release_address()
                ip.delete()
393

    
394

    
395
def terminate_active_ipaddress_log(nic, ip):
    """Update DB logging entry for this IP address.

    Marks the active IPAddressLog entry of (server, network, address) as
    released. Only public-network addresses of attached NICs are logged.

    Raises IPAddressLog.MultipleObjectsReturned if more than one active
    entry exists for the same address.
    """
    if not ip.network.public or nic.machine is None:
        return
    try:
        ip_log, created = \
            IPAddressLog.objects.get_or_create(server_id=nic.machine_id,
                                               network_id=ip.network_id,
                                               address=ip.address,
                                               active=True)
    except IPAddressLog.MultipleObjectsReturned:
        # FIX: the two string parts previously joined as "...Network %s,Server"
        # (missing space after the comma).
        logmsg = ("Multiple active log entries for IP %s, Network %s,"
                  " Server %s. Cannot proceed!"
                  % (ip.address, ip.network, nic.machine))
        log.error(logmsg)
        raise

    if created:
        # There should have been an entry from when the address was assigned;
        # create one now, but its creation timestamp will be wrong.
        logmsg = ("No log entry for IP %s, Network %s, Server %s. Created new"
                  " but with wrong creation timestamp."
                  % (ip.address, ip.network, nic.machine))
        log.error(logmsg)
    # NOTE: naive local timestamp, consistent with the rest of the module.
    ip_log.released_at = datetime.now()
    ip_log.active = False
    ip_log.save()
420

    
421

    
422
@transaction.commit_on_success
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
    """Process a Ganeti network job notification for a BackendNetwork.

    Updates the job bookkeeping fields and the operating state of the
    BackendNetwork, then recomputes the state of the parent Network.
    """
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    # Record the latest job info on the BackendNetwork.
    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode
    back_network.backendlogmsg = logmsg

    # Note: Network is already locked!
    network = back_network.network

    # Notifications of success change the operating state
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == rapi.JOB_STATUS_SUCCESS and state_for_success is not None:
        back_network.operstate = state_for_success

    # A failed/canceled OP_NETWORK_ADD leaves the backend network in ERROR.
    if (status in (rapi.JOB_STATUS_CANCELED, rapi.JOB_STATUS_ERROR)
       and opcode == 'OP_NETWORK_ADD'):
        back_network.operstate = 'ERROR'
        back_network.backendtime = etime

    if opcode == 'OP_NETWORK_REMOVE':
        # A failed removal still counts as deleted when the network no longer
        # exists in the backend (mirrors the OP_INSTANCE_REMOVE special case).
        network_is_deleted = (status == rapi.JOB_STATUS_SUCCESS)
        if network_is_deleted or (status == rapi.JOB_STATUS_ERROR and not
                                  network_exists_in_backend(back_network)):
            # NOTE(review): assumes OPER_STATE_FROM_OPCODE has an entry for
            # OP_NETWORK_REMOVE, otherwise operstate is set to None — confirm.
            back_network.operstate = state_for_success
            back_network.deleted = True
            back_network.backendtime = etime

    if status == rapi.JOB_STATUS_SUCCESS:
        back_network.backendtime = etime
    back_network.save()
    # Also you must update the state of the Network!!
    update_network_state(network)
458

    
459

    
460
def update_network_state(network):
    """Update the state of a Network based on BackendNetwork states.

    Update the state of a Network based on the operstate of the networks in
    the backends that network exists.

    The state of the network is:
    * ACTIVE: If it is 'ACTIVE' in at least one backend.
    * DELETED: If it is 'DELETED' in all backends that have been created.

    This function also releases the resources (MAC prefix or Bridge) and the
    quotas for the network.

    """
    if network.deleted:
        # Network has already been deleted. Just assert that state is also
        # DELETED
        if not network.state == "DELETED":
            network.state = "DELETED"
            network.save()
        return

    backend_states = [s.operstate for s in network.backend_networks.all()]
    if not backend_states and network.action != "DESTROY":
        # The network exists in no backend and is not being destroyed:
        # report it as ACTIVE and stop here.
        # FIX: the `return` was previously nested inside the inner `if`, so a
        # network that was already ACTIVE fell through to the reduce() below,
        # which evaluates to "DELETED" for an empty backend_states list and
        # wrongly deleted the network.
        if network.state != "ACTIVE":
            network.state = "ACTIVE"
            network.save()
        return

    # Network is deleted when all BackendNetworks go to "DELETED" operstate
    deleted = reduce(lambda x, y: x == y and "DELETED", backend_states,
                     "DELETED")

    # Release the resources on the deletion of the Network
    if deleted:
        if network.ips.filter(deleted=False, floating_ip=True).exists():
            msg = "Cannot delete network %s! Floating IPs still in use!"
            log.error(msg % network)
            raise Exception(msg % network)
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
                 network.id, network.mac_prefix, network.link)
        network.deleted = True
        network.state = "DELETED"
        # Undrain the network, otherwise the network state will remain
        # as 'SNF:DRAINED'
        network.drained = False
        if network.mac_prefix:
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
                release_resource(res_type="mac_prefix",
                                 value=network.mac_prefix)
        if network.link:
            if network.FLAVORS[network.flavor]["link"] == "pool":
                release_resource(res_type="bridge", value=network.link)

        # Set all subnets as deleted
        network.subnets.update(deleted=True)
        # And delete the IP pools
        for subnet in network.subnets.all():
            if subnet.ipversion == 4:
                subnet.ip_pools.all().delete()
        # And all the backend networks, since they are now useless
        network.backend_networks.all().delete()

        # Issue commission
        if network.userid:
            quotas.issue_and_accept_commission(network, action="DESTROY")
            # the above has already saved the object and committed;
            # a second save would override others' changes, since the
            # object is now unlocked
            return
        elif not network.public:
            log.warning("Network %s does not have an owner!", network.id)
    network.save()
533

    
534

    
535
@transaction.commit_on_success
def process_network_modify(back_network, etime, jobid, opcode, status,
                           job_fields):
    """Process a notification for an OP_NETWORK_SET_PARAMS Ganeti job.

    Updates the job bookkeeping fields of the BackendNetwork and reserves
    any externally-added reserved IP addresses in the network's pools.
    """
    assert (opcode == "OP_NETWORK_SET_PARAMS")
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    # FIX: was `back_network.opcode = opcode`, which set a non-model
    # attribute and was never persisted; use `backendopcode` for consistency
    # with process_network_status().
    back_network.backendopcode = opcode

    add_reserved_ips = job_fields.get("add_reserved_ips")
    if add_reserved_ips:
        network = back_network.network
        for ip in add_reserved_ips:
            # Mark the address as externally reserved in the network's pools.
            network.reserve_address(ip, external=True)

    if status == rapi.JOB_STATUS_SUCCESS:
        back_network.backendtime = etime
    back_network.save()
555

    
556

    
557
@transaction.commit_on_success
558
def process_create_progress(vm, etime, progress):
    """Update the build progress of `vm` from a snf-image notification."""
    percentage = int(progress)

    if percentage < 0:
        raise ValueError("Percentage cannot be negative")
    # snf-image:copy-progress tracks bytes read by the image handling
    # processes, so the reported percentage may exceed 100%; clamp it.
    percentage = min(percentage, 100)

    # FIXME: log a warning here, see #1033
    # No monotonicity check is performed on the build percentage:
    #   if last_update > percentage:
    #       raise ValueError("Build percentage should increase monotonically "
    #                        "(old = %d, new = %d)" % (last_update, percentage))

    # This assumes that no message of type 'ganeti-create-progress' is going to
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
    # the instance is STARTED.  What if the two messages are processed by two
    # separate dispatcher threads, and the 'ganeti-op-status' message for
    # successful creation gets processed before the 'ganeti-create-progress'
    # message? [vkoukis]
    #
    #if not vm.operstate == 'BUILD':
    #    raise VirtualMachine.IllegalState("VM is not in building state")

    vm.buildpercentage = percentage
    vm.backendtime = etime
    vm.save()
586

    
587

    
588
@transaction.commit_on_success
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
                               details=None):
    """
    Create virtual machine instance diagnostic entry.

    :param vm: VirtualMachine instance to create diagnostic for.
    :param message: Diagnostic message.
    :param source: Diagnostic source identifier (e.g. image-helper).
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
    :param etime: The time the message occurred (if available).
    :param details: Additional details or debug information.
    """
    # `etime` is stored as the diagnostic's source_date.
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level, source=source,
                                                   source_date=etime,
                                                   message=message,
                                                   details=details)
605

    
606

    
607
def create_instance(vm, nics, flavor, image):
    """Submit a Ganeti CreateInstance job for `vm`.

    `image` is a dictionary which should contain the keys
    'backend_id', 'format' and 'metadata', where the metadata value
    should be a dictionary.

    Returns the result of the RAPI CreateInstance call.
    """
    # Start from the deployment-specific defaults
    # (settings.GANETI_CREATEINSTANCE_KWARGS), so the administrator can
    # override arguments such as the disk template, the OS provider and
    # hypervisor-specific parameters at will (see Synnefo #785, #835).
    kw = vm.backend.get_create_params()
    kw['mode'] = 'create'
    kw['name'] = vm.backend_vm_id

    # Build the single disk from the flavor (size is in GiB -> MiB).
    kw['disk_template'] = flavor.disk_template
    disk = {"size": flavor.disk * 1024}
    provider = flavor.disk_provider
    if provider:
        disk['provider'] = provider
        disk['origin'] = flavor.disk_origin
        extra_disk_params = settings.GANETI_DISK_PROVIDER_KWARGS.get(provider)
        if extra_disk_params is not None:
            disk.update(extra_disk_params)
    kw['disks'] = [disk]

    kw['nics'] = [{"name": nic.backend_uuid,
                   "network": nic.network.backend_id,
                   "ip": nic.ipv4_address}
                  for nic in nics]

    # Make the creation depend on the jobs that connect the networks of all
    # NICs to this backend.
    backend = vm.backend
    depend_jobs = []
    for nic in nics:
        bnet, job_ids = ensure_network_is_active(backend, nic.network_id)
        depend_jobs.extend(job_ids)
    kw["depends"] = create_job_dependencies(depend_jobs)

    # 'os' comes from settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['os'] = settings.GANETI_OS_PROVIDER
    kw['ip_check'] = False
    kw['name_check'] = False

    # Do not specify a node explicitly; let Ganeti use an iallocator instead.
    #kw['pnode'] = rapi.GetNodes()[0]

    kw['dry_run'] = settings.TEST

    kw['beparams'] = {
        'auto_balance': True,
        'vcpus': flavor.cpu,
        'memory': flavor.ram}

    kw['osparams'] = {
        'config_url': vm.config_url,
        # Store image id and format to Ganeti
        'img_id': image['backend_id'],
        'img_format': image['format']}

    # Use opportunistic locking
    kw['opportunistic_locking'] = settings.GANETI_USE_OPPORTUNISTIC_LOCKING

    # 'hvparams' comes from settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['hvparams'] = dict(serial_console=False)

    log.debug("Creating instance %s", utils.hide_pass(kw))
    with pooled_rapi_client(vm) as client:
        return client.CreateInstance(**kw)
679

    
680

    
681
def delete_instance(vm):
    """Submit a Ganeti job to remove the instance of `vm`."""
    with pooled_rapi_client(vm) as client:
        return client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)
684

    
685

    
686
def reboot_instance(vm, reboot_type):
    """Submit a Ganeti job to reboot the instance of `vm`.

    `reboot_type` is the OS API reboot type ('soft' or 'hard'). The Ganeti
    reboot type is always "hard"; the OS API distinction maps to the
    'shutdown_timeout' job parameter instead.
    """
    assert reboot_type in ('soft', 'hard')
    kwargs = {"instance": vm.backend_vm_id,
              "reboot_type": "hard"}
    # 'shutdown_timeout' is only supported by snf-ganeti>=2.8.2 and
    # Ganeti>2.10; older versions ignore it and fall back to Ganeti's
    # default timeout (120s).
    if reboot_type == "hard":
        kwargs["shutdown_timeout"] = 0
    if settings.TEST:
        kwargs["dry_run"] = True
    with pooled_rapi_client(vm) as client:
        return client.RebootInstance(**kwargs)
702

    
703

    
704
def startup_instance(vm):
    """Submit a Ganeti job to start the instance of `vm`."""
    with pooled_rapi_client(vm) as client:
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
707

    
708

    
709
def shutdown_instance(vm):
    """Submit a Ganeti job to shut down the instance of `vm`."""
    with pooled_rapi_client(vm) as client:
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)
712

    
713

    
714
def resize_instance(vm, vcpus, memory):
    """Submit a Ganeti job to change the vcpus/memory beparams of `vm`."""
    mem = int(memory)
    # Set minmem and maxmem to the same value.
    beparams = {"vcpus": int(vcpus), "minmem": mem, "maxmem": mem}
    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id, beparams=beparams)
720

    
721

    
722
def get_instance_console(vm):
    """Build VNC console connection info for a VM.

    RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
    which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
    useless (see #783). Until this is fixed on the Ganeti side, construct a
    console info reply directly.

    WARNING: This assumes that VNC runs on port network_port on the
             instance's primary node, and is probably hypervisor-specific.
    """
    log.debug("Getting console for vm %s", vm)

    with pooled_rapi_client(vm) as client:
        instance = client.GetInstance(vm.backend_vm_id)

    # A serial console on KVM conflicts with the VNC setup assumed here.
    if vm.backend.hypervisor == "kvm" and \
            instance['hvparams']['serial_console']:
        raise Exception("hv parameter serial_console cannot be true")

    return {'kind': 'vnc',
            'host': instance['pnode'],
            'port': instance['network_port']}
748

    
749

    
750
def get_instance_info(vm):
    """Fetch the Ganeti instance info for `vm` via RAPI."""
    with pooled_rapi_client(vm) as rapi_client:
        return rapi_client.GetInstance(vm.backend_vm_id)
753

    
754

    
755
def vm_exists_in_backend(vm):
    """Return True if the instance exists in the Ganeti backend.

    A 404 from RAPI means the instance is missing; any other RAPI error
    is propagated to the caller.
    """
    try:
        get_instance_info(vm)
    except rapi.GanetiApiError as e:
        if e.code == 404:
            return False
        raise e
    return True
763

    
764

    
765
def get_network_info(backend_network):
    """Fetch the Ganeti network info for `backend_network` via RAPI."""
    with pooled_rapi_client(backend_network) as rapi_client:
        return rapi_client.GetNetwork(backend_network.network.backend_id)
768

    
769

    
770
def network_exists_in_backend(backend_network):
    """Return True if the network exists in the Ganeti backend.

    A 404 from RAPI means the network is missing; any other RAPI error
    is re-raised so callers can distinguish "network does not exist"
    from "backend is failing".
    """
    try:
        get_network_info(backend_network)
        return True
    except rapi.GanetiApiError as e:
        if e.code == 404:
            return False
        # Bug fix: previously any non-404 error was silently swallowed and
        # the function fell through to return None, masking backend
        # failures. Re-raise instead, consistent with vm_exists_in_backend.
        raise e
777

    
778

    
779
def job_is_still_running(vm, job_id=None):
    """Return True if the VM's Ganeti job has not yet finalized.

    If `job_id` is None, the VM's `backendjobid` is used. Any RAPI error
    (e.g. unknown or archived job) is treated as "not running".
    """
    with pooled_rapi_client(vm) as client:
        try:
            if job_id is None:
                job_id = vm.backendjobid
            status = client.GetJobStatus(job_id)["status"]
            return status not in rapi.JOB_STATUS_FINALIZED
        except rapi.GanetiApiError:
            return False
788

    
789

    
790
def nic_is_stale(vm, nic, timeout=60):
    """Check if a NIC is stale or exists in the Ganeti backend."""
    # Only a NIC in BUILD state with a pending CONNECT task can be fresh;
    # anything else is considered stale.
    if nic.state != "BUILD" or vm.task != "CONNECT":
        return True
    if datetime.now() < nic.created + timedelta(seconds=timeout):
        # Do not check for too recent NICs to avoid the time overhead
        return False
    if job_is_still_running(vm, job_id=vm.task_job_id):
        return False
    # The job has finished; verify that the NIC really exists, because the
    # message may have been lost or stuck in the queue.
    vm_info = get_instance_info(vm)
    if nic.backend_uuid in vm_info["nic.names"]:
        return False
    return True
806

    
807

    
808
def ensure_network_is_active(backend, network_id):
    """Ensure that a network is active in the specified backend.

    Check that a network exists and is active in the specified backend.
    If not, (re-)create the network. Return the corresponding
    BackendNetwork object and the IDs of the Ganeti jobs created for
    the network.
    """
    jobs = []
    try:
        bnet = BackendNetwork.objects.select_related("network")\
                                     .get(backend=backend, network=network_id)
        if bnet.operstate != "ACTIVE":
            jobs = create_network(bnet.network, backend, connect=True)
    except BackendNetwork.DoesNotExist:
        # Lock the network row before creating the missing BackendNetwork.
        net = Network.objects.select_for_update().get(id=network_id)
        bnet = BackendNetwork.objects.create(backend=backend, network=net)
        jobs = create_network(net, backend, connect=True)

    return bnet, jobs
828

    
829

    
830
def create_network(network, backend, connect=True):
    """Create a network in a Ganeti backend.

    Returns the list of Ganeti job IDs: the creation job alone, or the
    connection jobs (which depend on it) when `connect` is True.
    """
    log.debug("Creating network %s in backend %s", network, backend)

    creation_job = _create_network(network, backend)

    if not connect:
        return [creation_job]
    return connect_network(network, backend, depends=[creation_job])
841

    
842

    
843
def _create_network(network, backend):
    """Create a network in Ganeti (without connecting it to node groups)."""
    tags = network.backend_tag
    subnet = gateway = None
    subnet6 = gateway6 = None
    for sub in network.subnets.all():
        if sub.dhcp and "nfdhcpd" not in tags:
            tags.append("nfdhcpd")
        if sub.ipversion == 4:
            subnet = sub.cidr
            gateway = sub.gateway
        elif sub.ipversion == 6:
            subnet6 = sub.cidr
            gateway6 = sub.gateway

    conflicts_check = False
    if network.public:
        tags.append('public')
        if subnet is not None:
            conflicts_check = True
    else:
        tags.append('private')

    # Use a dummy network subnet for IPv6 only networks. Currently Ganeti
    # does not support IPv6 only networks. To bypass this limitation, we
    # create the network with a dummy network subnet, and make Cyclades
    # connect instances to such networks, with address=None.
    if subnet is None:
        subnet = "10.0.0.0/29"

    try:
        bnet = BackendNetwork.objects.get(network=network, backend=backend)
    except BackendNetwork.DoesNotExist:
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
                        " does not exist" % (network.id, backend.id))
    mac_prefix = bnet.mac_prefix

    with pooled_rapi_client(backend) as client:
        return client.CreateNetwork(network_name=network.backend_id,
                                    network=subnet,
                                    network6=subnet6,
                                    gateway=gateway,
                                    gateway6=gateway6,
                                    mac_prefix=mac_prefix,
                                    conflicts_check=conflicts_check,
                                    tags=tags)
892

    
893

    
894
def connect_network(network, backend, depends=None, group=None):
    """Connect a network to node groups.

    Connects the network to `group`, or to every node group of the backend
    when `group` is None. `depends` is an optional list of Ganeti job IDs
    the connect jobs must wait for. Returns the list of submitted job IDs.
    """
    log.debug("Connecting network %s to backend %s", network, backend)

    # Fixed mutable-default-argument anti-pattern (was depends=[]).
    if depends is None:
        depends = []

    conflicts_check = False
    if network.public and (network.subnet4 is not None):
        conflicts_check = True

    depends = create_job_dependencies(depends)
    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for group in groups:
            job_id = client.ConnectNetwork(network.backend_id, group,
                                           network.mode, network.link,
                                           conflicts_check,
                                           depends=depends)
            job_ids.append(job_id)
    return job_ids
913

    
914

    
915
def delete_network(network, backend, disconnect=True):
    """Delete a network from a Ganeti backend.

    When `disconnect` is True, the network is first disconnected from all
    node groups and the delete job depends on the disconnect jobs.
    """
    log.debug("Deleting network %s from backend %s", network, backend)

    depends = disconnect_network(network, backend) if disconnect else []
    _delete_network(network, backend, depends=depends)
922

    
923

    
924
def _delete_network(network, backend, depends=None):
    """Submit a Ganeti job to delete a network, after `depends` jobs finish."""
    # Fixed mutable-default-argument anti-pattern (was depends=[]).
    if depends is None:
        depends = []
    depends = create_job_dependencies(depends)
    with pooled_rapi_client(backend) as client:
        return client.DeleteNetwork(network.backend_id, depends)
928

    
929

    
930
def disconnect_network(network, backend, group=None):
    """Disconnect a network from node groups.

    Disconnects from `group`, or from every node group of the backend when
    `group` is None. Returns the list of submitted Ganeti job IDs.
    """
    # Fixed log message: previously said "to backend".
    log.debug("Disconnecting network %s from backend %s", network, backend)

    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for group in groups:
            job_id = client.DisconnectNetwork(network.backend_id, group)
            job_ids.append(job_id)
    return job_ids
940

    
941

    
942
def connect_to_network(vm, nic):
    """Add a NIC to a VM, ensuring its network is active in the backend.

    Returns the Ganeti job ID of the ModifyInstance call; the NIC-add job
    depends on any network (re-)creation jobs.
    """
    network = nic.network
    backend = vm.backend
    bnet, depend_jobs = ensure_network_is_active(backend, network.id)

    depends = create_job_dependencies(depend_jobs)

    # Build the RAPI NIC spec in a separate name so it does not shadow
    # the `nic` model object.
    nic_spec = {'name': nic.backend_uuid,
                'network': network.backend_id,
                'ip': nic.ipv4_address}

    log.debug("Adding NIC %s to VM %s", nic_spec, vm)

    kwargs = {
        "instance": vm.backend_vm_id,
        "nics": [("add", "-1", nic_spec)],
        "depends": depends,
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug_if_possible"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(**kwargs)
967

    
968

    
969
def disconnect_from_network(vm, nic):
    """Remove a NIC from a VM and delete its firewall tag, if any.

    Returns the Ganeti job ID of the ModifyInstance call.
    """
    log.debug("Removing NIC %s of VM %s", nic, vm)

    kwargs = {
        "instance": vm.backend_vm_id,
        "nics": [("remove", nic.backend_uuid, {})],
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug_if_possible"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        job_id = client.ModifyInstance(**kwargs)
        profile = nic.firewall_profile
        if profile and profile != "DISABLED":
            # Drop the firewall tag that corresponds to this NIC.
            tag = _firewall_tags[profile] % nic.backend_uuid
            client.DeleteInstanceTags(vm.backend_vm_id, [tag],
                                      dry_run=settings.TEST)

        return job_id
990

    
991

    
992
def set_firewall_profile(vm, profile, nic):
    """Apply a firewall profile to a NIC via Ganeti instance tags.

    Deletes any previous firewall tags for the NIC, adds the tag for
    `profile` (unless it is "DISABLED"), then issues a NOP ModifyInstance
    so that process_net_status runs on the dispatcher.

    Raises ValueError for an unknown profile.
    """
    uuid = nic.backend_uuid
    try:
        tag = _firewall_tags[profile] % uuid
    except KeyError:
        # Fixed typo in the error message (was "Unsopported").
        raise ValueError("Unsupported Firewall Profile: %s" % profile)

    log.debug("Setting tag of VM %s, NIC %s, to %s", vm, nic, profile)

    with pooled_rapi_client(vm) as client:
        # Delete previous firewall tags
        old_tags = client.GetInstanceTags(vm.backend_vm_id)
        delete_tags = [(t % uuid) for t in _firewall_tags.values()
                       if (t % uuid) in old_tags]
        if delete_tags:
            client.DeleteInstanceTags(vm.backend_vm_id, delete_tags,
                                      dry_run=settings.TEST)

        if profile != "DISABLED":
            client.AddInstanceTags(vm.backend_vm_id, [tag],
                                   dry_run=settings.TEST)

        # XXX NOP ModifyInstance call to force process_net_status to run
        # on the dispatcher
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
        client.ModifyInstance(vm.backend_vm_id,
                              os_name=os_name)
    return None
1020

    
1021

    
1022
def get_instances(backend, bulk=True):
    """List all instances of a Ganeti backend via RAPI."""
    with pooled_rapi_client(backend) as rapi_client:
        return rapi_client.GetInstances(bulk=bulk)
1025

    
1026

    
1027
def get_nodes(backend, bulk=True):
    """List all nodes of a Ganeti backend via RAPI."""
    with pooled_rapi_client(backend) as rapi_client:
        return rapi_client.GetNodes(bulk=bulk)
1030

    
1031

    
1032
def get_jobs(backend, bulk=True):
    """List all jobs of a Ganeti backend via RAPI."""
    with pooled_rapi_client(backend) as rapi_client:
        return rapi_client.GetJobs(bulk=bulk)
1035

    
1036

    
1037
def get_physical_resources(backend):
    """Get the physical resources of a backend.

    Get the resources of a backend as reported by the backend (not the db).
    Returns a dict of summed attributes over all allocatable nodes.
    """
    nodes = get_nodes(backend, bulk=True)
    attrs = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
    totals = dict.fromkeys(attrs, 0)
    for node in nodes:
        # Filter out drained, offline and not vm_capable nodes since they
        # will not take part in the vm allocation process
        allocatable = node['vm_capable'] and \
            not (node['drained'] or node['offline'])
        if allocatable and node['cnodes']:
            for attr in attrs:
                totals[attr] += int(node[attr] or 0)
    return totals
1056

    
1057

    
1058
def update_backend_resources(backend, resources=None):
    """Update the state of the backend resources in db.

    If `resources` is not given, they are fetched from the backend itself.
    """
    if not resources:
        resources = get_physical_resources(backend)

    for field in ('mfree', 'mtotal', 'dfree', 'dtotal',
                  'pinst_cnt', 'ctotal'):
        setattr(backend, field, resources[field])
    backend.updated = datetime.now()
    backend.save()
1074

    
1075

    
1076
def get_memory_from_instances(backend):
    """Get the memory that is used from instances.

    Get the used memory of a backend. Note: This is different from
    the real memory used, due to kvm's memory de-duplication.
    """
    with pooled_rapi_client(backend) as client:
        instances = client.GetInstances(bulk=True)
    return sum(inst['oper_ram'] for inst in instances)
1089

    
1090

    
1091
def get_available_disk_templates(backend):
    """Get the list of available disk templates of a Ganeti backend.

    The list contains the disk templates that are enabled in the Ganeti
    backend and also included in ipolicy-disk-templates.
    """
    with pooled_rapi_client(backend) as client:
        info = client.GetInfo()
    ipolicy_templates = info["ipolicy"]["disk-templates"]
    if "enabled_disk_templates" not in info:
        # Ganeti < 2.8 does not have 'enabled_disk_templates'
        return ipolicy_templates
    return [tmpl for tmpl in info["enabled_disk_templates"]
            if tmpl in ipolicy_templates]
1108

    
1109

    
1110
def update_backend_disk_templates(backend):
    """Sync the backend's available disk templates into the DB."""
    backend.disk_templates = get_available_disk_templates(backend)
    backend.save()
1114

    
1115

    
1116
##
## Synchronized operations for reconciliation
##
1119

    
1120

    
1121
def create_network_synced(network, backend):
    """Create and connect a network, waiting for each Ganeti job.

    Returns the (status, error) tuple of the first failing step, or of
    the final connect step on success.
    """
    result = _create_network_synced(network, backend)
    if result[0] == rapi.JOB_STATUS_SUCCESS:
        result = connect_network_synced(network, backend)
    return result
1127

    
1128

    
1129
def _create_network_synced(network, backend):
    """Create a network and block until its Ganeti job finalizes."""
    with pooled_rapi_client(backend) as client:
        job_id = _create_network(network, backend)
        return wait_for_job(client, job_id)
1134

    
1135

    
1136
def connect_network_synced(network, backend):
    """Connect a network to every node group, waiting for each job.

    Stops and returns the (status, error) tuple of the first failing
    connect job; otherwise returns the result of the last one.
    NOTE(review): assumes the backend has at least one node group,
    otherwise `outcome` is never bound.
    """
    with pooled_rapi_client(backend) as client:
        for group in client.GetGroups():
            job_id = client.ConnectNetwork(network.backend_id, group,
                                           network.mode, network.link)
            outcome = wait_for_job(client, job_id)
            if outcome[0] != rapi.JOB_STATUS_SUCCESS:
                return outcome

    return outcome
1146

    
1147

    
1148
def wait_for_job(client, jobid):
    """Block until a Ganeti job reaches a finalized status.

    Returns a (status, error) tuple, where error is None on success and
    the job's opresult otherwise.
    """
    fields = ['status', 'opresult']
    result = client.WaitForJobChange(jobid, fields, None, None)
    status = result['job_info'][0]
    while status not in rapi.JOB_STATUS_FINALIZED:
        result = client.WaitForJobChange(jobid, fields, [result], None)
        status = result['job_info'][0]

    if status == rapi.JOB_STATUS_SUCCESS:
        return (status, None)
    return (status, result['job_info'][1])
1161

    
1162

    
1163
def create_job_dependencies(job_ids=None, job_states=None):
    """Transform a list of job IDs to Ganeti 'depends' attribute.

    Each entry is a [job_id, job_states] pair; `job_states` defaults to
    all finalized job statuses.
    """
    # Fixed mutable-default-argument anti-pattern (was job_ids=[]).
    if job_ids is None:
        job_ids = []
    if job_states is None:
        job_states = list(rapi.JOB_STATUS_FINALIZED)
    assert isinstance(job_states, list)
    return [[job_id, job_states] for job_id in job_ids]