Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ 1fdd8d69

History | View | Annotate | Download (41.1 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
from django.conf import settings
34
from django.db import transaction
35
from datetime import datetime, timedelta
36

    
37
from synnefo.db.models import (Backend, VirtualMachine, Network,
38
                               BackendNetwork, BACKEND_STATUSES,
39
                               pooled_rapi_client, VirtualMachineDiagnostic,
40
                               Flavor, IPAddress, IPAddressLog)
41
from synnefo.logic import utils, ips
42
from synnefo import quotas
43
from synnefo.api.util import release_resource
44
from synnefo.util.mac2eui64 import mac2eui64
45
from synnefo.logic import rapi
46

    
47
from logging import getLogger
48
log = getLogger(__name__)
49

    
50

    
51
# Map from Synnefo firewall profile names to the corresponding Ganeti
# instance tags, as configured in the deployment settings.
_firewall_tags = {
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}

# Reverse map: the fourth ':'-separated component of each firewall tag back
# to the profile name; used to decode tags reported by Ganeti.
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())

# Timeout in seconds for building NICs. After this period the NICs considered
# stale and removed from DB.
BUILDING_NIC_TIMEOUT = timedelta(seconds=180)

# NIC attributes that can be copied verbatim from a Ganeti NIC dict onto the
# DB NIC object.
SIMPLE_NIC_FIELDS = ["state", "mac", "network", "firewall_profile", "index"]
# NIC attributes that need special handling (IP allocation/release).
COMPLEX_NIC_FIELDS = ["ipv4_address", "ipv6_address"]
NIC_FIELDS = SIMPLE_NIC_FIELDS + COMPLEX_NIC_FIELDS
# Prefix used to identify NICs that exist in Ganeti but are unknown to
# Synnefo (they carry no Synnefo-generated name).
UNKNOWN_NIC_PREFIX = "unknown-"
66

    
67

    
68
def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
    """Handle quotas for updated VirtualMachine.

    Update quotas for the updated VirtualMachine based on the job that run on
    the Ganeti backend. If a commission has been already issued for this job,
    then this commission is just accepted or rejected based on the job status.
    Otherwise, a new commission for the given change is issued, that is also in
    force and auto-accept mode. In this case, previous commissions are
    rejected, since they reflect a previous state of the VM.

    Returns the (possibly updated) VirtualMachine object.

    """
    if job_status not in rapi.JOB_STATUS_FINALIZED:
        # Only terminal job states (success/error/canceled) affect quotas.
        return vm

    # Check successful completion of a job will trigger any quotable change in
    # the VM state.
    action = utils.get_action_from_opcode(job_opcode, job_fields)
    if action == "BUILD":
        # Quotas for new VMs are automatically accepted by the API
        return vm

    if vm.task_job_id == job_id and vm.serial is not None:
        # Commission for this change has already been issued. So just
        # accept/reject it. Special case is OP_INSTANCE_CREATE, which even
        # if fails, must be accepted, as the user must manually remove the
        # failed server
        serial = vm.serial
        if job_status == rapi.JOB_STATUS_SUCCESS:
            quotas.accept_serial(serial)
        elif job_status in [rapi.JOB_STATUS_ERROR, rapi.JOB_STATUS_CANCELED]:
            log.debug("Job %s failed. Rejecting related serial %s", job_id,
                      serial)
            quotas.reject_serial(serial)
        vm.serial = None
    elif job_status == rapi.JOB_STATUS_SUCCESS:
        commission_info = quotas.get_commission_info(resource=vm,
                                                     action=action,
                                                     action_fields=job_fields)
        if commission_info is not None:
            # Commission for this change has not been issued, or the issued
            # commission was unaware of the current change. Reject all previous
            # commissions and create a new one in forced mode!
            log.debug("Expected job was %s. Processing job %s.",
                      vm.task_job_id, job_id)
            reason = ("client: dispatcher, resource: %s, ganeti_job: %s"
                      % (vm, job_id))
            quotas.handle_resource_commission(vm, action,
                                              action_fields=job_fields,
                                              commission_name=reason,
                                              force=True,
                                              auto_accept=True)
            log.debug("Issued new commission: %s", vm.serial)

    return vm
122

    
123

    
124
@transaction.commit_on_success
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None,
                      job_fields=None):
    """Process a job progress notification from the backend

    Process an incoming message from the backend (currently Ganeti).
    Job notifications with a terminating status (success, error, or canceled),
    also update the operating state of the VM.

    """
    # See #1492, #1031, #1111 why this line has been removed
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)

    # Mirror the raw job information on the VM row, for inspection/debugging.
    vm.backendjobid = jobid
    vm.backendjobstatus = status
    vm.backendopcode = opcode
    vm.backendlogmsg = logmsg

    # Non-terminal notifications only record the job info; they never change
    # the operating state of the VM.
    if status not in rapi.JOB_STATUS_FINALIZED:
        vm.save()
        return

    if job_fields is None:
        job_fields = {}

    new_operstate = None
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode)

    if status == rapi.JOB_STATUS_SUCCESS:
        # If job succeeds, change operating state if needed
        if state_for_success is not None:
            new_operstate = state_for_success

        beparams = job_fields.get("beparams", None)
        if beparams:
            # Change the flavor of the VM
            _process_resize(vm, beparams)

        # Update backendtime only for jobs that have been successfully
        # completed, since only these jobs update the state of the VM. Else a
        # "race condition" may occur when a successful job (e.g.
        # OP_INSTANCE_REMOVE) completes before an error job and messages arrive
        # in reversed order.
        vm.backendtime = etime

    if status in rapi.JOB_STATUS_FINALIZED and nics is not None:
        # Update the NICs of the VM
        _process_net_status(vm, etime, nics)

    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
    if opcode == 'OP_INSTANCE_CREATE' and status in (rapi.JOB_STATUS_CANCELED,
                                                     rapi.JOB_STATUS_ERROR):
        new_operstate = "ERROR"
        vm.backendtime = etime
        # Update state of associated NICs
        vm.nics.all().update(state="ERROR")
    elif opcode == 'OP_INSTANCE_REMOVE':
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
        # when no instance exists at the Ganeti backend.
        # See ticket #799 for all the details.
        if (status == rapi.JOB_STATUS_SUCCESS or
           (status == rapi.JOB_STATUS_ERROR and not vm_exists_in_backend(vm))):
            # VM has been deleted
            for nic in vm.nics.all():
                # Release the IP
                remove_nic_ips(nic)
                # And delete the NIC.
                nic.delete()
            vm.deleted = True
            new_operstate = state_for_success
            vm.backendtime = etime
            # Treat the removal as successful from here on, so that the quota
            # handling below accepts the commission for the deletion.
            status = rapi.JOB_STATUS_SUCCESS

    if status in rapi.JOB_STATUS_FINALIZED:
        # Job is finalized: Handle quotas/commissioning
        vm = handle_vm_quotas(vm, job_id=jobid, job_opcode=opcode,
                              job_status=status, job_fields=job_fields)
        # and clear task fields
        if vm.task_job_id == jobid:
            vm.task = None
            vm.task_job_id = None

    if new_operstate is not None:
        vm.operstate = new_operstate

    vm.save()
