Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ 51136096

History | View | Annotate | Download (41.4 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
from django.conf import settings
34
from django.db import transaction
35
from datetime import datetime, timedelta
36

    
37
from synnefo.db.models import (Backend, VirtualMachine, Network,
38
                               BackendNetwork, BACKEND_STATUSES,
39
                               pooled_rapi_client, VirtualMachineDiagnostic,
40
                               Flavor, IPAddress, IPAddressLog)
41
from synnefo.logic import utils, ips
42
from synnefo import quotas
43
from synnefo.api.util import release_resource
44
from synnefo.util.mac2eui64 import mac2eui64
45
from synnefo.logic import rapi
46

    
47
from logging import getLogger
48
log = getLogger(__name__)
49

    
50

    
51
_firewall_tags = {
52
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
53
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
54
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}
55

    
56
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
57

    
58
SIMPLE_NIC_FIELDS = ["state", "mac", "network", "firewall_profile", "index"]
59
COMPLEX_NIC_FIELDS = ["ipv4_address", "ipv6_address"]
60
NIC_FIELDS = SIMPLE_NIC_FIELDS + COMPLEX_NIC_FIELDS
61
UNKNOWN_NIC_PREFIX = "unknown-"
62

    
63

    
64
def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
    """Handle quotas for updated VirtualMachine.

    Update quotas for the updated VirtualMachine based on the job that run on
    the Ganeti backend. If a commission has been already issued for this job,
    then this commission is just accepted or rejected based on the job status.
    Otherwise, a new commission for the given change is issued, that is also in
    force and auto-accept mode. In this case, previous commissions are
    rejected, since they reflect a previous state of the VM.

    :param vm: the VirtualMachine affected by the job.
    :param job_id: the Ganeti job id the notification refers to.
    :param job_opcode: the Ganeti opcode of the job.
    :param job_status: the (finalized) Ganeti job status.
    :param job_fields: dict of fields modified by the job.
    :returns: the (possibly updated) VirtualMachine object.

    """
    # Only jobs in a terminal state can affect quotas.
    if job_status not in rapi.JOB_STATUS_FINALIZED:
        return vm

    # Check successful completion of a job will trigger any quotable change in
    # the VM state.
    action = utils.get_action_from_opcode(job_opcode, job_fields)
    if action == "BUILD":
        # Quotas for new VMs are automatically accepted by the API
        return vm

    if vm.task_job_id == job_id and vm.serial is not None:
        # Commission for this change has already been issued. So just
        # accept/reject it. Special case is OP_INSTANCE_CREATE, which even
        # if fails, must be accepted, as the user must manually remove the
        # failed server
        serial = vm.serial
        if job_status == rapi.JOB_STATUS_SUCCESS:
            quotas.accept_serial(serial)
        elif job_status in [rapi.JOB_STATUS_ERROR, rapi.JOB_STATUS_CANCELED]:
            log.debug("Job %s failed. Rejecting related serial %s", job_id,
                      serial)
            quotas.reject_serial(serial)
        vm.serial = None
    elif job_status == rapi.JOB_STATUS_SUCCESS:
        commission_info = quotas.get_commission_info(resource=vm,
                                                     action=action,
                                                     action_fields=job_fields)
        if commission_info is not None:
            # Commission for this change has not been issued, or the issued
            # commission was unaware of the current change. Reject all previous
            # commissions and create a new one in forced mode!
            log.debug("Expected job was %s. Processing job %s.",
                      vm.task_job_id, job_id)
            reason = ("client: dispatcher, resource: %s, ganeti_job: %s"
                      % (vm, job_id))
            quotas.handle_resource_commission(vm, action,
                                              action_fields=job_fields,
                                              commission_name=reason,
                                              force=True,
                                              auto_accept=True)
            log.debug("Issued new commission: %s", vm.serial)

    return vm
118

    
119

    
120
@transaction.commit_on_success
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None,
                      job_fields=None):
    """Process a job progress notification from the backend

    Process an incoming message from the backend (currently Ganeti).
    Job notifications with a terminating status (success, error, or canceled),
    also update the operating state of the VM.

    :param vm: the VirtualMachine the job refers to.
    :param etime: timestamp of the event at the backend.
    :param jobid: the Ganeti job id.
    :param opcode: the Ganeti opcode of the job.
    :param status: the Ganeti job status (must be one of BACKEND_STATUSES).
    :param nics: optional NIC configuration reported by the backend.
    :param job_fields: optional dict of instance fields the job modified.

    """
    # See #1492, #1031, #1111 why this line has been removed
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)

    # Record the latest backend job information on the VM.
    vm.backendjobid = jobid
    vm.backendjobstatus = status
    vm.backendopcode = opcode
    vm.backendlogmsg = logmsg

    # Non-terminal notifications only update the job bookkeeping above.
    if status not in rapi.JOB_STATUS_FINALIZED:
        vm.save()
        return

    if job_fields is None:
        job_fields = {}

    new_operstate = None
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode)

    if status == rapi.JOB_STATUS_SUCCESS:
        # If job succeeds, change operating state if needed
        if state_for_success is not None:
            new_operstate = state_for_success

        beparams = job_fields.get("beparams", None)
        if beparams:
            # Change the flavor of the VM
            _process_resize(vm, beparams)

        # Update backendtime only for jobs that have been successfully
        # completed, since only these jobs update the state of the VM. Else a
        # "race condition" may occur when a successful job (e.g.
        # OP_INSTANCE_REMOVE) completes before an error job and messages arrive
        # in reversed order.
        vm.backendtime = etime

    if status in rapi.JOB_STATUS_FINALIZED and nics is not None:
        # Update the NICs of the VM
        _process_net_status(vm, etime, nics)

    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
    if opcode == 'OP_INSTANCE_CREATE' and status in (rapi.JOB_STATUS_CANCELED,
                                                     rapi.JOB_STATUS_ERROR):
        new_operstate = "ERROR"
        vm.backendtime = etime
        # Update state of associated NICs
        vm.nics.all().update(state="ERROR")
    elif opcode == 'OP_INSTANCE_REMOVE':
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
        # when no instance exists at the Ganeti backend.
        # See ticket #799 for all the details.
        if (status == rapi.JOB_STATUS_SUCCESS or
           (status == rapi.JOB_STATUS_ERROR and not vm_exists_in_backend(vm))):
            # VM has been deleted
            for nic in vm.nics.all():
                # Release the IP
                remove_nic_ips(nic)
                # And delete the NIC.
                nic.delete()
            vm.deleted = True
            new_operstate = state_for_success
            vm.backendtime = etime
            # Treat the job as successful for the quota handling below.
            status = rapi.JOB_STATUS_SUCCESS

    if status in rapi.JOB_STATUS_FINALIZED:
        # Job is finalized: Handle quotas/commissioning
        vm = handle_vm_quotas(vm, job_id=jobid, job_opcode=opcode,
                              job_status=status, job_fields=job_fields)
        # and clear task fields
        if vm.task_job_id == jobid:
            vm.task = None
            vm.task_job_id = None

    if new_operstate is not None:
        vm.operstate = new_operstate

    vm.save()
