Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ 18cb3999

History | View | Annotate | Download (43.9 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
from django.conf import settings
34
from django.db import transaction
35
from datetime import datetime, timedelta
36

    
37
from synnefo.db.models import (Backend, VirtualMachine, Network,
38
                               BackendNetwork, BACKEND_STATUSES,
39
                               pooled_rapi_client, VirtualMachineDiagnostic,
40
                               Flavor, IPAddress, IPAddressLog)
41
from synnefo.logic import utils, ips
42
from synnefo import quotas
43
from synnefo.api.util import release_resource
44
from synnefo.util.mac2eui64 import mac2eui64
45
from synnefo.logic import rapi
46

    
47
from logging import getLogger
48
log = getLogger(__name__)
49

    
50

    
51
_firewall_tags = {
52
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
53
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
54
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}
55

    
56
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
57

    
58
SIMPLE_NIC_FIELDS = ["state", "mac", "network", "firewall_profile", "index"]
59
COMPLEX_NIC_FIELDS = ["ipv4_address", "ipv6_address"]
60
NIC_FIELDS = SIMPLE_NIC_FIELDS + COMPLEX_NIC_FIELDS
61
UNKNOWN_NIC_PREFIX = "unknown-"
62

    
63

    
64
def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
65
    """Handle quotas for updated VirtualMachine.
66

67
    Update quotas for the updated VirtualMachine based on the job that run on
68
    the Ganeti backend. If a commission has been already issued for this job,
69
    then this commission is just accepted or rejected based on the job status.
70
    Otherwise, a new commission for the given change is issued, that is also in
71
    force and auto-accept mode. In this case, previous commissions are
72
    rejected, since they reflect a previous state of the VM.
73

74
    """
75
    if job_status not in rapi.JOB_STATUS_FINALIZED:
76
        return vm
77

    
78
    # Check successful completion of a job will trigger any quotable change in
79
    # the VM state.
80
    action = utils.get_action_from_opcode(job_opcode, job_fields)
81
    if action == "BUILD":
82
        # Quotas for new VMs are automatically accepted by the API
83
        return vm
84

    
85
    if vm.task_job_id == job_id and vm.serial is not None:
86
        # Commission for this change has already been issued. So just
87
        # accept/reject it. Special case is OP_INSTANCE_CREATE, which even
88
        # if fails, must be accepted, as the user must manually remove the
89
        # failed server
90
        serial = vm.serial
91
        if job_status == rapi.JOB_STATUS_SUCCESS:
92
            quotas.accept_serial(serial)
93
        elif job_status in [rapi.JOB_STATUS_ERROR, rapi.JOB_STATUS_CANCELED]:
94
            log.debug("Job %s failed. Rejecting related serial %s", job_id,
95
                      serial)
96
            quotas.reject_serial(serial)
97
        vm.serial = None
98
    elif job_status == rapi.JOB_STATUS_SUCCESS:
99
        commission_info = quotas.get_commission_info(resource=vm,
100
                                                     action=action,
101
                                                     action_fields=job_fields)
102
        if commission_info is not None:
103
            # Commission for this change has not been issued, or the issued
104
            # commission was unaware of the current change. Reject all previous
105
            # commissions and create a new one in forced mode!
106
            log.debug("Expected job was %s. Processing job %s.",
107
                      vm.task_job_id, job_id)
108
            reason = ("client: dispatcher, resource: %s, ganeti_job: %s"
109
                      % (vm, job_id))
110
            quotas.handle_resource_commission(vm, action,
111
                                              action_fields=job_fields,
112
                                              commission_name=reason,
113
                                              force=True,
114
                                              auto_accept=True)
115
            log.debug("Issued new commission: %s", vm.serial)
116

    
117
    return vm
118

    
119

    
120
@transaction.commit_on_success
121
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None,
122
                      job_fields=None):
123
    """Process a job progress notification from the backend
124

125
    Process an incoming message from the backend (currently Ganeti).
126
    Job notifications with a terminating status (sucess, error, or canceled),
127
    also update the operating state of the VM.
128

129
    """
130
    # See #1492, #1031, #1111 why this line has been removed
131
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
132
    if status not in [x[0] for x in BACKEND_STATUSES]:
133
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)
134

    
135
    vm.backendjobid = jobid
136
    vm.backendjobstatus = status
137
    vm.backendopcode = opcode
138
    vm.backendlogmsg = logmsg
139

    
140
    if status not in rapi.JOB_STATUS_FINALIZED:
141
        vm.save()
142
        return
143

    
144
    if job_fields is None:
145
        job_fields = {}
146

    
147
    new_operstate = None
148
    new_flavor = None
149
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode)
150

    
151
    if status == rapi.JOB_STATUS_SUCCESS:
152
        # If job succeeds, change operating state if needed
153
        if state_for_success is not None:
154
            new_operstate = state_for_success
155

    
156
        beparams = job_fields.get("beparams", None)
157
        if beparams:
158
            # Change the flavor of the VM
159
            new_flavor = _process_resize(vm, beparams)
160

    
161
        # Update backendtime only for jobs that have been successfully
162
        # completed, since only these jobs update the state of the VM. Else a
163
        # "race condition" may occur when a successful job (e.g.
164
        # OP_INSTANCE_REMOVE) completes before an error job and messages arrive
165
        # in reversed order.
166
        vm.backendtime = etime
167

    
168
    if status in rapi.JOB_STATUS_FINALIZED and nics is not None:
169
        # Update the NICs of the VM
170
        _process_net_status(vm, etime, nics)
171

    
172
    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
173
    if opcode == 'OP_INSTANCE_CREATE' and status in (rapi.JOB_STATUS_CANCELED,
174
                                                     rapi.JOB_STATUS_ERROR):