212

    
213

    
214
def _process_resize(vm, beparams):
    """Change flavor of a VirtualMachine based on new beparams."""
    current = vm.flavor
    new_vcpus = beparams.get("vcpus", current.cpu)
    new_ram = beparams.get("maxmem", current.ram)
    if (new_vcpus, new_ram) == (current.cpu, current.ram):
        # Nothing changed; keep the current flavor.
        return
    try:
        vm.flavor = Flavor.objects.get(cpu=new_vcpus, ram=new_ram,
                                       disk=current.disk,
                                       disk_template=current.disk_template)
    except Flavor.DoesNotExist:
        raise Exception("Cannot find flavor for VM")
    vm.save()
229

    
230

    
231
@transaction.commit_on_success
def process_net_status(vm, etime, nics):
    """Wrap _process_net_status inside transaction.

    See _process_net_status for the actual NIC processing.

    """
    _process_net_status(vm, etime, nics)
235

    
236

    
237
def _process_net_status(vm, etime, nics):
    """Process a net status notification from the backend

    Process an incoming message from the Ganeti backend,
    detailing the NIC configuration of a VM instance.

    Update the state of the VM in the DB accordingly.

    """
    ganeti_nics = process_ganeti_nics(nics)
    db_nics = dict([(nic.id, nic)
                    for nic in vm.nics.prefetch_related("ips__subnet")])

    # Get X-Lock on backend before getting X-Lock on network IP pools, to
    # guarantee that no deadlock will occur with Backend allocator.
    Backend.objects.select_for_update().get(id=vm.backend_id)

    for nic_name in set(db_nics.keys()) | set(ganeti_nics.keys()):
        db_nic = db_nics.get(nic_name)
        ganeti_nic = ganeti_nics.get(nic_name)
        if ganeti_nic is None:
            # NIC exists in DB but not in Ganeti. If the NIC is in 'building'
            # state for more than 5 minutes, then we remove the NIC.
            # TODO: This is dangerous as the job may be stack in the queue, and
            # releasing the IP may lead to duplicate IP use.
            if db_nic.state != "BUILD" or\
                (db_nic.state == "BUILD" and
                 etime > db_nic.created + BUILDING_NIC_TIMEOUT):
                remove_nic_ips(db_nic)
                db_nic.delete()
            else:
                log.warning("Ignoring recent building NIC: %s", db_nic)
        elif db_nic is None:
            msg = ("NIC/%s of VM %s does not exist in DB! Cannot automatically"
                   " fix this issue!" % (nic_name, vm))
            log.error(msg)
            continue
        elif not nics_are_equal(db_nic, ganeti_nic):
            # Update the NIC in DB with the values from Ganeti NIC. Save the
            # NIC only once, after all simple fields have been copied, instead
            # of issuing one DB write per field.
            for f in SIMPLE_NIC_FIELDS:
                setattr(db_nic, f, ganeti_nic[f])
            db_nic.save()

            # Special case where the IPv4 address has changed, because you
            # need to release the old IPv4 address and reserve the new one
            ipv4_address = ganeti_nic["ipv4_address"]
            if db_nic.ipv4_address != ipv4_address:
                change_address_of_port(db_nic, vm.userid,
                                       old_address=db_nic.ipv4_address,
                                       new_address=ipv4_address,
                                       version=4)

            ipv6_address = ganeti_nic["ipv6_address"]
            if db_nic.ipv6_address != ipv6_address:
                change_address_of_port(db_nic, vm.userid,
                                       old_address=db_nic.ipv6_address,
                                       new_address=ipv6_address,
                                       version=6)

    vm.backendtime = etime
    vm.save()