208

    
209

    
210
def _process_resize(vm, beparams):
    """Change flavor of a VirtualMachine based on new beparams."""
    current = vm.flavor
    new_vcpus = beparams.get("vcpus", current.cpu)
    new_ram = beparams.get("maxmem", current.ram)
    # No change in CPU or RAM: nothing to do.
    if (new_vcpus, new_ram) == (current.cpu, current.ram):
        return
    try:
        # Look up the flavor matching the new CPU/RAM with the same disk.
        vm.flavor = Flavor.objects.get(cpu=new_vcpus, ram=new_ram,
                                       disk=current.disk,
                                       disk_template=current.disk_template)
    except Flavor.DoesNotExist:
        raise Exception("Cannot find flavor for VM")
    vm.save()
225

    
226

    
227
@transaction.commit_on_success
def process_net_status(vm, etime, nics):
    """Wrap _process_net_status inside a database transaction."""
    _process_net_status(vm, etime, nics)
231

    
232

    
233
def _process_net_status(vm, etime, nics):
    """Process a net status notification from the backend

    Process an incoming message from the Ganeti backend,
    detailing the NIC configuration of a VM instance.

    Update the state of the VM in the DB accordingly.

    :param vm: the VirtualMachine whose NICs are being reported.
    :param etime: timestamp of the event at the backend.
    :param nics: list of NIC dicts as reported by Ganeti.

    """
    ganeti_nics = process_ganeti_nics(nics)
    db_nics = dict([(nic.id, nic)
                    for nic in vm.nics.prefetch_related("ips__subnet")])

    # Get X-Lock on backend before getting X-Lock on network IP pools, to
    # guarantee that no deadlock will occur with Backend allocator.
    Backend.objects.select_for_update().get(id=vm.backend_id)

    for nic_name in set(db_nics.keys()) | set(ganeti_nics.keys()):
        db_nic = db_nics.get(nic_name)
        ganeti_nic = ganeti_nics.get(nic_name)
        if ganeti_nic is None:
            # NIC exists in the DB but not in Ganeti.
            # Fix: check staleness of 'db_nic'. The original code passed
            # 'nic', a variable leaked from the comprehension above, so an
            # arbitrary NIC's staleness decided this NIC's fate.
            if nic_is_stale(vm, db_nic):
                log.debug("Removing stale NIC '%s'" % db_nic)
                remove_nic_ips(db_nic)
                db_nic.delete()
            else:
                log.info("NIC '%s' is still being created" % db_nic)
        elif db_nic is None:
            msg = ("NIC/%s of VM %s does not exist in DB! Cannot automatically"
                   " fix this issue!" % (nic_name, vm))
            log.error(msg)
            continue
        elif not nics_are_equal(db_nic, ganeti_nic):
            for f in SIMPLE_NIC_FIELDS:
                # Update the NIC in DB with the values from Ganeti NIC
                setattr(db_nic, f, ganeti_nic[f])
                db_nic.save()

            # Special case where the IPv4 address has changed, because you
            # need to release the old IPv4 address and reserve the new one
            ipv4_address = ganeti_nic["ipv4_address"]
            if db_nic.ipv4_address != ipv4_address:
                change_address_of_port(db_nic, vm.userid,
                                       old_address=db_nic.ipv4_address,
                                       new_address=ipv4_address,
                                       version=4)

            ipv6_address = ganeti_nic["ipv6_address"]
            if db_nic.ipv6_address != ipv6_address:
                change_address_of_port(db_nic, vm.userid,
                                       old_address=db_nic.ipv6_address,
                                       new_address=ipv6_address,
                                       version=6)

    vm.backendtime = etime
    vm.save()