175
        new_operstate = "ERROR"
176
        vm.backendtime = etime
177
        # Update state of associated NICs
178
        vm.nics.all().update(state="ERROR")
179
    elif opcode == 'OP_INSTANCE_REMOVE':
180
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
181
        # when no instance exists at the Ganeti backend.
182
        # See ticket #799 for all the details.
183
        if (status == rapi.JOB_STATUS_SUCCESS or
184
           (status == rapi.JOB_STATUS_ERROR and not vm_exists_in_backend(vm))):
185
            # VM has been deleted
186
            for nic in vm.nics.all():
187
                # Release the IP
188
                remove_nic_ips(nic)
189
                # And delete the NIC.
190
                nic.delete()
191
            vm.deleted = True
192
            new_operstate = state_for_success
193
            vm.backendtime = etime
194
            status = rapi.JOB_STATUS_SUCCESS
195
            #status = "success"
196
            vm.volumes.all().update(deleted=True, machine=None)
197

    
198
    if status in rapi.JOB_STATUS_FINALIZED:
199
        # Job is finalized: Handle quotas/commissioning
200
        vm = handle_vm_quotas(vm, job_id=jobid, job_opcode=opcode,
201
                              job_status=status, job_fields=job_fields)
202
        # and clear task fields
203
        if vm.task_job_id == jobid:
204
            vm.task = None
205
            vm.task_job_id = None
206

    
207
    if new_operstate is not None:
208
        vm.operstate = new_operstate
209
    if new_flavor is not None:
210
        vm.flavor = new_flavor
211

    
212
    vm.save()
213

    
214

    
215
def _process_resize(vm, beparams):
216
    """Change flavor of a VirtualMachine based on new beparams."""
217
    old_flavor = vm.flavor
218
    vcpus = beparams.get("vcpus", old_flavor.cpu)
219
    ram = beparams.get("maxmem", old_flavor.ram)
220
    if vcpus == old_flavor.cpu and ram == old_flavor.ram:
221
        return
222
    try:
223
        new_flavor = Flavor.objects.get(cpu=vcpus, ram=ram,
224
                                        disk=old_flavor.disk,
225
                                        disk_template=old_flavor.disk_template)
226
    except Flavor.DoesNotExist:
227
        raise Exception("Cannot find flavor for VM")
228
    return new_flavor
229

    
230

    
231
@transaction.commit_on_success
232
def process_net_status(vm, etime, nics):
233
    """Wrap _process_net_status inside transaction."""
234
    _process_net_status(vm, etime, nics)
235

    
236

    
237
def _process_net_status(vm, etime, nics):
238
    """Process a net status notification from the backend
239

240
    Process an incoming message from the Ganeti backend,
241
    detailing the NIC configuration of a VM instance.
242

243
    Update the state of the VM in the DB accordingly.
244

245
    """
246
    ganeti_nics = process_ganeti_nics(nics)
247
    db_nics = dict([(nic.id, nic)
248
                    for nic in vm.nics.select_related("network")
249
                                      .prefetch_related("ips")])
250

    
251
    for nic_name in set(db_nics.keys()) | set(ganeti_nics.keys()):
252
        db_nic = db_nics.get(nic_name)
253
        ganeti_nic = ganeti_nics.get(nic_name)
254
        if ganeti_nic is None:
255
            if nic_is_stale(vm, nic):
256
                log.debug("Removing stale NIC '%s'" % db_nic)
257
                remove_nic_ips(db_nic)
258
                db_nic.delete()
259
            else:
260
                log.info("NIC '%s' is still being created" % db_nic)
261
        elif db_nic is None:
262
            msg = ("NIC/%s of VM %s does not exist in DB! Cannot automatically"
263
                   " fix this issue!" % (nic_name, vm))
264
            log.error(msg)
265
            continue
266
        elif not nics_are_equal(db_nic, ganeti_nic):
267
            for f in SIMPLE_NIC_FIELDS:
268
                # Update the NIC in DB with the values from Ganeti NIC
269
                setattr(db_nic, f, ganeti_nic[f])
270
                db_nic.save()
271

    
272
            # Special case where the IPv4 address has changed, because you
273
            # need to release the old IPv4 address and reserve the new one
274
            gnt_ipv4_address = ganeti_nic["ipv4_address"]
275
            db_ipv4_address = db_nic.ipv4_address
276
            if db_ipv4_address != gnt_ipv4_address:
277
                change_address_of_port(db_nic, vm.userid,
278
                                       old_address=db_ipv4_address,
279
                                       new_address=gnt_ipv4_address,
280
                                       version=4)
281

    
282
            gnt_ipv6_address = ganeti_nic["ipv6_address"]
283
            db_ipv6_address = db_nic.ipv6_address
284
            if db_ipv6_address != gnt_ipv6_address:
285
                change_address_of_port(db_nic, vm.userid,
286
                                       old_address=db_ipv6_address,
287
                                       new_address=gnt_ipv6_address,
288
                                       version=6)
289

    
290
    vm.backendtime = etime
291
    vm.save()
292

    
293

    
294
def change_address_of_port(port, userid, old_address, new_address, version):
295
    """Change."""
296
    if old_address is not None:
297
        msg = ("IPv%s Address of server '%s' changed from '%s' to '%s'"
298
               % (version, port.machine_id, old_address, new_address))
299
        log.error(msg)
300

    
301
    # Remove the old IP address
302
    remove_nic_ips(port, version=version)
303

    
304
    if version == 4:
305
        ipaddress = ips.allocate_ip(port.network, userid, address=new_address)
306
        ipaddress.nic = port
307
        ipaddress.save()
308
    elif version == 6:
309
        subnet6 = port.network.subnet6