298

    
299

    
300
def change_address_of_port(port, userid, old_address, new_address, version):
    """Change the IPv4 or IPv6 address associated with a port (NIC).

    Release the port's old address of the given IP version, attach the new
    address to it, and record the new address in the IP address log.
    Returns the newly attached IPAddress object.

    """
    if old_address is not None:
        # An address change of an existing address is unexpected during
        # normal operation, hence the critical log level.
        msg = ("IPv%s Address of server '%s' changed from '%s' to '%s'"
               % (version, port.machine_id, old_address, new_address))
        log.critical(msg)

    # Remove the old IP address
    remove_nic_ips(port, version=version)

    if version == 4:
        # IPv4: allocate the address from the network's IP pool.
        ipaddress = ips.allocate_ip(port.network, userid, address=new_address)
        ipaddress.nic = port
        ipaddress.save()
    elif version == 6:
        # IPv6: no pool allocation is involved; just create the DB object on
        # the network's IPv6 subnet.
        subnet6 = port.network.subnet6
        ipaddress = IPAddress.objects.create(userid=userid,
                                             network=port.network,
                                             subnet=subnet6,
                                             nic=port,
                                             address=new_address)
    else:
        raise ValueError("Unknown version: %s" % version)

    # New address log
    ip_log = IPAddressLog.objects.create(server_id=port.machine_id,
                                         network_id=port.network_id,
                                         address=new_address,
                                         active=True)
    log.info("Created IP log entry '%s' for address '%s' to server '%s'",
             ip_log.id, new_address, port.machine_id)

    return ipaddress
333

    
334

    
335
def nics_are_equal(db_nic, gnt_nic):
    """Return True iff the DB NIC and the Ganeti NIC agree on all
    NIC_FIELDS."""
    return all(getattr(db_nic, field) == gnt_nic[field]
               for field in NIC_FIELDS)
340

    
341

    
342
def process_ganeti_nics(ganeti_nics):
    """Process NIC dict from ganeti.

    Turn the NIC list reported by Ganeti into a {nic_id: nic_info} dict.

    """
    nics = {}
    for index, gnic in enumerate(ganeti_nics):
        nic_name = gnic.get("name", None)
        if nic_name is not None:
            nic_id = utils.id_from_nic_name(nic_name)
        else:
            # Put as default value the index. If it is an unknown NIC to
            # synnefo it will be created automaticaly.
            nic_id = UNKNOWN_NIC_PREFIX + str(index)

        network_id = utils.id_from_network_name(gnic.get('network', ''))
        network = Network.objects.get(id=network_id)

        # Collect the new NIC info
        mac = gnic.get('mac')
        subnet6 = network.subnet6
        ipv6 = mac2eui64(mac, subnet6.cidr) if subnet6 else None

        firewall_profile = _reverse_tags.get(gnic.get('firewall'))
        if not firewall_profile and network.public:
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE

        nics[nic_id] = {
            'index': index,
            'network': network,
            'mac': mac,
            'ipv4_address': gnic.get('ip'),
            'ipv6_address': ipv6,
            'firewall_profile': firewall_profile,
            'state': 'ACTIVE'}
    return nics
379

    
380

    
381
def remove_nic_ips(nic, version=None):
    """Remove IP addresses associated with a NetworkInterface.

    Remove all IP addresses that are associated with the NetworkInterface
    object, by returning them to the pool and deleting the IPAddress object. If
    the IP is a floating IP, then it is just disassociated from the NIC.
    If version is specified, then only IP addressses of that version will be
    removed.

    """
    for ip in nic.ips.all():
        if version and ip.ipversion != version:
            continue

        # Close the active entry in the IP address log table.
        terminate_active_ipaddress_log(nic, ip)

        if ip.floating_ip:
            # Floating IPs survive; just detach them from the NIC.
            ip.nic = None
            ip.save()
        else:
            # Return the address to its pool and delete the DB object.
            ip.release_address()
            ip.delete()
405

    
406

    
407
def terminate_active_ipaddress_log(nic, ip):
    """Update DB logging entry for this IP address.

    Mark the active IPAddressLog entry for this (server, network, address)
    as released. Only addresses of public networks attached to a machine are
    logged.

    """
    if not ip.network.public or nic.machine is None:
        return
    try:
        ip_log, created = \
            IPAddressLog.objects.get_or_create(server_id=nic.machine_id,
                                               network_id=ip.network_id,
                                               address=ip.address,
                                               active=True)
    except IPAddressLog.MultipleObjectsReturned:
        logmsg = ("Multiple active log entries for IP %s, Network %s,"
                  "Server %s. Cannot proceed!"
                  % (ip.address, ip.network, nic.machine))
        log.error(logmsg)
        raise

    if created:
        # A missing entry is self-healed here, but its creation timestamp is
        # "now" instead of the real allocation time.
        logmsg = ("No log entry for IP %s, Network %s, Server %s. Created new"
                  " but with wrong creation timestamp."
                  % (ip.address, ip.network, nic.machine))
        log.error(logmsg)
    # NOTE(review): datetime.now() is timezone-naive; presumably the rest of
    # the deployment stores naive local timestamps too -- confirm.
    ip_log.released_at = datetime.now()
    ip_log.active = False
    ip_log.save()
432

    
433

    
434
@transaction.commit_on_success
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
    """Process a network job notification from the Ganeti backend.

    Record the job information on the BackendNetwork, update its operating
    state based on the job result, and finally recompute the state of the
    parent Network.

    """
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode
    back_network.backendlogmsg = logmsg

    # Note: Network is already locked!
    network = back_network.network

    # Notifications of success change the operating state
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == rapi.JOB_STATUS_SUCCESS and state_for_success is not None:
        back_network.operstate = state_for_success

    # A failed/canceled OP_NETWORK_ADD means the network was never created in
    # this backend.
    if (status in (rapi.JOB_STATUS_CANCELED, rapi.JOB_STATUS_ERROR)
       and opcode == 'OP_NETWORK_ADD'):
        back_network.operstate = 'ERROR'
        back_network.backendtime = etime

    if opcode == 'OP_NETWORK_REMOVE':
        # The network is also considered deleted when the removal job failed
        # because the network no longer exists in the backend.
        network_is_deleted = (status == rapi.JOB_STATUS_SUCCESS)
        if network_is_deleted or (status == rapi.JOB_STATUS_ERROR and not
                                  network_exists_in_backend(back_network)):
            back_network.operstate = state_for_success
            back_network.deleted = True
            back_network.backendtime = etime

    if status == rapi.JOB_STATUS_SUCCESS:
        back_network.backendtime = etime
    back_network.save()
    # Also you must update the state of the Network!!
    update_network_state(network)