289

    
290

    
291
def change_address_of_port(port, userid, old_address, new_address, version):
    """Change the IP address (of the given version) of a port (NIC).

    Release the port's current address of that IP version and attach
    *new_address* instead, recording a new IPAddressLog entry.

    :param port: the NetworkInterface whose address changes.
    :param userid: owner of the newly allocated address.
    :param old_address: the previous address (may be None).
    :param new_address: the address reported by the backend.
    :param version: IP version, 4 or 6.
    :returns: the new IPAddress object.
    :raises ValueError: if *version* is neither 4 nor 6.

    """
    if old_address is not None:
        msg = ("IPv%s Address of server '%s' changed from '%s' to '%s'"
               % (version, port.machine_id, old_address, new_address))
        # Logged as critical: an address change behind Synnefo's back is
        # unexpected and should get operator attention.
        log.critical(msg)

    # Remove the old IP address
    remove_nic_ips(port, version=version)

    if version == 4:
        # Reserve the new IPv4 address from the network's pool.
        ipaddress = ips.allocate_ip(port.network, userid, address=new_address)
        ipaddress.nic = port
        ipaddress.save()
    elif version == 6:
        # IPv6 addresses are created directly, without pool allocation.
        subnet6 = port.network.subnet6
        ipaddress = IPAddress.objects.create(userid=userid,
                                             network=port.network,
                                             subnet=subnet6,
                                             nic=port,
                                             address=new_address)
    else:
        raise ValueError("Unknown version: %s" % version)

    # New address log
    ip_log = IPAddressLog.objects.create(server_id=port.machine_id,
                                         network_id=port.network_id,
                                         address=new_address,
                                         active=True)
    log.info("Created IP log entry '%s' for address '%s' to server '%s'",
             ip_log.id, new_address, port.machine_id)

    return ipaddress
324

    
325

    
326
def nics_are_equal(db_nic, gnt_nic):
    """Return True if the DB NIC and the Ganeti NIC agree on all NIC_FIELDS."""
    return all(getattr(db_nic, field) == gnt_nic[field]
               for field in NIC_FIELDS)
331

    
332

    
333
def process_ganeti_nics(ganeti_nics):
    """Process NIC dict from ganeti.

    Turn the list of NICs reported by Ganeti into a dict mapping the NIC
    id to a dict of NIC attributes as Synnefo stores them.

    """
    nics_by_id = {}
    for index, gnic in enumerate(ganeti_nics):
        nic_name = gnic.get("name", None)
        if nic_name is not None:
            nic_id = utils.id_from_nic_name(nic_name)
        else:
            # Unnamed NIC: key it by its index. NICs unknown to Synnefo
            # will be created automatically.
            nic_id = UNKNOWN_NIC_PREFIX + str(index)

        network_name = gnic.get('network', '')
        network = Network.objects.get(id=utils.id_from_network_name(network_name))

        # Collect the attributes of the new NIC.
        mac = gnic.get('mac')
        ipv4 = gnic.get('ip')
        subnet6 = network.subnet6
        # Derive the IPv6 address from the MAC (EUI-64) when the network
        # has an IPv6 subnet.
        ipv6 = mac2eui64(mac, subnet6.cidr) if subnet6 else None

        firewall_profile = _reverse_tags.get(gnic.get('firewall'))
        if not firewall_profile and network.public:
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE

        nics_by_id[nic_id] = {
            'index': index,
            'network': network,
            'mac': mac,
            'ipv4_address': ipv4,
            'ipv6_address': ipv6,
            'firewall_profile': firewall_profile,
            'state': 'ACTIVE'}
    return nics_by_id
370

    
371

    
372
def remove_nic_ips(nic, version=None):
    """Remove IP addresses associated with a NetworkInterface.

    Remove all IP addresses that are associated with the NetworkInterface
    object, by returning them to the pool and deleting the IPAddress object. If
    the IP is a floating IP, then it is just disassociated from the NIC.
    If version is specified, then only IP addressses of that version will be
    removed.

    """
    for ip in nic.ips.all():
        # Skip addresses of other versions when a version filter is given.
        if not version or ip.ipversion == version:
            # Close the active log entry for this address.
            terminate_active_ipaddress_log(nic, ip)

            if ip.floating_ip:
                # Floating IPs survive; just detach them from the NIC.
                ip.nic = None
                ip.save()
            else:
                # Return the address to its pool and remove the record.
                ip.release_address()
                ip.delete()
396

    
397

    
398
def terminate_active_ipaddress_log(nic, ip):
    """Update DB logging entry for this IP address.

    Mark the active IPAddressLog entry for (server, network, address) as
    released. Only addresses of public networks attached to a server are
    logged; other calls are a no-op.

    """
    if not ip.network.public or nic.machine is None:
        return
    try:
        ip_log, created = \
            IPAddressLog.objects.get_or_create(server_id=nic.machine_id,
                                               network_id=ip.network_id,
                                               address=ip.address,
                                               active=True)
    except IPAddressLog.MultipleObjectsReturned:
        logmsg = ("Multiple active log entries for IP %s, Network %s,"
                  "Server %s. Cannot proceed!"
                  % (ip.address, ip.network, nic.machine))
        log.error(logmsg)
        raise