310
        ipaddress = IPAddress.objects.create(userid=userid,
311
                                             network=port.network,
312
                                             subnet=subnet6,
313
                                             nic=port,
314
                                             address=new_address,
315
                                             ipversion=6)
316
    else:
317
        raise ValueError("Unknown version: %s" % version)
318

    
319
    # New address log
320
    ip_log = IPAddressLog.objects.create(server_id=port.machine_id,
321
                                         network_id=port.network_id,
322
                                         address=new_address,
323
                                         active=True)
324
    log.info("Created IP log entry '%s' for address '%s' to server '%s'",
325
             ip_log.id, new_address, port.machine_id)
326

    
327
    return ipaddress
328

    
329

    
330
def nics_are_equal(db_nic, gnt_nic):
331
    for field in NIC_FIELDS:
332
        if getattr(db_nic, field) != gnt_nic[field]:
333
            return False
334
    return True
335

    
336

    
337
def process_ganeti_nics(ganeti_nics):
338
    """Process NIC dict from ganeti"""
339
    new_nics = []
340
    for index, gnic in enumerate(ganeti_nics):
341
        nic_name = gnic.get("name", None)
342
        if nic_name is not None:
343
            nic_id = utils.id_from_nic_name(nic_name)
344
        else:
345
            # Put as default value the index. If it is an unknown NIC to
346
            # synnefo it will be created automaticaly.
347
            nic_id = UNKNOWN_NIC_PREFIX + str(index)
348
        network_name = gnic.get('network', '')
349
        network_id = utils.id_from_network_name(network_name)
350
        network = Network.objects.get(id=network_id)
351

    
352
        # Get the new nic info
353
        mac = gnic.get('mac')
354
        ipv4 = gnic.get('ip')
355
        subnet6 = network.subnet6
356
        ipv6 = mac2eui64(mac, subnet6.cidr) if subnet6 else None
357

    
358
        firewall = gnic.get('firewall')
359
        firewall_profile = _reverse_tags.get(firewall)
360
        if not firewall_profile and network.public:
361
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE
362

    
363
        nic_info = {
364
            'index': index,
365
            'network': network,
366
            'mac': mac,
367
            'ipv4_address': ipv4,
368
            'ipv6_address': ipv6,
369
            'firewall_profile': firewall_profile,
370
            'state': 'ACTIVE'}
371

    
372
        new_nics.append((nic_id, nic_info))
373
    return dict(new_nics)
374

    
375

    
376
def remove_nic_ips(nic, version=None):
377
    """Remove IP addresses associated with a NetworkInterface.
378

379
    Remove all IP addresses that are associated with the NetworkInterface
380
    object, by returning them to the pool and deleting the IPAddress object. If
381
    the IP is a floating IP, then it is just disassociated from the NIC.
382
    If version is specified, then only IP addressses of that version will be
383
    removed.
384

385
    """
386
    for ip in nic.ips.all():
387
        if version and ip.ipversion != version:
388
            continue
389

    
390
        # Update the DB table holding the logging of all IP addresses
391
        terminate_active_ipaddress_log(nic, ip)
392

    
393
        if ip.floating_ip:
394
            ip.nic = None
395
            ip.save()
396
        else:
397
            # Release the IPv4 address
398
            ip.release_address()
399
            ip.delete()
400

    
401

    
402
def terminate_active_ipaddress_log(nic, ip):
403
    """Update DB logging entry for this IP address."""
404
    if not ip.network.public or nic.machine is None:
405
        return
406
    try:
407
        ip_log, created = \
408
            IPAddressLog.objects.get_or_create(server_id=nic.machine_id,
409
                                               network_id=ip.network_id,
410
                                               address=ip.address,
411
                                               active=True)
412
    except IPAddressLog.MultipleObjectsReturned:
413
        logmsg = ("Multiple active log entries for IP %s, Network %s,"
414
                  "Server %s. Cannot proceed!"
415
                  % (ip.address, ip.network, nic.machine))
416
        log.error(logmsg)
417
        raise
418

    
419
    if created:
420
        logmsg = ("No log entry for IP %s, Network %s, Server %s. Created new"
421
                  " but with wrong creation timestamp."
422
                  % (ip.address, ip.network, nic.machine))
423
        log.error(logmsg)
424
    ip_log.released_at = datetime.now()
425
    ip_log.active = False
426
    ip_log.save()
427

    
428

    
429
@transaction.commit_on_success
430
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
431
    if status not in [x[0] for x in BACKEND_STATUSES]:
432
        raise Network.InvalidBackendMsgError(opcode, status)
433

    
434
    back_network.backendjobid = jobid
435
    back_network.backendjobstatus = status
436
    back_network.backendopcode = opcode
437
    back_network.backendlogmsg = logmsg
438

    
439
    # Note: Network is already locked!
440
    network = back_network.network
441

    
442
    # Notifications of success change the operating state
443
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
444
    if status == rapi.JOB_STATUS_SUCCESS and state_for_success is not None:
445
        back_network.operstate = state_for_success
446

    
447
    if (status in (rapi.JOB_STATUS_CANCELED, rapi.JOB_STATUS_ERROR)
448
       and opcode == 'OP_NETWORK_ADD'):
449
        back_network.operstate = 'ERROR'
450
        back_network.backendtime = etime
451

    
452
    if opcode == 'OP_NETWORK_REMOVE':
453
        network_is_deleted = (status == rapi.JOB_STATUS_SUCCESS)
454
        if network_is_deleted or (status == rapi.JOB_STATUS_ERROR and not
455
                                  network_exists_in_backend(back_network)):
456
            back_network.operstate = state_for_success
457
            back_network.deleted = True
458
            back_network.backendtime = etime
459

    
460
    if status == rapi.JOB_STATUS_SUCCESS:
461
        back_network.backendtime = etime
462
    back_network.save()
463
    # Also you must update the state of the Network!!
464
    update_network_state(network)
465

    
466

    
467
def update_network_state(network):
468
    """Update the state of a Network based on BackendNetwork states.
469

470
    Update the state of a Network based on the operstate of the networks in the
471
    backends that network exists.
472

473
    The state of the network is:
474
    * ACTIVE: If it is 'ACTIVE' in at least one backend.
475
    * DELETED: If it is is 'DELETED' in all backends that have been created.
476

477
    This function also releases the resources (MAC prefix or Bridge) and the
478
    quotas for the network.
479

480
    """
481
    if network.deleted:
482
        # Network has already been deleted. Just assert that state is also
483
        # DELETED
484
        if not network.state == "DELETED":
485
            network.state = "DELETED"
486
            network.save()
487
        return
488

    
489
    backend_states = [s.operstate for s in network.backend_networks.all()]
490
    if not backend_states and network.action != "DESTROY":
491
        if network.state != "ACTIVE":
492
            network.state = "ACTIVE"
493
            network.save()
494
            return
495

    
496
    # Network is deleted when all BackendNetworks go to "DELETED" operstate
497
    deleted = reduce(lambda x, y: x == y and "DELETED", backend_states,
498
                     "DELETED")
499

    
500
    # Release the resources on the deletion of the Network
501
    if deleted:
502
        if network.ips.filter(deleted=False, floating_ip=True).exists():
503
            msg = "Cannot delete network %s! Floating IPs still in use!"
504
            log.error(msg % network)
505
            raise Exception(msg % network)
506
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
507
                 network.id, network.mac_prefix, network.link)
508
        network.deleted = True
509
        network.state = "DELETED"
510
        # Undrain the network, otherwise the network state will remain
511
        # as 'SNF:DRAINED'
512
        network.drained = False
513
        if network.mac_prefix:
514
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
515
                release_resource(res_type="mac_prefix",
516
                                 value=network.mac_prefix)
517
        if network.link:
518
            if network.FLAVORS[network.flavor]["link"] == "pool":
519
                release_resource(res_type="bridge", value=network.link)
520

    
521
        # Set all subnets as deleted
522
        network.subnets.update(deleted=True)
523
        # And delete the IP pools
524
        for subnet in network.subnets.all():
525
            if subnet.ipversion == 4:
526
                subnet.ip_pools.all().delete()
527
        # And all the backend networks since there are useless
528
        network.backend_networks.all().delete()
529

    
530
        # Issue commission
531
        if network.userid:
532
            quotas.issue_and_accept_commission(network, action="DESTROY")
533
            # the above has already saved the object and committed;
534
            # a second save would override others' changes, since the
535
            # object is now unlocked
536
            return
537
        elif not network.public:
538
            log.warning("Network %s does not have an owner!", network.id)
539
    network.save()
540

    
541

    
542
@transaction.commit_on_success
543
def process_network_modify(back_network, etime, jobid, opcode, status,
544
                           job_fields):
545
    assert (opcode == "OP_NETWORK_SET_PARAMS")
546
    if status not in [x[0] for x in BACKEND_STATUSES]:
547
        raise Network.InvalidBackendMsgError(opcode, status)
548

    
549
    back_network.backendjobid = jobid
550
    back_network.backendjobstatus = status
551
    back_network.opcode = opcode
552

    
553
    add_reserved_ips = job_fields.get("add_reserved_ips")
554
    if add_reserved_ips:
555
        network = back_network.network
556
        for ip in add_reserved_ips:
557
            network.reserve_address(ip, external=True)
558

    
559
    if status == rapi.JOB_STATUS_SUCCESS:
560
        back_network.backendtime = etime
561
    back_network.save()
562

    
563

    
564
@transaction.commit_on_success
565
def process_create_progress(vm, etime, progress):
566

    
567
    percentage = int(progress)
568

    
569
    # The percentage may exceed 100%, due to the way
570
    # snf-image:copy-progress tracks bytes read by image handling processes
571
    percentage = 100 if percentage > 100 else percentage
572
    if percentage < 0:
573
        raise ValueError("Percentage cannot be negative")
574

    
575
    # FIXME: log a warning here, see #1033
576
#   if last_update > percentage:
577
#       raise ValueError("Build percentage should increase monotonically " \
578
#                        "(old = %d, new = %d)" % (last_update, percentage))
579

    
580
    # This assumes that no message of type 'ganeti-create-progress' is going to
581
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
582
    # the instance is STARTED.  What if the two messages are processed by two
583
    # separate dispatcher threads, and the 'ganeti-op-status' message for
584
    # successful creation gets processed before the 'ganeti-create-progress'
585
    # message? [vkoukis]
586
    #
587
    #if not vm.operstate == 'BUILD':
588
    #    raise VirtualMachine.IllegalState("VM is not in building state")
589

    
590
    vm.buildpercentage = percentage
591
    vm.backendtime = etime
592
    vm.save()
593

    
594

    
595
@transaction.commit_on_success
596
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
597
                               details=None):
598
    """
599
    Create virtual machine instance diagnostic entry.
600

601
    :param vm: VirtualMachine instance to create diagnostic for.
602
    :param message: Diagnostic message.
603
    :param source: Diagnostic source identifier (e.g. image-helper).
604
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
605
    :param etime: The time the message occured (if available).
606
    :param details: Additional details or debug information.
607
    """
608
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level, source=source,
609
                                                   source_date=etime,
610
                                                   message=message,
611
                                                   details=details)
612

    
613

    
614
def create_instance(vm, nics, volumes, flavor, image):
615
    """`image` is a dictionary which should contain the keys:
616
            'backend_id', 'format' and 'metadata'
617

618
        metadata value should be a dictionary.
619
    """