470

    
471

    
472
def update_network_state(network):
    """Update the state of a Network based on BackendNetwork states.

    Update the state of a Network based on the operstate of the networks in the
    backends that network exists.

    The state of the network is:
    * ACTIVE: If it is 'ACTIVE' in at least one backend.
    * DELETED: If it is is 'DELETED' in all backends that have been created.

    This function also releases the resources (MAC prefix or Bridge) and the
    quotas for the network.

    """
    if network.deleted:
        # Network has already been deleted. Just assert that state is also
        # DELETED
        if not network.state == "DELETED":
            network.state = "DELETED"
            network.save()
        return

    backend_states = [s.operstate for s in network.backend_networks.all()]
    if not backend_states and network.action != "DESTROY":
        # The network exists in no backend (yet) and is not being destroyed,
        # so it is considered ACTIVE. Return unconditionally here: falling
        # through would make the reduce() below evaluate to "DELETED" for an
        # empty list of backend states and wrongly delete the network.
        if network.state != "ACTIVE":
            network.state = "ACTIVE"
            network.save()
        return

    # Network is deleted when all BackendNetworks go to "DELETED" operstate
    deleted = reduce(lambda x, y: x == y and "DELETED", backend_states,
                     "DELETED")

    # Release the resources on the deletion of the Network
    if deleted:
        if network.ips.filter(deleted=False, floating_ip=True).exists():
            msg = "Cannot delete network %s! Floating IPs still in use!"
            log.error(msg % network)
            raise Exception(msg % network)
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
                 network.id, network.mac_prefix, network.link)
        network.deleted = True
        network.state = "DELETED"
        # Return pooled resources (MAC prefix / bridge) to their pools.
        if network.mac_prefix:
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
                release_resource(res_type="mac_prefix",
                                 value=network.mac_prefix)
        if network.link:
            if network.FLAVORS[network.flavor]["link"] == "pool":
                release_resource(res_type="bridge", value=network.link)

        # Set all subnets as deleted
        network.subnets.update(deleted=True)
        # And delete the IP pools
        for subnet in network.subnets.all():
            if subnet.ipversion == 4:
                subnet.ip_pools.all().delete()
        # And all the backend networks since there are useless
        network.backend_networks.all().delete()

        # Issue commission
        if network.userid:
            quotas.issue_and_accept_commission(network, action="DESTROY")
            # the above has already saved the object and committed;
            # a second save would override others' changes, since the
            # object is now unlocked
            return
        elif not network.public:
            log.warning("Network %s does not have an owner!", network.id)
    network.save()
542

    
543

    
544
@transaction.commit_on_success
def process_network_modify(back_network, etime, jobid, opcode, status,
                           job_fields):
    """Process an OP_NETWORK_SET_PARAMS notification from the backend.

    Record the job information on the BackendNetwork and reserve in the DB
    pool any externally reserved IPs that were added to the Ganeti network.

    """
    assert (opcode == "OP_NETWORK_SET_PARAMS")
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    # Record the opcode on the 'backendopcode' field, consistently with the
    # other job handlers (process_op_status, process_network_status); the
    # previous code assigned to a plain 'opcode' attribute instead, which is
    # not the model field used everywhere else -- likely a typo.
    back_network.backendopcode = opcode

    add_reserved_ips = job_fields.get("add_reserved_ips")
    if add_reserved_ips:
        network = back_network.network
        for ip in add_reserved_ips:
            # Mark externally reserved addresses in the DB pool as well.
            network.reserve_address(ip, external=True)

    if status == rapi.JOB_STATUS_SUCCESS:
        back_network.backendtime = etime
    back_network.save()
564

    
565

    
566
@transaction.commit_on_success
def process_create_progress(vm, etime, progress):
    """Record the build progress of a VM, as reported by snf-image."""
    percentage = int(progress)
    if percentage < 0:
        raise ValueError("Percentage cannot be negative")

    # The percentage may exceed 100%, due to the way
    # snf-image:copy-progress tracks bytes read by image handling processes,
    # so clamp it to 100.
    percentage = min(percentage, 100)

    # FIXME: log a warning here, see #1033
#   if last_update > percentage:
#       raise ValueError("Build percentage should increase monotonically " \
#                        "(old = %d, new = %d)" % (last_update, percentage))

    # This assumes that no message of type 'ganeti-create-progress' is going to
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
    # the instance is STARTED.  What if the two messages are processed by two
    # separate dispatcher threads, and the 'ganeti-op-status' message for
    # successful creation gets processed before the 'ganeti-create-progress'
    # message? [vkoukis]
    #
    #if not vm.operstate == 'BUILD':
    #    raise VirtualMachine.IllegalState("VM is not in building state")

    vm.buildpercentage = percentage
    vm.backendtime = etime
    vm.save()
595

    
596

    
597
@transaction.commit_on_success
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
                               details=None):
    """
    Create virtual machine instance diagnostic entry.

    :param vm: VirtualMachine instance to create diagnostic for.
    :param message: Diagnostic message.
    :param source: Diagnostic source identifier (e.g. image-helper).
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
    :param etime: The time the message occured (if available).
    :param details: Additional details or debug information.
    """
    VirtualMachineDiagnostic.objects.create_for_vm(
        vm, level, source=source, source_date=etime, message=message,
        details=details)