423

    
424

    
425
@transaction.commit_on_success
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
    """Process a job progress notification for a BackendNetwork.

    Record the job bookkeeping on the BackendNetwork, adjust its operating
    state according to the job outcome, and propagate the result to the
    state of the parent Network.

    """
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode
    back_network.backendlogmsg = logmsg

    # Note: Network is already locked!
    network = back_network.network

    # Notifications of success change the operating state
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == rapi.JOB_STATUS_SUCCESS and state_for_success is not None:
        back_network.operstate = state_for_success

    # A failed or canceled OP_NETWORK_ADD means the network was never
    # created in this backend.
    if (status in (rapi.JOB_STATUS_CANCELED, rapi.JOB_STATUS_ERROR)
       and opcode == 'OP_NETWORK_ADD'):
        back_network.operstate = 'ERROR'
        back_network.backendtime = etime

    if opcode == 'OP_NETWORK_REMOVE':
        # Treat an OP_NETWORK_REMOVE error as success when the network no
        # longer exists in the backend.
        network_is_deleted = (status == rapi.JOB_STATUS_SUCCESS)
        if network_is_deleted or (status == rapi.JOB_STATUS_ERROR and not
                                  network_exists_in_backend(back_network)):
            back_network.operstate = state_for_success
            back_network.deleted = True
            back_network.backendtime = etime

    if status == rapi.JOB_STATUS_SUCCESS:
        back_network.backendtime = etime
    back_network.save()
    # Also you must update the state of the Network!!
    update_network_state(network)
461

    
462

    
463
def update_network_state(network):
    """Update the state of a Network based on BackendNetwork states.

    Update the state of a Network based on the operstate of the networks in the
    backends that network exists.

    The state of the network is:
    * ACTIVE: If it is 'ACTIVE' in at least one backend.
    * DELETED: If it is is 'DELETED' in all backends that have been created.

    This function also releases the resources (MAC prefix or Bridge) and the
    quotas for the network.

    """
    if network.deleted:
        # Network has already been deleted. Just assert that state is also
        # DELETED
        if not network.state == "DELETED":
            network.state = "DELETED"
            network.save()
        return

    backend_states = [s.operstate for s in network.backend_networks.all()]
    if not backend_states and network.action != "DESTROY":
        # No backend networks and the network is not being destroyed: it
        # must be reported as ACTIVE, and nothing else can be deduced.
        # Fix: return unconditionally here. Previously the 'return' was
        # nested inside the inner 'if', so an already-ACTIVE network with
        # no backend networks fell through to the reduce() below, which
        # evaluates to "DELETED" on an empty list and wrongly released the
        # network.
        if network.state != "ACTIVE":
            network.state = "ACTIVE"
            network.save()
        return

    # Network is deleted when all BackendNetworks go to "DELETED" operstate
    deleted = reduce(lambda x, y: x == y and "DELETED", backend_states,
                     "DELETED")

    # Release the resources on the deletion of the Network
    if deleted:
        if network.ips.filter(deleted=False, floating_ip=True).exists():
            msg = "Cannot delete network %s! Floating IPs still in use!"
            log.error(msg % network)
            raise Exception(msg % network)
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
                 network.id, network.mac_prefix, network.link)
        network.deleted = True
        network.state = "DELETED"
        # Return pooled resources (MAC prefix / bridge) to their pools.
        if network.mac_prefix:
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
                release_resource(res_type="mac_prefix",
                                 value=network.mac_prefix)
        if network.link:
            if network.FLAVORS[network.flavor]["link"] == "pool":
                release_resource(res_type="bridge", value=network.link)

        # Set all subnets as deleted
        network.subnets.update(deleted=True)
        # And delete the IP pools
        for subnet in network.subnets.all():
            if subnet.ipversion == 4:
                subnet.ip_pools.all().delete()
        # And all the backend networks since there are useless
        network.backend_networks.all().delete()

        # Issue commission
        if network.userid:
            quotas.issue_and_accept_commission(network, action="DESTROY")
            # the above has already saved the object and committed;
            # a second save would override others' changes, since the
            # object is now unlocked
            return
        elif not network.public:
            log.warning("Network %s does not have an owner!", network.id)
    network.save()
533

    
534

    
535
@transaction.commit_on_success
def process_network_modify(back_network, etime, jobid, opcode, status,
                           job_fields):
    """Process a notification for an OP_NETWORK_SET_PARAMS job.

    Record the job bookkeeping on the BackendNetwork and reserve any
    externally added reserved IP addresses in the network's pool.

    """
    assert (opcode == "OP_NETWORK_SET_PARAMS")
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    # Fix: store the opcode in 'backendopcode', the field used by every
    # other notification handler (e.g. process_network_status). The
    # original assignment to 'opcode' only set a transient attribute that
    # was never persisted.
    back_network.backendopcode = opcode

    add_reserved_ips = job_fields.get("add_reserved_ips")
    if add_reserved_ips:
        network = back_network.network
        for ip in add_reserved_ips:
            # Mark the address as externally reserved in the IP pool.
            network.reserve_address(ip, external=True)

    if status == rapi.JOB_STATUS_SUCCESS:
        back_network.backendtime = etime
    back_network.save()
555

    
556

    
557
@transaction.commit_on_success
def process_create_progress(vm, etime, progress):
    """Record the image-copy build progress of a VM under creation."""
    percentage = int(progress)
    if percentage < 0:
        raise ValueError("Percentage cannot be negative")

    # The reported value may exceed 100%, due to the way
    # snf-image:copy-progress tracks bytes read by image handling
    # processes, so clamp it.
    percentage = min(percentage, 100)

    # FIXME: log a warning here, see #1033
    #   if last_update > percentage:
    #       raise ValueError("Build percentage should increase "
    #                        "monotonically (old = %d, new = %d)"
    #                        % (last_update, percentage))

    # This assumes that no message of type 'ganeti-create-progress' is going to
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
    # the instance is STARTED.  What if the two messages are processed by two
    # separate dispatcher threads, and the 'ganeti-op-status' message for
    # successful creation gets processed before the 'ganeti-create-progress'
    # message? [vkoukis]
    #
    #if not vm.operstate == 'BUILD':
    #    raise VirtualMachine.IllegalState("VM is not in building state")

    vm.buildpercentage = percentage
    vm.backendtime = etime
    vm.save()