620

    
621
    # Handle arguments to CreateInstance() as a dictionary,
622
    # initialize it based on a deployment-specific value.
623
    # This enables the administrator to override deployment-specific
624
    # arguments, such as the disk template to use, name of os provider
625
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
626
    #
627
    kw = vm.backend.get_create_params()
628
    kw['mode'] = 'create'
629
    kw['name'] = vm.backend_vm_id
630
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
631

    
632
    kw['disk_template'] = flavor.disk_template
633
    disks = []
634
    for volume in volumes:
635
        disk = {}
636
        disk["size"] = volume.size * 1024
637
        provider = flavor.disk_provider
638
        if provider is not None:
639
            disk["provider"] = provider
640
            disk["origin"] = volume.source_image["checksum"]
641
            extra_disk_params = settings.GANETI_DISK_PROVIDER_KWARGS\
642
                                        .get(provider)
643
            if extra_disk_params is not None:
644
                disk.update(extra_disk_params)
645
        disks.append(disk)
646

    
647
    kw["disks"] = disks
648

    
649
    kw['nics'] = [{"name": nic.backend_uuid,
650
                   "network": nic.network.backend_id,
651
                   "ip": nic.ipv4_address}
652
                  for nic in nics]
653

    
654
    backend = vm.backend
655
    depend_jobs = []
656
    for nic in nics:
657
        bnet, job_ids = ensure_network_is_active(backend, nic.network_id)
658
        depend_jobs.extend(job_ids)
659

    
660
    kw["depends"] = create_job_dependencies(depend_jobs)
661

    
662
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
663
    # kw['os'] = settings.GANETI_OS_PROVIDER
664
    kw['ip_check'] = False
665
    kw['name_check'] = False
666

    
667
    # Do not specific a node explicitly, have
668
    # Ganeti use an iallocator instead
669
    #kw['pnode'] = rapi.GetNodes()[0]
670

    
671
    kw['dry_run'] = settings.TEST
672

    
673
    kw['beparams'] = {
674
        'auto_balance': True,
675
        'vcpus': flavor.cpu,
676
        'memory': flavor.ram}
677

    
678
    kw['osparams'] = {
679
        'config_url': vm.config_url,
680
        # Store image id and format to Ganeti
681
        'img_id': image['backend_id'],
682
        'img_format': image['format']}
683

    
684
    # Use opportunistic locking
685
    kw['opportunistic_locking'] = settings.GANETI_USE_OPPORTUNISTIC_LOCKING
686

    
687
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
688
    # kw['hvparams'] = dict(serial_console=False)
689

    
690
    log.debug("Creating instance %s", utils.hide_pass(kw))
691
    with pooled_rapi_client(vm) as client:
692
        return client.CreateInstance(**kw)
693

    
694

    
695
def delete_instance(vm):
696
    with pooled_rapi_client(vm) as client:
697
        return client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)
698

    
699

    
700
def reboot_instance(vm, reboot_type):
701
    assert reboot_type in ('soft', 'hard')
702
    # Note that reboot type of Ganeti job must be always hard. The 'soft' and
703
    # 'hard' type of OS API is different from the one in Ganeti, and maps to
704
    # 'shutdown_timeout'.
705
    kwargs = {"instance": vm.backend_vm_id,
706
              "reboot_type": "hard"}
707
    # 'shutdown_timeout' parameter is only support from snf-ganeti>=2.8.2 and
708
    # Ganeti > 2.10. In other versions this parameter will be ignored and
709
    # we will fallback to default timeout of Ganeti (120s).
710
    if reboot_type == "hard":
711
        kwargs["shutdown_timeout"] = 0
712
    if settings.TEST:
713
        kwargs["dry_run"] = True
714
    with pooled_rapi_client(vm) as client:
715
        return client.RebootInstance(**kwargs)
716

    
717

    
718
def startup_instance(vm):
719
    with pooled_rapi_client(vm) as client:
720
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
721

    
722

    
723
def shutdown_instance(vm):
724
    with pooled_rapi_client(vm) as client:
725
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)
726

    
727

    
728
def resize_instance(vm, vcpus, memory):
729
    beparams = {"vcpus": int(vcpus),
730
                "minmem": int(memory),
731
                "maxmem": int(memory)}
732
    with pooled_rapi_client(vm) as client:
733
        return client.ModifyInstance(vm.backend_vm_id, beparams=beparams)
734

    
735

    
736
def get_instance_console(vm):
737
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
738
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
739
    # useless (see #783).
740
    #
741
    # Until this is fixed on the Ganeti side, construct a console info reply
742
    # directly.
743
    #
744
    # WARNING: This assumes that VNC runs on port network_port on
745
    #          the instance's primary node, and is probably
746
    #          hypervisor-specific.
747
    #
748
    log.debug("Getting console for vm %s", vm)
749

    
750
    console = {}
751
    console['kind'] = 'vnc'
752

    
753
    with pooled_rapi_client(vm) as client:
754
        i = client.GetInstance(vm.backend_vm_id)
755

    
756
    if vm.backend.hypervisor == "kvm" and i['hvparams']['serial_console']:
757
        raise Exception("hv parameter serial_console cannot be true")
758
    console['host'] = i['pnode']
759
    console['port'] = i['network_port']
760

    
761
    return console
762

    
763

    
764
def get_instance_info(vm):
765
    with pooled_rapi_client(vm) as client:
766
        return client.GetInstance(vm.backend_vm_id)
767

    
768

    
769
def vm_exists_in_backend(vm):
770
    try:
771
        get_instance_info(vm)
772
        return True
773
    except rapi.GanetiApiError as e:
774
        if e.code == 404:
775
            return False
776
        raise e