614

    
615

    
616
def create_instance(vm, nics, flavor, image):
    """`image` is a dictionary which should contain the keys:
            'backend_id', 'format' and 'metadata'

        metadata value should be a dictionary.

    Submit an OP_INSTANCE_CREATE job to the Ganeti backend of the VM and
    return the result of the RAPI CreateInstance call (the Ganeti job id).
    """

    # Handle arguments to CreateInstance() as a dictionary,
    # initialize it based on a deployment-specific value.
    # This enables the administrator to override deployment-specific
    # arguments, such as the disk template to use, name of os provider
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
    #
    kw = vm.backend.get_create_params()
    kw['mode'] = 'create'
    kw['name'] = vm.backend_vm_id
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS

    kw['disk_template'] = flavor.disk_template
    # flavor.disk * 1024: GB -> MB conversion, presumably (Ganeti expects
    # MB) -- confirm against the Flavor model.
    kw['disks'] = [{"size": flavor.disk * 1024}]
    provider = flavor.disk_provider
    if provider:
        # External disk provider (e.g. a custom storage backend).
        kw['disks'][0]['provider'] = provider
        kw['disks'][0]['origin'] = flavor.disk_origin

    kw['nics'] = [{"name": nic.backend_uuid,
                   "network": nic.network.backend_id,
                   "ip": nic.ipv4_address}
                  for nic in nics]

    backend = vm.backend
    depend_jobs = []
    # Make sure that every network the instance connects to exists in this
    # backend; creation must wait on any jobs that set them up.
    for nic in nics:
        bnet, job_ids = ensure_network_is_active(backend, nic.network_id)
        depend_jobs.extend(job_ids)

    kw["depends"] = create_job_dependencies(depend_jobs)

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['os'] = settings.GANETI_OS_PROVIDER
    kw['ip_check'] = False
    kw['name_check'] = False

    # Do not specific a node explicitly, have
    # Ganeti use an iallocator instead
    #kw['pnode'] = rapi.GetNodes()[0]

    kw['dry_run'] = settings.TEST

    kw['beparams'] = {
        'auto_balance': True,
        'vcpus': flavor.cpu,
        'memory': flavor.ram}

    kw['osparams'] = {
        'config_url': vm.config_url,
        # Store image id and format to Ganeti
        'img_id': image['backend_id'],
        'img_format': image['format']}

    # Use opportunistic locking
    kw['opportunistic_locking'] = settings.GANETI_USE_OPPORTUNISTIC_LOCKING

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['hvparams'] = dict(serial_console=False)

    # Hide sensitive values (e.g. passwords) from the log output.
    log.debug("Creating instance %s", utils.hide_pass(kw))
    with pooled_rapi_client(vm) as client:
        return client.CreateInstance(**kw)
685

    
686

    
687
def delete_instance(vm):
    """Issue a Ganeti job removing the instance that backs `vm`."""
    with pooled_rapi_client(vm) as rapi_client:
        return rapi_client.DeleteInstance(vm.backend_vm_id,
                                          dry_run=settings.TEST)
690

    
691

    
692
def reboot_instance(vm, reboot_type):
    """Issue a reboot job for a VM; `reboot_type` is 'soft' or 'hard'.

    XXX: Currently shutdown_timeout parameter is not supported from the
    Ganeti RAPI. Until supported, we will fallback for both reboot types
    to the default shutdown timeout of Ganeti (120s). Note that reboot
    type of Ganeti job must be always hard. The 'soft' and 'hard' type
    of OS API is different from the one in Ganeti, and maps to
    'shutdown_timeout'.

    """
    assert reboot_type in ('soft', 'hard')
    kwargs = {"instance": vm.backend_vm_id,
              "reboot_type": "hard"}
    #if reboot_type == "hard":
    #    kwargs["shutdown_timeout"] = 0
    if settings.TEST:
        kwargs["dry_run"] = True
    with pooled_rapi_client(vm) as rapi_client:
        return rapi_client.RebootInstance(**kwargs)
708

    
709

    
710
def startup_instance(vm):
    """Issue a Ganeti job starting up the instance that backs `vm`."""
    with pooled_rapi_client(vm) as rapi_client:
        return rapi_client.StartupInstance(vm.backend_vm_id,
                                           dry_run=settings.TEST)
713

    
714

    
715
def shutdown_instance(vm):
    """Issue a Ganeti job shutting down the instance that backs `vm`."""
    with pooled_rapi_client(vm) as rapi_client:
        return rapi_client.ShutdownInstance(vm.backend_vm_id,
                                            dry_run=settings.TEST)
718

    
719

    
720
def resize_instance(vm, vcpus, memory):
    """Modify the CPU/RAM backend parameters of a Ganeti instance."""
    mem = int(memory)
    # min and max memory are kept equal; Synnefo flavors have a single size.
    beparams = dict(vcpus=int(vcpus), minmem=mem, maxmem=mem)
    with pooled_rapi_client(vm) as rapi_client:
        return rapi_client.ModifyInstance(vm.backend_vm_id, beparams=beparams)
726

    
727

    
728
def get_instance_console(vm):
    """Construct a VNC console endpoint for `vm`.

    RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
    which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
    useless (see #783).

    Until this is fixed on the Ganeti side, construct a console info reply
    directly.

    WARNING: This assumes that VNC runs on port network_port on
             the instance's primary node, and is probably
             hypervisor-specific.
    """
    log.debug("Getting console for vm %s", vm)

    with pooled_rapi_client(vm) as client:
        instance = client.GetInstance(vm.backend_vm_id)

    # A KVM instance with serial console enabled has no usable VNC endpoint.
    if vm.backend.hypervisor == "kvm" and instance['hvparams']['serial_console']:
        raise Exception("hv parameter serial_console cannot be true")

    return {'kind': 'vnc',
            'host': instance['pnode'],
            'port': instance['network_port']}