586

    
587

    
588
@transaction.commit_on_success
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
                               details=None):
    """
    Create virtual machine instance diagnostic entry.

    :param vm: VirtualMachine instance to create diagnostic for.
    :param message: Diagnostic message.
    :param source: Diagnostic source identifier (e.g. image-helper).
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
    :param etime: The time the message occurred (if available).
    :param details: Additional details or debug information.
    """
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level, source=source,
                                                   source_date=etime,
                                                   message=message,
                                                   details=details)
605

    
606

    
607
def create_instance(vm, nics, flavor, image):
    """Submit a Ganeti CreateInstance job for a new VirtualMachine.

    `image` is a dictionary which should contain the keys:
            'backend_id', 'format' and 'metadata'

        metadata value should be a dictionary.

    Returns the result of the RAPI CreateInstance call (presumably the
    Ganeti job id — confirm against the RAPI client).
    """

    # Handle arguments to CreateInstance() as a dictionary,
    # initialize it based on a deployment-specific value.
    # This enables the administrator to override deployment-specific
    # arguments, such as the disk template to use, name of os provider
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
    #
    kw = vm.backend.get_create_params()
    kw['mode'] = 'create'
    kw['name'] = vm.backend_vm_id
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS

    # Disk layout comes from the flavor; flavor.disk is in GiB, Ganeti
    # expects MiB.
    kw['disk_template'] = flavor.disk_template
    kw['disks'] = [{"size": flavor.disk * 1024}]
    provider = flavor.disk_provider
    if provider:
        kw['disks'][0]['provider'] = provider
        kw['disks'][0]['origin'] = flavor.disk_origin

    kw['nics'] = [{"name": nic.backend_uuid,
                   "network": nic.network.backend_id,
                   "ip": nic.ipv4_address}
                  for nic in nics]

    backend = vm.backend
    depend_jobs = []
    for nic in nics:
        # Make sure every connected network exists in this backend before
        # the instance is created; depend on the jobs that create them.
        bnet, job_ids = ensure_network_is_active(backend, nic.network_id)
        depend_jobs.extend(job_ids)

    kw["depends"] = create_job_dependencies(depend_jobs)

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['os'] = settings.GANETI_OS_PROVIDER
    kw['ip_check'] = False
    kw['name_check'] = False

    # Do not specify a node explicitly, have
    # Ganeti use an iallocator instead
    #kw['pnode'] = rapi.GetNodes()[0]

    kw['dry_run'] = settings.TEST

    kw['beparams'] = {
        'auto_balance': True,
        'vcpus': flavor.cpu,
        'memory': flavor.ram}

    kw['osparams'] = {
        'config_url': vm.config_url,
        # Store image id and format to Ganeti
        'img_id': image['backend_id'],
        'img_format': image['format']}

    # Use opportunistic locking
    kw['opportunistic_locking'] = settings.GANETI_USE_OPPORTUNISTIC_LOCKING

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['hvparams'] = dict(serial_console=False)

    log.debug("Creating instance %s", utils.hide_pass(kw))
    with pooled_rapi_client(vm) as client:
        return client.CreateInstance(**kw)
676

    
677

    
678
def delete_instance(vm):
    """Submit a Ganeti job deleting the instance backing *vm*."""
    with pooled_rapi_client(vm) as client:
        return client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)
681

    
682

    
683
def reboot_instance(vm, reboot_type):
    """Submit a Ganeti job rebooting the instance backing *vm*.

    *reboot_type* is the OS API reboot type ('soft' or 'hard'). Note that
    the reboot type of the Ganeti job must always be hard; the OS API
    'soft'/'hard' semantics differ from Ganeti's and map to
    'shutdown_timeout' instead.

    """
    assert reboot_type in ('soft', 'hard')
    params = dict(instance=vm.backend_vm_id, reboot_type="hard")
    # 'shutdown_timeout' is only supported by snf-ganeti>=2.8.2 and
    # Ganeti>2.10; older versions ignore the parameter and fall back to
    # Ganeti's default timeout (120s).
    if reboot_type == "hard":
        params["shutdown_timeout"] = 0
    if settings.TEST:
        params["dry_run"] = True
    with pooled_rapi_client(vm) as client:
        return client.RebootInstance(**params)
699

    
700

    
701
def startup_instance(vm):
    """Submit a Ganeti job starting the instance backing *vm*."""
    with pooled_rapi_client(vm) as client:
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
704

    
705

    
706
def shutdown_instance(vm):
    """Submit a Ganeti job shutting down the instance backing *vm*."""
    with pooled_rapi_client(vm) as client:
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)
709

    
710

    
711
def resize_instance(vm, vcpus, memory):
    """Submit a Ganeti job changing the CPU/RAM of the instance of *vm*."""
    memory = int(memory)
    # Set min and max memory to the same value to pin the instance RAM.
    new_beparams = {"vcpus": int(vcpus),
                    "minmem": memory,
                    "maxmem": memory}
    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id, beparams=new_beparams)
717

    
718

    
719
def get_instance_console(vm):
    """Construct VNC console info for `vm`.

    RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
    which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and
    pretty useless (see #783). Until this is fixed on the Ganeti side,
    construct a console info reply directly.

    WARNING: This assumes that VNC runs on port network_port on the
    instance's primary node, and is probably hypervisor-specific.
    """
    log.debug("Getting console for vm %s", vm)

    with pooled_rapi_client(vm) as client:
        instance = client.GetInstance(vm.backend_vm_id)

    # Serial console steals the VNC port under KVM, so refuse to serve it.
    if vm.backend.hypervisor == "kvm" and instance['hvparams']['serial_console']:
        raise Exception("hv parameter serial_console cannot be true")

    return {'kind': 'vnc',
            'host': instance['pnode'],
            'port': instance['network_port']}