777

    
778

    
779
def get_network_info(backend_network):
780
    with pooled_rapi_client(backend_network) as client:
781
        return client.GetNetwork(backend_network.network.backend_id)
782

    
783

    
784
def network_exists_in_backend(backend_network):
785
    try:
786
        get_network_info(backend_network)
787
        return True
788
    except rapi.GanetiApiError as e:
789
        if e.code == 404:
790
            return False
791

    
792

    
793
def job_is_still_running(vm, job_id=None):
794
    with pooled_rapi_client(vm) as c:
795
        try:
796
            if job_id is None:
797
                job_id = vm.backendjobid
798
            job_info = c.GetJobStatus(job_id)
799
            return not (job_info["status"] in rapi.JOB_STATUS_FINALIZED)
800
        except rapi.GanetiApiError:
801
            return False
802

    
803

    
804
def nic_is_stale(vm, nic, timeout=60):
805
    """Check if a NIC is stale or exists in the Ganeti backend."""
806
    # First check the state of the NIC and if there is a pending CONNECT
807
    if nic.state == "BUILD" and vm.task == "CONNECT":
808
        if datetime.now() < nic.created + timedelta(seconds=timeout):
809
            # Do not check for too recent NICs to avoid the time overhead
810
            return False
811
        if job_is_still_running(vm, job_id=vm.task_job_id):
812
            return False
813
        else:
814
            # If job has finished, check that the NIC exists, because the
815
            # message may have been lost or stuck in the queue.
816
            vm_info = get_instance_info(vm)
817
            if nic.backend_uuid in vm_info["nic.names"]:
818
                return False
819
    return True
820

    
821

    
822
def ensure_network_is_active(backend, network_id):
823
    """Ensure that a network is active in the specified backend
824

825
    Check that a network exists and is active in the specified backend. If not
826
    (re-)create the network. Return the corresponding BackendNetwork object
827
    and the IDs of the Ganeti job to create the network.
828

829
    """
830
    job_ids = []
831
    try:
832
        bnet = BackendNetwork.objects.select_related("network")\
833
                                     .get(backend=backend, network=network_id)
834
        if bnet.operstate != "ACTIVE":
835
            job_ids = create_network(bnet.network, backend, connect=True)
836
    except BackendNetwork.DoesNotExist:
837
        network = Network.objects.select_for_update().get(id=network_id)
838
        bnet = BackendNetwork.objects.create(backend=backend, network=network)
839
        job_ids = create_network(network, backend, connect=True)
840

    
841
    return bnet, job_ids
842

    
843

    
844
def create_network(network, backend, connect=True):
845
    """Create a network in a Ganeti backend"""
846
    log.debug("Creating network %s in backend %s", network, backend)
847

    
848
    job_id = _create_network(network, backend)
849

    
850
    if connect:
851
        job_ids = connect_network(network, backend, depends=[job_id])
852
        return job_ids
853
    else:
854
        return [job_id]
855

    
856

    
857
def _create_network(network, backend):
858
    """Create a network."""
859

    
860
    tags = network.backend_tag
861
    subnet = None
862
    subnet6 = None
863
    gateway = None
864
    gateway6 = None
865
    for _subnet in network.subnets.all():
866
        if _subnet.dhcp and not "nfdhcpd" in tags:
867
            tags.append("nfdhcpd")
868
        if _subnet.ipversion == 4:
869
            subnet = _subnet.cidr
870
            gateway = _subnet.gateway
871
        elif _subnet.ipversion == 6:
872
            subnet6 = _subnet.cidr
873
            gateway6 = _subnet.gateway
874

    
875
    conflicts_check = False
876
    if network.public:
877
        tags.append('public')
878
        if subnet is not None:
879
            conflicts_check = True
880
    else:
881
        tags.append('private')
882

    
883
    # Use a dummy network subnet for IPv6 only networks. Currently Ganeti does
884
    # not support IPv6 only networks. To bypass this limitation, we create the
885
    # network with a dummy network subnet, and make Cyclades connect instances
886
    # to such networks, with address=None.
887
    if subnet is None:
888
        subnet = "10.0.0.0/29"
889

    
890
    try:
891
        bn = BackendNetwork.objects.get(network=network, backend=backend)
892
        mac_prefix = bn.mac_prefix
893
    except BackendNetwork.DoesNotExist:
894
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
895
                        " does not exist" % (network.id, backend.id))
896

    
897
    with pooled_rapi_client(backend) as client:
898
        return client.CreateNetwork(network_name=network.backend_id,
899
                                    network=subnet,
900
                                    network6=subnet6,
901
                                    gateway=gateway,
902
                                    gateway6=gateway6,
903
                                    mac_prefix=mac_prefix,
904
                                    conflicts_check=conflicts_check,
905
                                    tags=tags)
906

    
907

    
908
def connect_network(network, backend, depends=[], group=None):
909
    """Connect a network to nodegroups."""
910
    log.debug("Connecting network %s to backend %s", network, backend)
911

    
912
    conflicts_check = False
913
    if network.public and (network.subnet4 is not None):
914
        conflicts_check = True
915

    
916
    depends = create_job_dependencies(depends)
917
    with pooled_rapi_client(backend) as client:
918
        groups = [group] if group is not None else client.GetGroups()
919
        job_ids = []
920
        for group in groups:
921
            job_id = client.ConnectNetwork(network.backend_id, group,
922
                                           network.mode, network.link,
923
                                           conflicts_check,
924
                                           depends=depends)
925
            job_ids.append(job_id)
926
    return job_ids
927

    
928

    
929
def delete_network(network, backend, disconnect=True):
930
    log.debug("Deleting network %s from backend %s", network, backend)
931

    
932
    depends = []
933
    if disconnect:
934
        depends = disconnect_network(network, backend)
935
    _delete_network(network, backend, depends=depends)
936

    
937

    
938
def _delete_network(network, backend, depends=[]):
939
    depends = create_job_dependencies(depends)
940
    with pooled_rapi_client(backend) as client:
941
        return client.DeleteNetwork(network.backend_id, depends)
942

    
943

    
944
def disconnect_network(network, backend, group=None):
945
    log.debug("Disconnecting network %s to backend %s", network, backend)
946

    
947
    with pooled_rapi_client(backend) as client:
948
        groups = [group] if group is not None else client.GetGroups()
949
        job_ids = []
950
        for group in groups:
951
            job_id = client.DisconnectNetwork(network.backend_id, group)
952
            job_ids.append(job_id)
953
    return job_ids
954

    
955

    
956
def connect_to_network(vm, nic):
957
    network = nic.network
958
    backend = vm.backend
959
    bnet, depend_jobs = ensure_network_is_active(backend, network.id)
960

    
961
    depends = create_job_dependencies(depend_jobs)
962

    
963
    nic = {'name': nic.backend_uuid,
964
           'network': network.backend_id,
965
           'ip': nic.ipv4_address}
966

    
967
    log.debug("Adding NIC %s to VM %s", nic, vm)
968

    
969
    kwargs = {
970
        "instance": vm.backend_vm_id,
971
        "nics": [("add", "-1", nic)],
972
        "depends": depends,
973
    }
974
    if vm.backend.use_hotplug():
975
        kwargs["hotplug_if_possible"] = True
976
    if settings.TEST:
977
        kwargs["dry_run"] = True
978

    
979
    with pooled_rapi_client(vm) as client:
980
        return client.ModifyInstance(**kwargs)
981

    
982

    
983
def disconnect_from_network(vm, nic):
984
    log.debug("Removing NIC %s of VM %s", nic, vm)
985

    
986
    kwargs = {
987
        "instance": vm.backend_vm_id,
988
        "nics": [("remove", nic.backend_uuid, {})],
989
    }
990
    if vm.backend.use_hotplug():
991
        kwargs["hotplug_if_possible"] = True
992
    if settings.TEST:
993
        kwargs["dry_run"] = True
994

    
995
    with pooled_rapi_client(vm) as client:
996
        jobID = client.ModifyInstance(**kwargs)
997
        firewall_profile = nic.firewall_profile
998
        if firewall_profile and firewall_profile != "DISABLED":
999
            tag = _firewall_tags[firewall_profile] % nic.backend_uuid
1000
            client.DeleteInstanceTags(vm.backend_vm_id, [tag],
1001
                                      dry_run=settings.TEST)
1002

    
1003
        return jobID
1004

    
1005

    
1006
def set_firewall_profile(vm, profile, nic):
1007
    uuid = nic.backend_uuid
1008
    try:
1009
        tag = _firewall_tags[profile] % uuid
1010
    except KeyError:
1011
        raise ValueError("Unsopported Firewall Profile: %s" % profile)
1012

    
1013
    log.debug("Setting tag of VM %s, NIC %s, to %s", vm, nic, profile)
1014

    
1015
    with pooled_rapi_client(vm) as client:
1016
        # Delete previous firewall tags
1017
        old_tags = client.GetInstanceTags(vm.backend_vm_id)
1018
        delete_tags = [(t % uuid) for t in _firewall_tags.values()
1019
                       if (t % uuid) in old_tags]
1020
        if delete_tags:
1021
            client.DeleteInstanceTags(vm.backend_vm_id, delete_tags,
1022
                                      dry_run=settings.TEST)
1023

    
1024
        if profile != "DISABLED":
1025
            client.AddInstanceTags(vm.backend_vm_id, [tag],
1026
                                   dry_run=settings.TEST)
1027

    
1028
        # XXX NOP ModifyInstance call to force process_net_status to run
1029
        # on the dispatcher
1030
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
1031
        client.ModifyInstance(vm.backend_vm_id,
1032
                              os_name=os_name)
1033
    return None
1034

    
1035

    
1036
def attach_volume(vm, volume, depends=[]):
1037
    log.debug("Attaching volume %s to vm %s", vm, volume)
1038

    
1039
    disk = {"size": volume.size,
1040
            "name": volume.backend_volume_uuid,
1041
            "volume_name": volume.backend_volume_uuid}
1042
    if volume.source_volume_id is not None:
1043
        disk["origin"] = volume.source_volume.backend_volume_uuid
1044
    elif volume.source_snapshot is not None:
1045
        disk["origin"] = volume.source_snapshot["checksum"]
1046
    elif volume.source_image is not None:
1047
        disk["origin"] = volume.source_image["checksum"]
1048

    
1049
    kwargs = {
1050
        "instance": vm.backend_vm_id,
1051
        "disks": [("add", "-1", disk)],
1052
        "depends": depends,
1053
    }
1054
    if vm.backend.use_hotplug():
1055
        kwargs["hotplug"] = True
1056
    if settings.TEST:
1057
        kwargs["dry_run"] = True
1058

    
1059
    with pooled_rapi_client(vm) as client:
1060
        return client.ModifyInstance(**kwargs)
1061

    
1062

    
1063
def detach_volume(vm, volume):
1064
    log.debug("Removing volume %s from vm %s", volume, vm)
1065
    kwargs = {
1066
        "instance": vm.backend_vm_id,
1067
        "disks": [("remove", volume.backend_volume_uuid, {})],
1068
    }
1069
    if vm.backend.use_hotplug():
1070
        kwargs["hotplug"] = True
1071
    if settings.TEST:
1072
        kwargs["dry_run"] = True
1073

    
1074
    with pooled_rapi_client(vm) as client:
1075
        return client.ModifyInstance(**kwargs)
1076

    
1077

    
1078
def snapshot_instance(vm, snapshot_name):
1079
    #volume = instance.volumes.all()[0]
1080
    with pooled_rapi_client(vm) as client:
1081
        return client.SnapshotInstance(instance=vm.backend_vm_id,
1082
                                       snapshot_name=snapshot_name)
1083

    
1084

    
1085
def get_instances(backend, bulk=True):
1086
    with pooled_rapi_client(backend) as c:
1087
        return c.GetInstances(bulk=bulk)
1088

    
1089

    
1090
def get_nodes(backend, bulk=True):
1091
    with pooled_rapi_client(backend) as c:
1092
        return c.GetNodes(bulk=bulk)
1093

    
1094

    
1095
def get_jobs(backend, bulk=True):
1096
    with pooled_rapi_client(backend) as c:
1097
        return c.GetJobs(bulk=bulk)
1098

    
1099

    
1100
def get_physical_resources(backend):
1101
    """ Get the physical resources of a backend.
1102

1103
    Get the resources of a backend as reported by the backend (not the db).
1104

1105
    """
1106
    nodes = get_nodes(backend, bulk=True)
1107
    attr = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
1108
    res = {}
1109
    for a in attr:
1110
        res[a] = 0
1111
    for n in nodes:
1112
        # Filter out drained, offline and not vm_capable nodes since they will
1113
        # not take part in the vm allocation process
1114
        can_host_vms = n['vm_capable'] and not (n['drained'] or n['offline'])
1115
        if can_host_vms and n['cnodes']:
1116
            for a in attr:
1117
                res[a] += int(n[a] or 0)
1118
    return res
1119

    
1120

    
1121
def update_backend_resources(backend, resources=None):
1122
    """ Update the state of the backend resources in db.
1123

1124
    """
1125

    
1126
    if not resources:
1127
        resources = get_physical_resources(backend)
1128

    
1129
    backend.mfree = resources['mfree']
1130
    backend.mtotal = resources['mtotal']
1131
    backend.dfree = resources['dfree']
1132
    backend.dtotal = resources['dtotal']
1133
    backend.pinst_cnt = resources['pinst_cnt']
1134
    backend.ctotal = resources['ctotal']
1135
    backend.updated = datetime.now()
1136
    backend.save()
1137

    
1138

    
1139
def get_memory_from_instances(backend):
1140
    """ Get the memory that is used from instances.
1141

1142
    Get the used memory of a backend. Note: This is different for
1143
    the real memory used, due to kvm's memory de-duplication.
1144

1145
    """
1146
    with pooled_rapi_client(backend) as client:
1147
        instances = client.GetInstances(bulk=True)
1148
    mem = 0
1149
    for i in instances:
1150
        mem += i['oper_ram']
1151
    return mem
1152

    
1153

    
1154
def get_available_disk_templates(backend):
1155
    """Get the list of available disk templates of a Ganeti backend.
1156

1157
    The list contains the disk templates that are enabled in the Ganeti backend
1158
    and also included in ipolicy-disk-templates.
1159

1160
    """
1161
    with pooled_rapi_client(backend) as c:
1162
        info = c.GetInfo()
1163
    ipolicy_disk_templates = info["ipolicy"]["disk-templates"]
1164
    try:
1165
        enabled_disk_templates = info["enabled_disk_templates"]
1166
        return [dp for dp in enabled_disk_templates
1167
                if dp in ipolicy_disk_templates]
1168
    except KeyError:
1169
        # Ganeti < 2.8 does not have 'enabled_disk_templates'
1170
        return ipolicy_disk_templates
1171

    
1172

    
1173
def update_backend_disk_templates(backend):
1174
    disk_templates = get_available_disk_templates(backend)
1175
    backend.disk_templates = disk_templates
1176
    backend.save()
1177

    
1178

    
1179
##
1180
## Synchronized operations for reconciliation
1181
##
1182

    
1183

    
1184
def create_network_synced(network, backend):
1185
    result = _create_network_synced(network, backend)
1186
    if result[0] != rapi.JOB_STATUS_SUCCESS:
1187
        return result
1188
    result = connect_network_synced(network, backend)
1189
    return result
1190

    
1191

    
1192
def _create_network_synced(network, backend):
1193
    with pooled_rapi_client(backend) as client:
1194
        job = _create_network(network, backend)
1195
        result = wait_for_job(client, job)
1196
    return result
1197

    
1198

    
1199
def connect_network_synced(network, backend):
1200
    with pooled_rapi_client(backend) as client:
1201
        for group in client.GetGroups():
1202
            job = client.ConnectNetwork(network.backend_id, group,
1203
                                        network.mode, network.link)
1204
            result = wait_for_job(client, job)
1205
            if result[0] != rapi.JOB_STATUS_SUCCESS:
1206
                return result
1207

    
1208
    return result
1209

    
1210

    
1211
def wait_for_job(client, jobid):
1212
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
1213
    status = result['job_info'][0]
1214
    while status not in rapi.JOB_STATUS_FINALIZED:
1215
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
1216
                                         [result], None)
1217
        status = result['job_info'][0]
1218

    
1219
    if status == rapi.JOB_STATUS_SUCCESS:
1220
        return (status, None)
1221
    else:
1222
        error = result['job_info'][1]
1223
        return (status, error)
1224

    
1225

    
1226
def create_job_dependencies(job_ids=[], job_states=None):
1227
    """Transform a list of job IDs to Ganeti 'depends' attribute."""
1228
    if job_states is None:
1229
        job_states = list(rapi.JOB_STATUS_FINALIZED)
1230
    assert(type(job_states) == list)
1231
    return [[job_id, job_states] for job_id in job_ids]