754

    
755

    
756
def get_instance_info(vm):
    """Fetch the Ganeti view of the instance backing `vm`."""
    with pooled_rapi_client(vm) as rapi_client:
        instance_info = rapi_client.GetInstance(vm.backend_vm_id)
    return instance_info
759

    
760

    
761
def vm_exists_in_backend(vm):
    """Check whether the Ganeti instance backing `vm` still exists.

    Returns False on a 404 from the RAPI; any other RAPI error is
    propagated to the caller.
    """
    try:
        get_instance_info(vm)
    except rapi.GanetiApiError as err:
        if err.code == 404:
            return False
        raise err
    return True
769

    
770

    
771
def get_network_info(backend_network):
    """Fetch the Ganeti view of the network behind `backend_network`."""
    ganeti_net_id = backend_network.network.backend_id
    with pooled_rapi_client(backend_network) as client:
        return client.GetNetwork(ganeti_net_id)
774

    
775

    
776
def network_exists_in_backend(backend_network):
    """Check whether a network exists in a Ganeti backend.

    Returns True if the network is known to the backend, False if the
    RAPI reports 404 (not found).

    Raises rapi.GanetiApiError: for any RAPI failure other than 404, so a
        transient backend error cannot be mistaken for a missing network
        (mirrors vm_exists_in_backend).
    """
    try:
        get_network_info(backend_network)
        return True
    except rapi.GanetiApiError as e:
        if e.code == 404:
            return False
        # Bug fix: previously non-404 errors fell through and the function
        # silently returned None, masking RAPI failures.
        raise e
783

    
784

    
785
def job_is_still_running(vm):
    """Return True if the VM's last backend job has not yet finalized.

    Any RAPI error while querying the job is treated as "not running".
    """
    with pooled_rapi_client(vm) as client:
        try:
            status = client.GetJobStatus(vm.backendjobid)["status"]
        except rapi.GanetiApiError:
            return False
        return status not in rapi.JOB_STATUS_FINALIZED
792

    
793

    
794
def ensure_network_is_active(backend, network_id):
    """Ensure that a network is active in the specified backend

    Check that a network exists and is active in the specified backend. If not
    (re-)create the network. Return the corresponding BackendNetwork object
    and the IDs of the Ganeti job to create the network.

    """
    # Lock the network row for the duration of the transaction.
    network = Network.objects.select_for_update().get(id=network_id)
    bnet, _created = BackendNetwork.objects.get_or_create(backend=backend,
                                                          network=network)
    if bnet.operstate != "ACTIVE":
        job_ids = create_network(network, backend, connect=True)
    else:
        job_ids = []

    return bnet, job_ids
810

    
811

    
812
def create_network(network, backend, connect=True):
    """Create a network in a Ganeti backend"""
    log.debug("Creating network %s in backend %s", network, backend)

    creation_job = _create_network(network, backend)

    if not connect:
        return [creation_job]
    # Connection jobs depend on the creation job having finalized.
    return connect_network(network, backend, depends=[creation_job])
823

    
824

    
825
def _create_network(network, backend):
    """Create a network.

    Build and submit the Ganeti CreateNetwork job for `network` on
    `backend`, derived from the network's subnets, tags and MAC prefix.
    Returns the Ganeti job ID.

    Raises Exception: if no BackendNetwork row exists for this
        (network, backend) pair.
    """
    tags = network.backend_tag
    subnet = None
    subnet6 = None
    gateway = None
    gateway6 = None
    for _subnet in network.subnets.all():
        # Tag the network for nfdhcpd once, if any subnet wants DHCP.
        # (Idiom fix: was `not "nfdhcpd" in tags`.)
        if _subnet.dhcp and "nfdhcpd" not in tags:
            tags.append("nfdhcpd")
        if _subnet.ipversion == 4:
            subnet = _subnet.cidr
            gateway = _subnet.gateway
        elif _subnet.ipversion == 6:
            subnet6 = _subnet.cidr
            gateway6 = _subnet.gateway

    # Ganeti performs IP-conflict checking only for public networks.
    if network.public:
        conflicts_check = True
        tags.append('public')
    else:
        conflicts_check = False
        tags.append('private')

    # Use a dummy network subnet for IPv6 only networks. Currently Ganeti does
    # not support IPv6 only networks. To bypass this limitation, we create the
    # network with a dummy network subnet, and make Cyclades connect instances
    # to such networks, with address=None.
    if subnet is None:
        subnet = "10.0.0.0/29"

    try:
        bn = BackendNetwork.objects.get(network=network, backend=backend)
        mac_prefix = bn.mac_prefix
    except BackendNetwork.DoesNotExist:
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
                        " does not exist" % (network.id, backend.id))

    with pooled_rapi_client(backend) as client:
        return client.CreateNetwork(network_name=network.backend_id,
                                    network=subnet,
                                    network6=subnet6,
                                    gateway=gateway,
                                    gateway6=gateway6,
                                    mac_prefix=mac_prefix,
                                    conflicts_check=conflicts_check,
                                    tags=tags)
873

    
874

    
875
def connect_network(network, backend, depends=None, group=None):
    """Connect a network to nodegroups.

    Submit one ConnectNetwork job per nodegroup of `backend` (or only for
    `group` if given), each depending on the job IDs in `depends`.
    Returns the list of submitted job IDs.
    """
    log.debug("Connecting network %s to backend %s", network, backend)

    # Fix: `depends` had a shared mutable default argument ([]); use the
    # None sentinel instead.
    if depends is None:
        depends = []

    # Ganeti performs IP-conflict checking only for public networks.
    conflicts_check = bool(network.public)

    depends = create_job_dependencies(depends)
    with pooled_rapi_client(backend) as client:
        target_groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        # Fix: loop variable renamed; it previously shadowed the `group`
        # parameter.
        for nodegroup in target_groups:
            job_id = client.ConnectNetwork(network.backend_id, nodegroup,
                                           network.mode, network.link,
                                           conflicts_check,
                                           depends=depends)
            job_ids.append(job_id)
    return job_ids