def get_instance_info(vm):
    """Fetch the instance's info dict from its Ganeti backend."""
    with pooled_rapi_client(vm) as rapi_client:
        return rapi_client.GetInstance(vm.backend_vm_id)
def vm_exists_in_backend(vm):
    """Check whether the instance for `vm` exists in its Ganeti backend.

    Returns False on a 404 reply; any other RAPI error is re-raised.
    """
    try:
        get_instance_info(vm)
    except rapi.GanetiApiError as err:
        if err.code == 404:
            return False
        raise
    return True
def get_network_info(backend_network):
    """Fetch the network's info dict from the Ganeti backend."""
    with pooled_rapi_client(backend_network) as rapi_client:
        return rapi_client.GetNetwork(backend_network.network.backend_id)
def network_exists_in_backend(backend_network):
    """Check whether a network exists in the Ganeti backend.

    Returns True if the network exists and False if Ganeti replies 404.
    Any other RAPI error (e.g. a connection failure) is re-raised instead
    of being swallowed: previously the function fell through and returned
    None, which callers would misread as "network does not exist". This
    also makes the behavior consistent with vm_exists_in_backend().
    """
    try:
        get_network_info(backend_network)
        return True
    except rapi.GanetiApiError as e:
        if e.code == 404:
            return False
        raise e
def job_is_still_running(vm, job_id=None):
    """Check whether a Ganeti job of `vm` is still in a non-final state.

    If `job_id` is omitted, the VM's last known backend job is checked.
    Any RAPI failure is treated as "not running".
    """
    with pooled_rapi_client(vm) as client:
        try:
            if job_id is None:
                job_id = vm.backendjobid
            status = client.GetJobStatus(job_id)["status"]
            return status not in rapi.JOB_STATUS_FINALIZED
        except rapi.GanetiApiError:
            return False
def nic_is_stale(vm, nic, timeout=60):
    """Check if a NIC is stale or exists in the Ganeti backend."""
    # A NIC still being built by a pending CONNECT task may legitimately
    # be missing from Ganeti for a while; give it `timeout` seconds.
    if nic.state == "BUILD" and vm.task == "CONNECT":
        grace_deadline = nic.created + timedelta(seconds=timeout)
        if datetime.now() < grace_deadline:
            # Too recent: skip the backend round-trips entirely.
            return False
        if job_is_still_running(vm, job_id=vm.task_job_id):
            return False
        # The CONNECT job has finished, but its completion message may
        # have been lost or stuck in the queue — check whether the NIC
        # actually exists on the instance.
        backend_info = get_instance_info(vm)
        if nic.backend_uuid in backend_info["nic.names"]:
            return False
    return True
def ensure_network_is_active(backend, network_id):
    """Ensure that a network is active in the specified backend.

    Check that a network exists and is active in the specified backend.
    If not, (re-)create the network. Return the corresponding
    BackendNetwork object and the IDs of the Ganeti jobs that create it
    (an empty list when the network was already active).
    """
    network = Network.objects.select_for_update().get(id=network_id)
    bnet, _created = BackendNetwork.objects.get_or_create(backend=backend,
                                                          network=network)
    if bnet.operstate == "ACTIVE":
        job_ids = []
    else:
        job_ids = create_network(network, backend, connect=True)
    return bnet, job_ids
def create_network(network, backend, connect=True):
    """Create a network in a Ganeti backend.

    Returns the list of submitted Ganeti job IDs (the connect jobs when
    `connect` is set, otherwise just the creation job).
    """
    log.debug("Creating network %s in backend %s", network, backend)

    creation_job = _create_network(network, backend)

    if not connect:
        return [creation_job]
    # Chain the ConnectNetwork jobs after the creation job.
    return connect_network(network, backend, depends=[creation_job])
def _create_network(network, backend):
    """Submit a Ganeti CreateNetwork job for `network`.

    Collects the IPv4/IPv6 subnet and gateway information from the
    network's subnets, builds the Ganeti tags, and returns the ID of the
    submitted job.

    Raises an Exception if no BackendNetwork row exists for the
    (network, backend) pair.
    """
    tags = network.backend_tag
    subnet = None
    subnet6 = None
    gateway = None
    gateway6 = None
    for _subnet in network.subnets.all():
        # Idiomatic membership test (was: `not "nfdhcpd" in tags`).
        if _subnet.dhcp and "nfdhcpd" not in tags:
            tags.append("nfdhcpd")
        if _subnet.ipversion == 4:
            subnet = _subnet.cidr
            gateway = _subnet.gateway
        elif _subnet.ipversion == 6:
            subnet6 = _subnet.cidr
            gateway6 = _subnet.gateway

    # Conflicting-IP checks are only meaningful on public networks.
    if network.public:
        conflicts_check = True
        tags.append('public')
    else:
        conflicts_check = False
        tags.append('private')

    # Use a dummy network subnet for IPv6 only networks. Currently Ganeti
    # does not support IPv6 only networks. To bypass this limitation, we
    # create the network with a dummy network subnet, and make Cyclades
    # connect instances to such networks, with address=None.
    if subnet is None:
        subnet = "10.0.0.0/29"

    try:
        bn = BackendNetwork.objects.get(network=network, backend=backend)
        mac_prefix = bn.mac_prefix
    except BackendNetwork.DoesNotExist:
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
                        " does not exist" % (network.id, backend.id))

    with pooled_rapi_client(backend) as client:
        return client.CreateNetwork(network_name=network.backend_id,
                                    network=subnet,
                                    network6=subnet6,
                                    gateway=gateway,
                                    gateway6=gateway6,
                                    mac_prefix=mac_prefix,
                                    conflicts_check=conflicts_check,
                                    tags=tags)
def connect_network(network, backend, depends=None, group=None):
    """Connect a network to nodegroups.

    Submits one ConnectNetwork job per nodegroup of the backend (or only
    for `group` when given), each depending on the Ganeti jobs listed in
    `depends`. Returns the list of submitted job IDs.

    The mutable default `depends=[]` was replaced with a None sentinel to
    avoid the shared-mutable-default pitfall; behavior is unchanged.
    """
    log.debug("Connecting network %s to backend %s", network, backend)

    if depends is None:
        depends = []

    # Conflicting-IP checks are only meaningful on public networks.
    if network.public:
        conflicts_check = True
    else:
        conflicts_check = False

    depends = create_job_dependencies(depends)
    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        # Use a distinct loop name so the `group` parameter is not shadowed.
        for nodegroup in groups:
            job_id = client.ConnectNetwork(network.backend_id, nodegroup,
                                           network.mode, network.link,
                                           conflicts_check,
                                           depends=depends)
            job_ids.append(job_id)
    return job_ids
def delete_network(network, backend, disconnect=True):
    """Delete a network from a Ganeti backend.

    When `disconnect` is set, DisconnectNetwork jobs are submitted first
    and the deletion job depends on them.
    """
    log.debug("Deleting network %s from backend %s", network, backend)

    depends = disconnect_network(network, backend) if disconnect else []
    _delete_network(network, backend, depends=depends)
def _delete_network(network, backend, depends=None):
    """Submit the Ganeti DeleteNetwork job, depending on `depends` jobs.

    The mutable default `depends=[]` was replaced with a None sentinel to
    avoid the shared-mutable-default pitfall; behavior is unchanged.
    """
    if depends is None:
        depends = []
    depends = create_job_dependencies(depends)
    with pooled_rapi_client(backend) as client:
        return client.DeleteNetwork(network.backend_id, depends)
def disconnect_network(network, backend, group=None):
    """Disconnect a network from nodegroups of a backend.

    Submits one DisconnectNetwork job per nodegroup (or only for `group`
    when given) and returns the list of submitted job IDs.
    """
    log.debug("Disconnecting network %s to backend %s", network, backend)

    job_ids = []
    with pooled_rapi_client(backend) as client:
        target_groups = [group] if group is not None else client.GetGroups()
        for nodegroup in target_groups:
            job_ids.append(client.DisconnectNetwork(network.backend_id,
                                                    nodegroup))
    return job_ids