895

    
896

    
897
def delete_network(network, backend, disconnect=True):
    """Delete a network from a Ganeti backend.

    When `disconnect` is true, the deletion job depends on the jobs that
    first disconnect the network from all nodegroups.
    """
    log.debug("Deleting network %s from backend %s", network, backend)

    depends = disconnect_network(network, backend) if disconnect else []
    _delete_network(network, backend, depends=depends)
904

    
905

    
906
def _delete_network(network, backend, depends=None):
    """Submit the Ganeti job deleting `network` from `backend`.

    `depends` is a list of job IDs the deletion must wait for. Returns the
    Ganeti job ID.
    """
    # Fix: `depends` had a shared mutable default argument ([]).
    if depends is None:
        depends = []
    depends = create_job_dependencies(depends)
    with pooled_rapi_client(backend) as client:
        return client.DeleteNetwork(network.backend_id, depends)
910

    
911

    
912
def disconnect_network(network, backend, group=None):
    """Disconnect a network from nodegroups.

    Submit one DisconnectNetwork job per nodegroup of `backend` (or only
    for `group` if given). Returns the list of submitted job IDs.
    """
    # Fix: the log message previously read "Disconnecting network %s to
    # backend %s" — wrong preposition for a disconnect.
    log.debug("Disconnecting network %s from backend %s", network, backend)

    with pooled_rapi_client(backend) as client:
        target_groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        # Fix: loop variable renamed; it previously shadowed the `group`
        # parameter.
        for nodegroup in target_groups:
            job_id = client.DisconnectNetwork(network.backend_id, nodegroup)
            job_ids.append(job_id)
    return job_ids