def connect_to_network(vm, nic):
    """Add a NIC to `vm` in its Ganeti backend.

    Ensures the network is active in the backend first, and chains the
    ModifyInstance job after any network-creation jobs.
    """
    network = nic.network
    backend = vm.backend
    # (Re-)create the network in this backend if it is not active yet.
    bnet, depend_jobs = ensure_network_is_active(backend, network.id)

    depends = create_job_dependencies(depend_jobs)

    # Ganeti NIC definition; "-1" below appends it at the end.
    nic = {'name': nic.backend_uuid,
           'network': network.backend_id,
           'ip': nic.ipv4_address}

    log.debug("Adding NIC %s to VM %s", nic, vm)

    kwargs = {
        "instance": vm.backend_vm_id,
        "nics": [("add", "-1", nic)],
        "depends": depends,
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(**kwargs)
def disconnect_from_network(vm, nic):
    """Remove a NIC from `vm`, dropping its firewall tag if one is set.

    Returns the ID of the submitted ModifyInstance job.
    """
    log.debug("Removing NIC %s of VM %s", nic, vm)

    kwargs = {
        "instance": vm.backend_vm_id,
        "nics": [("remove", nic.backend_uuid, {})],
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        job_id = client.ModifyInstance(**kwargs)
        profile = nic.firewall_profile
        # Clean up the firewall tag, unless firewalling was disabled.
        if profile and profile != "DISABLED":
            tag = _firewall_tags[profile] % nic.backend_uuid
            client.DeleteInstanceTags(vm.backend_vm_id, [tag],
                                      dry_run=settings.TEST)

        return job_id
def set_firewall_profile(vm, profile, nic):
    """Set the firewall profile of a NIC by (re)writing instance tags.

    Removes any previous firewall tags of the NIC, adds the tag matching
    `profile` (unless it is "DISABLED"), and issues a NOP ModifyInstance
    so that process_net_status runs on the dispatcher.

    Raises ValueError for a profile not in _firewall_tags.
    (Also fixes the typo "Unsopported" in the error message.)
    """
    uuid = nic.backend_uuid
    try:
        tag = _firewall_tags[profile] % uuid
    except KeyError:
        raise ValueError("Unsupported Firewall Profile: %s" % profile)

    log.debug("Setting tag of VM %s, NIC %s, to %s", vm, nic, profile)

    with pooled_rapi_client(vm) as client:
        # Delete previous firewall tags
        old_tags = client.GetInstanceTags(vm.backend_vm_id)
        delete_tags = [(t % uuid) for t in _firewall_tags.values()
                       if (t % uuid) in old_tags]
        if delete_tags:
            client.DeleteInstanceTags(vm.backend_vm_id, delete_tags,
                                      dry_run=settings.TEST)

        if profile != "DISABLED":
            client.AddInstanceTags(vm.backend_vm_id, [tag],
                                   dry_run=settings.TEST)

        # XXX NOP ModifyInstance call to force process_net_status to run
        # on the dispatcher
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
        client.ModifyInstance(vm.backend_vm_id,
                              os_name=os_name)
    return None
def get_instances(backend, bulk=True):
    """List the instances of a Ganeti backend."""
    with pooled_rapi_client(backend) as client:
        return client.GetInstances(bulk=bulk)
def get_nodes(backend, bulk=True):
    """List the nodes of a Ganeti backend."""
    with pooled_rapi_client(backend) as client:
        return client.GetNodes(bulk=bulk)
def get_jobs(backend, bulk=True):
    """List the jobs of a Ganeti backend."""
    with pooled_rapi_client(backend) as client:
        return client.GetJobs(bulk=bulk)
def get_physical_resources(backend):
    """Get the physical resources of a backend.

    Get the resources of a backend as reported by the backend (not the
    db), summed over all usable nodes.
    """
    attrs = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
    totals = dict.fromkeys(attrs, 0)
    for node in get_nodes(backend, bulk=True):
        # Filter out drained, offline and not vm_capable nodes since
        # they will not take part in the vm allocation process.
        usable = node['vm_capable'] and not (node['drained'] or
                                             node['offline'])
        if usable and node['cnodes']:
            for attr in attrs:
                totals[attr] += int(node[attr] or 0)
    return totals
def update_backend_resources(backend, resources=None):
    """Update the state of the backend resources in db.

    When `resources` is not given (or falsy), they are fetched from the
    backend itself via get_physical_resources().
    """
    if not resources:
        resources = get_physical_resources(backend)

    for field in ('mfree', 'mtotal', 'dfree', 'dtotal',
                  'pinst_cnt', 'ctotal'):
        setattr(backend, field, resources[field])
    backend.updated = datetime.now()
    backend.save()
def get_memory_from_instances(backend):
    """Get the memory that is used from instances.

    Get the used memory of a backend. Note: This is different for
    the real memory used, due to kvm's memory de-duplication.
    """
    with pooled_rapi_client(backend) as client:
        instances = client.GetInstances(bulk=True)
    # Sum the operational RAM reported by Ganeti for each instance.
    return sum(instance['oper_ram'] for instance in instances)
def get_available_disk_templates(backend):
    """Get the list of available disk templates of a Ganeti backend.

    The list contains the disk templates that are enabled in the Ganeti
    backend and also included in ipolicy-disk-templates.
    """
    with pooled_rapi_client(backend) as client:
        info = client.GetInfo()
    allowed = info["ipolicy"]["disk-templates"]
    if "enabled_disk_templates" not in info:
        # Ganeti < 2.8 does not report 'enabled_disk_templates'
        return allowed
    return [template for template in info["enabled_disk_templates"]
            if template in allowed]
def update_backend_disk_templates(backend):
    """Refresh and persist the backend's available disk templates."""
    backend.disk_templates = get_available_disk_templates(backend)
    backend.save()
##
## Synchronized operations for reconciliation
##

def create_network_synced(network, backend):
    """Create and connect a network, waiting for each Ganeti job.

    Returns a (status, error) pair; stops early if creation fails.
    """
    result = _create_network_synced(network, backend)
    if result[0] != rapi.JOB_STATUS_SUCCESS:
        return result
    return connect_network_synced(network, backend)
def _create_network_synced(network, backend):
    """Create a network and block until its Ganeti job finalizes."""
    with pooled_rapi_client(backend) as client:
        job_id = _create_network(network, backend)
        return wait_for_job(client, job_id)
def connect_network_synced(network, backend):
    """Connect a network to every nodegroup, waiting for each job.

    Returns the first non-successful (status, error) pair, or the result
    of the last group's job when all succeed.
    NOTE(review): assumes the backend has at least one nodegroup;
    with an empty GetGroups() result, `result` would be unbound.
    """
    with pooled_rapi_client(backend) as client:
        for group in client.GetGroups():
            job = client.ConnectNetwork(network.backend_id, group,
                                        network.mode, network.link)
            result = wait_for_job(client, job)
            if result[0] != rapi.JOB_STATUS_SUCCESS:
                return result

    return result
def wait_for_job(client, jobid):
    """Block until a Ganeti job finalizes.

    Returns (status, None) on success, or (status, error) otherwise.
    """
    fields = ['status', 'opresult']
    result = client.WaitForJobChange(jobid, fields, None, None)
    status = result['job_info'][0]
    # Keep long-polling until the job reaches a final state.
    while status not in rapi.JOB_STATUS_FINALIZED:
        result = client.WaitForJobChange(jobid, fields, [result], None)
        status = result['job_info'][0]

    if status == rapi.JOB_STATUS_SUCCESS:
        return (status, None)
    error = result['job_info'][1]
    return (status, error)
def create_job_dependencies(job_ids=None, job_states=None):
    """Transform a list of job IDs to Ganeti 'depends' attribute.

    Each entry becomes [job_id, job_states]; `job_states` defaults to
    all finalized Ganeti job states. The mutable default `job_ids=[]`
    was replaced with a None sentinel (same behavior, avoids the
    shared-mutable-default pitfall), and the type assert now uses
    isinstance.
    """
    if job_ids is None:
        job_ids = []
    if job_states is None:
        job_states = list(rapi.JOB_STATUS_FINALIZED)
    assert isinstance(job_states, list)
    return [[job_id, job_states] for job_id in job_ids]