922

    
923

    
924
def connect_to_network(vm, nic):
    """Attach NIC `nic` of `vm` to its network in the Ganeti backend.

    Makes sure the network is active in the VM's backend first, then
    submits a ModifyInstance job adding the NIC, depending on any
    network-creation jobs. Returns the Ganeti job ID.
    """
    network = nic.network
    backend = vm.backend
    bnet, depend_jobs = ensure_network_is_active(backend, network.id)

    depends = create_job_dependencies(depend_jobs)

    # Fix: do not rebind the `nic` parameter to the RAPI payload dict;
    # keep the model object and the Ganeti NIC definition in separate
    # names.
    nic_def = {'name': nic.backend_uuid,
               'network': network.backend_id,
               'ip': nic.ipv4_address}

    log.debug("Adding NIC %s to VM %s", nic_def, vm)

    kwargs = {
        "instance": vm.backend_vm_id,
        "nics": [("add", "-1", nic_def)],
        "depends": depends,
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(**kwargs)
949

    
950

    
951
def disconnect_from_network(vm, nic):
    """Detach NIC `nic` from `vm` and drop its firewall tag, if any.

    Returns the Ganeti job ID of the ModifyInstance (NIC removal) job.
    """
    log.debug("Removing NIC %s of VM %s", nic, vm)

    kwargs = {
        "instance": vm.backend_vm_id,
        "nics": [("remove", nic.backend_uuid, {})],
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        job_id = client.ModifyInstance(**kwargs)
        profile = nic.firewall_profile
        if profile and profile != "DISABLED":
            # Also clean up the firewall tag tied to this NIC.
            tag = _firewall_tags[profile] % nic.backend_uuid
            client.DeleteInstanceTags(vm.backend_vm_id, [tag],
                                      dry_run=settings.TEST)
        return job_id
972

    
973

    
974
def set_firewall_profile(vm, profile, nic):
    """Set the firewall profile of `nic` on `vm` via Ganeti instance tags.

    Removes any existing firewall tags for the NIC, adds the tag for
    `profile` (unless it is "DISABLED"), and issues a NOP ModifyInstance
    so the change is picked up. Always returns None.

    Raises ValueError: if `profile` is not a known firewall profile.
    """
    uuid = nic.backend_uuid
    try:
        tag = _firewall_tags[profile] % uuid
    except KeyError:
        # Fix: error message typo ("Unsopported" -> "Unsupported").
        raise ValueError("Unsupported Firewall Profile: %s" % profile)

    log.debug("Setting tag of VM %s, NIC %s, to %s", vm, nic, profile)

    with pooled_rapi_client(vm) as client:
        # Delete previous firewall tags
        old_tags = client.GetInstanceTags(vm.backend_vm_id)
        delete_tags = [(t % uuid) for t in _firewall_tags.values()
                       if (t % uuid) in old_tags]
        if delete_tags:
            client.DeleteInstanceTags(vm.backend_vm_id, delete_tags,
                                      dry_run=settings.TEST)

        if profile != "DISABLED":
            client.AddInstanceTags(vm.backend_vm_id, [tag],
                                   dry_run=settings.TEST)

        # XXX NOP ModifyInstance call to force process_net_status to run
        # on the dispatcher
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
        client.ModifyInstance(vm.backend_vm_id,
                              os_name=os_name)
    return None
1002

    
1003

    
1004
def get_instances(backend, bulk=True):
    """List the instances of `backend` as reported by the Ganeti RAPI."""
    with pooled_rapi_client(backend) as client:
        return client.GetInstances(bulk=bulk)
1007

    
1008

    
1009
def get_nodes(backend, bulk=True):
    """List the nodes of `backend` as reported by the Ganeti RAPI."""
    with pooled_rapi_client(backend) as client:
        return client.GetNodes(bulk=bulk)
1012

    
1013

    
1014
def get_jobs(backend, bulk=True):
    """List the jobs of `backend` as reported by the Ganeti RAPI."""
    with pooled_rapi_client(backend) as client:
        return client.GetJobs(bulk=bulk)
1017

    
1018

    
1019
def get_physical_resources(backend):
    """ Get the physical resources of a backend.

    Get the resources of a backend as reported by the backend (not the db).

    """
    nodes = get_nodes(backend, bulk=True)
    attrs = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
    totals = dict.fromkeys(attrs, 0)
    for node in nodes:
        # Filter out drained, offline and not vm_capable nodes since they will
        # not take part in the vm allocation process
        usable = node['vm_capable'] and not (node['drained'] or
                                             node['offline'])
        if usable and node['cnodes']:
            for attr in attrs:
                totals[attr] += int(node[attr] or 0)
    return totals
1038

    
1039

    
1040
def update_backend_resources(backend, resources=None):
    """ Update the state of the backend resources in db.

    When `resources` is not given (or empty), they are fetched live from
    the backend via get_physical_resources().
    """
    if not resources:
        resources = get_physical_resources(backend)

    for field in ('mfree', 'mtotal', 'dfree', 'dtotal',
                  'pinst_cnt', 'ctotal'):
        setattr(backend, field, resources[field])
    backend.updated = datetime.now()
    backend.save()
1056

    
1057

    
1058
def get_memory_from_instances(backend):
    """ Get the memory that is used from instances.

    Get the used memory of a backend. Note: This is different for
    the real memory used, due to kvm's memory de-duplication.

    """
    with pooled_rapi_client(backend) as client:
        instances = client.GetInstances(bulk=True)
    return sum(instance['oper_ram'] for instance in instances)
1071

    
1072

    
1073
def get_available_disk_templates(backend):
    """Get the list of available disk templates of a Ganeti backend.

    The list contains the disk templates that are enabled in the Ganeti backend
    and also included in ipolicy-disk-templates.

    """
    with pooled_rapi_client(backend) as client:
        info = client.GetInfo()
    ipolicy_templates = info["ipolicy"]["disk-templates"]
    if "enabled_disk_templates" not in info:
        # Ganeti < 2.8 does not have 'enabled_disk_templates'
        return ipolicy_templates
    return [template for template in info["enabled_disk_templates"]
            if template in ipolicy_templates]
1090

    
1091

    
1092
def update_backend_disk_templates(backend):
    """Refresh and persist the backend's available disk templates."""
    backend.disk_templates = get_available_disk_templates(backend)
    backend.save()
1096

    
1097

    
1098
##
1099
## Synchronized operations for reconciliation
1100
##
1101

    
1102

    
1103
def create_network_synced(network, backend):
    """Create and then connect a network, blocking on each Ganeti job.

    Returns the (status, error) tuple of the creation job if it failed,
    otherwise the result of the connection phase.
    """
    result = _create_network_synced(network, backend)
    if result[0] == rapi.JOB_STATUS_SUCCESS:
        result = connect_network_synced(network, backend)
    return result
1109

    
1110

    
1111
def _create_network_synced(network, backend):
    """Submit the network-creation job and block until it finalizes."""
    with pooled_rapi_client(backend) as client:
        job_id = _create_network(network, backend)
        outcome = wait_for_job(client, job_id)
    return outcome
1116

    
1117

    
1118
def connect_network_synced(network, backend):
    """Connect `network` to every nodegroup of `backend`, job by job.

    Waits for each ConnectNetwork job; returns the (status, error) tuple
    of the first failed job, or of the last job if all succeeded.
    """
    # Fix: `result` was unbound (NameError on return) if the backend
    # reported no nodegroups; an empty nodegroup list counts as success.
    result = (rapi.JOB_STATUS_SUCCESS, None)
    with pooled_rapi_client(backend) as client:
        for group in client.GetGroups():
            job = client.ConnectNetwork(network.backend_id, group,
                                        network.mode, network.link)
            result = wait_for_job(client, job)
            if result[0] != rapi.JOB_STATUS_SUCCESS:
                return result

    return result
1128

    
1129

    
1130
def wait_for_job(client, jobid):
    """Block until Ganeti job `jobid` finalizes.

    Polls via WaitForJobChange until the job status is finalized.
    Returns (status, None) on success, or (status, opresult) on failure.
    """
    fields = ['status', 'opresult']
    result = client.WaitForJobChange(jobid, fields, None, None)
    status = result['job_info'][0]
    while status not in rapi.JOB_STATUS_FINALIZED:
        # Pass the previous result back so the RAPI only returns on change.
        result = client.WaitForJobChange(jobid, fields, [result], None)
        status = result['job_info'][0]

    if status == rapi.JOB_STATUS_SUCCESS:
        return (status, None)
    return (status, result['job_info'][1])
1143

    
1144

    
1145
def create_job_dependencies(job_ids=None, job_states=None):
    """Transform a list of job IDs to Ganeti 'depends' attribute.

    Each entry of the returned list is a [job_id, job_states] pair, where
    `job_states` are the states for which the dependency is satisfied.
    `job_states` defaults to all finalized job states.

    Raises TypeError: if `job_states` is not a list.
    """
    # Fix: `job_ids` had a shared mutable default argument ([]).
    if job_ids is None:
        job_ids = []
    if job_states is None:
        job_states = list(rapi.JOB_STATUS_FINALIZED)
    # Fix: validate explicitly instead of `assert`, which is stripped
    # under `python -O`, and use isinstance over type() comparison.
    if not isinstance(job_states, list):
        raise TypeError("job_states must be a list, got %r" % (job_states,))
    return [[job_id, job_states] for job_id in job_ids]