Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ 168a12e2

History | View | Annotate | Download (42.7 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
from django.conf import settings
34
from django.db import transaction
35
from datetime import datetime, timedelta
36

    
37
from synnefo.db.models import (Backend, VirtualMachine, Network,
38
                               BackendNetwork, BACKEND_STATUSES,
39
                               pooled_rapi_client, VirtualMachineDiagnostic,
40
                               Flavor, IPAddress, IPAddressLog)
41
from synnefo.logic import utils, ips
42
from synnefo import quotas
43
from synnefo.api.util import release_resource
44
from synnefo.util.mac2eui64 import mac2eui64
45
from synnefo.logic import rapi
46

    
47
from logging import getLogger
48
log = getLogger(__name__)
49

    
50

    
51
# Map firewall profile names to the Ganeti instance tags that encode them.
# The concrete tag strings come from the deployment settings.
_firewall_tags = {
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}

# Reverse mapping: the fourth ':'-separated component of each tag back to
# the profile name, used to decode firewall tags reported by Ganeti.
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())

# Timeout in seconds for building NICs. After this period the NICs considered
# stale and removed from DB.
BUILDING_NIC_TIMEOUT = timedelta(seconds=180)

# NIC attributes that can be copied verbatim from a Ganeti NIC dict onto the
# DB object...
SIMPLE_NIC_FIELDS = ["state", "mac", "network", "firewall_profile", "index"]
# ...and those needing special handling (IP allocation/release on change).
COMPLEX_NIC_FIELDS = ["ipv4_address", "ipv6_address"]
NIC_FIELDS = SIMPLE_NIC_FIELDS + COMPLEX_NIC_FIELDS
# Prefix for NIC ids that exist in Ganeti but are unknown to Synnefo.
UNKNOWN_NIC_PREFIX = "unknown-"
66

    
67

    
68
def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
    """Handle quotas for updated VirtualMachine.

    Update quotas for the updated VirtualMachine based on the job that run on
    the Ganeti backend. If a commission has been already issued for this job,
    then this commission is just accepted or rejected based on the job status.
    Otherwise, a new commission for the given change is issued, that is also in
    force and auto-accept mode. In this case, previous commissions are
    rejected, since they reflect a previous state of the VM.

    :param vm: the VirtualMachine the job refers to
    :param job_id: id of the Ganeti job
    :param job_opcode: opcode of the Ganeti job (e.g. OP_INSTANCE_STARTUP)
    :param job_status: status of the Ganeti job
    :param job_fields: dict with the fields the job modified (e.g. beparams)
    :returns: the (possibly updated, not yet saved) VirtualMachine

    """
    # Quotas are only reconciled once the job reaches a terminal status.
    if job_status not in rapi.JOB_STATUS_FINALIZED:
        return vm

    # Check successful completion of a job will trigger any quotable change in
    # the VM state.
    action = utils.get_action_from_opcode(job_opcode, job_fields)
    if action == "BUILD":
        # Quotas for new VMs are automatically accepted by the API
        return vm

    if vm.task_job_id == job_id and vm.serial is not None:
        # Commission for this change has already been issued. So just
        # accept/reject it. Special case is OP_INSTANCE_CREATE, which even
        # if fails, must be accepted, as the user must manually remove the
        # failed server
        serial = vm.serial
        if job_status == rapi.JOB_STATUS_SUCCESS:
            quotas.accept_serial(serial)
        elif job_status in [rapi.JOB_STATUS_ERROR, rapi.JOB_STATUS_CANCELED]:
            log.debug("Job %s failed. Rejecting related serial %s", job_id,
                      serial)
            quotas.reject_serial(serial)
        # Clear the serial in either case; it has been resolved.
        vm.serial = None
    elif job_status == rapi.JOB_STATUS_SUCCESS:
        commission_info = quotas.get_commission_info(resource=vm,
                                                     action=action,
                                                     action_fields=job_fields)
        if commission_info is not None:
            # Commission for this change has not been issued, or the issued
            # commission was unaware of the current change. Reject all previous
            # commissions and create a new one in forced mode!
            log.debug("Expected job was %s. Processing job %s.",
                      vm.task_job_id, job_id)
            reason = ("client: dispatcher, resource: %s, ganeti_job: %s"
                      % (vm, job_id))
            quotas.handle_resource_commission(vm, action,
                                              action_fields=job_fields,
                                              commission_name=reason,
                                              force=True,
                                              auto_accept=True)
            log.debug("Issued new commission: %s", vm.serial)

    return vm
122

    
123

    
124
@transaction.commit_on_success
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None,
                      job_fields=None):
    """Process a job progress notification from the backend

    Process an incoming message from the backend (currently Ganeti).
    Job notifications with a terminating status (success, error, or canceled),
    also update the operating state of the VM.

    :param vm: the VirtualMachine the job refers to
    :param etime: event time; becomes the VM's backendtime for state changes
    :param jobid: id of the Ganeti job
    :param opcode: opcode of the Ganeti job
    :param status: status of the Ganeti job; must be a valid backend status
    :param logmsg: log message from the backend, stored on the VM
    :param nics: optional list of NIC dicts reported by Ganeti
    :param job_fields: optional dict of fields the job modified
    :raises VirtualMachine.InvalidBackendMsgError: for an unknown status

    """
    # See #1492, #1031, #1111 why this line has been removed
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)

    # Record the latest job info on the VM regardless of outcome.
    vm.backendjobid = jobid
    vm.backendjobstatus = status
    vm.backendopcode = opcode
    vm.backendlogmsg = logmsg

    # Non-terminal notifications only record progress; no state change yet.
    if status not in rapi.JOB_STATUS_FINALIZED:
        vm.save()
        return

    if job_fields is None:
        job_fields = {}
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode)

    # Notifications of success change the operating state
    if status == rapi.JOB_STATUS_SUCCESS:
        if state_for_success is not None:
            vm.operstate = state_for_success
        beparams = job_fields.get("beparams", None)
        if beparams:
            # Change the flavor of the VM
            _process_resize(vm, beparams)
        # Update backendtime only for jobs that have been successfully
        # completed, since only these jobs update the state of the VM. Else a
        # "race condition" may occur when a successful job (e.g.
        # OP_INSTANCE_REMOVE) completes before an error job and messages arrive
        # in reversed order.
        vm.backendtime = etime

    if status in rapi.JOB_STATUS_FINALIZED and nics is not None:
        # Update the NICs of the VM
        _process_net_status(vm, etime, nics)

    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
    if opcode == 'OP_INSTANCE_CREATE' and status in (rapi.JOB_STATUS_CANCELED,
                                                     rapi.JOB_STATUS_ERROR):
        vm.operstate = 'ERROR'
        vm.backendtime = etime
        # Update state of associated NICs
        vm.nics.all().update(state="ERROR")
    elif opcode == 'OP_INSTANCE_REMOVE':
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
        # when no instance exists at the Ganeti backend.
        # See ticket #799 for all the details.
        if (status == rapi.JOB_STATUS_SUCCESS or
           (status == rapi.JOB_STATUS_ERROR and not vm_exists_in_backend(vm))):
            # VM has been deleted
            for nic in vm.nics.all():
                # Release the IP
                remove_nic_ips(nic)
                # And delete the NIC.
                nic.delete()
            vm.deleted = True
            vm.operstate = state_for_success
            vm.backendtime = etime
            # Treat the failed REMOVE as a success for quota handling below.
            status = rapi.JOB_STATUS_SUCCESS
            #status = "success"
            vm.volumes.all().update(deleted=True, machine=None)

    if status in rapi.JOB_STATUS_FINALIZED:
        # Job is finalized: Handle quotas/commissioning
        vm = handle_vm_quotas(vm, job_id=jobid, job_opcode=opcode,
                              job_status=status, job_fields=job_fields)
        # and clear task fields
        if vm.task_job_id == jobid:
            vm.task = None
            vm.task_job_id = None

    vm.save()
207

    
208

    
209
def _process_resize(vm, beparams):
210
    """Change flavor of a VirtualMachine based on new beparams."""
211
    old_flavor = vm.flavor
212
    vcpus = beparams.get("vcpus", old_flavor.cpu)
213
    ram = beparams.get("maxmem", old_flavor.ram)
214
    if vcpus == old_flavor.cpu and ram == old_flavor.ram:
215
        return
216
    try:
217
        new_flavor = Flavor.objects.get(cpu=vcpus, ram=ram,
218
                                        disk=old_flavor.disk,
219
                                        disk_template=old_flavor.disk_template)
220
    except Flavor.DoesNotExist:
221
        raise Exception("Cannot find flavor for VM")
222
    vm.flavor = new_flavor
223
    vm.save()
224

    
225

    
226
@transaction.commit_on_success
def process_net_status(vm, etime, nics):
    """Wrap _process_net_status inside transaction.

    Public entry point: same arguments and semantics as
    _process_net_status, but all DB changes commit (or roll back) as one
    transaction.

    """
    _process_net_status(vm, etime, nics)
230

    
231

    
232
def _process_net_status(vm, etime, nics):
    """Process a net status notification from the backend

    Process an incoming message from the Ganeti backend,
    detailing the NIC configuration of a VM instance.

    Update the state of the VM in the DB accordingly.

    :param vm: the VirtualMachine the notification refers to
    :param etime: event time; stored as the VM's new backendtime
    :param nics: list of NIC dicts as reported by Ganeti

    """
    ganeti_nics = process_ganeti_nics(nics)
    db_nics = dict([(nic.id, nic)
                    for nic in vm.nics.prefetch_related("ips__subnet")])

    # Get X-Lock on backend before getting X-Lock on network IP pools, to
    # guarantee that no deadlock will occur with Backend allocator.
    Backend.objects.select_for_update().get(id=vm.backend_id)

    for nic_name in set(db_nics.keys()) | set(ganeti_nics.keys()):
        db_nic = db_nics.get(nic_name)
        ganeti_nic = ganeti_nics.get(nic_name)
        if ganeti_nic is None:
            # NIC exists in DB but not in Ganeti. If the NIC is in 'building'
            # state for more than 5 minutes, then we remove the NIC.
            # TODO: This is dangerous as the job may be stack in the queue, and
            # releasing the IP may lead to duplicate IP use.
            if db_nic.state != "BUILD" or\
                (db_nic.state == "BUILD" and
                 etime > db_nic.created + BUILDING_NIC_TIMEOUT):
                remove_nic_ips(db_nic)
                db_nic.delete()
            else:
                log.warning("Ignoring recent building NIC: %s", db_nic)
        elif db_nic is None:
            msg = ("NIC/%s of VM %s does not exist in DB! Cannot automatically"
                   " fix this issue!" % (nic_name, vm))
            log.error(msg)
            continue
        elif not nics_are_equal(db_nic, ganeti_nic):
            # Update the NIC in DB with the values from Ganeti NIC
            for f in SIMPLE_NIC_FIELDS:
                setattr(db_nic, f, ganeti_nic[f])
            # Fix: save once after all simple fields are updated. The save()
            # used to sit inside the loop above, issuing one UPDATE per field.
            db_nic.save()

            # Special case where the IPv4 address has changed, because you
            # need to release the old IPv4 address and reserve the new one
            ipv4_address = ganeti_nic["ipv4_address"]
            if db_nic.ipv4_address != ipv4_address:
                change_address_of_port(db_nic, vm.userid,
                                       old_address=db_nic.ipv4_address,
                                       new_address=ipv4_address,
                                       version=4)

            ipv6_address = ganeti_nic["ipv6_address"]
            if db_nic.ipv6_address != ipv6_address:
                change_address_of_port(db_nic, vm.userid,
                                       old_address=db_nic.ipv6_address,
                                       new_address=ipv6_address,
                                       version=6)

    vm.backendtime = etime
    vm.save()
293

    
294

    
295
def change_address_of_port(port, userid, old_address, new_address, version):
    """Change the IPv4/IPv6 address that is bound to a port (NIC).

    Release the NIC's old address of the given version (floating IPs are
    only detached), bind the new one, and record the change in the IP
    address log.

    :param port: the NetworkInterface whose address changes
    :param userid: owner of the newly created/allocated IPAddress
    :param old_address: the previously bound address, or None
    :param new_address: the address reported by the backend
    :param version: IP version, 4 or 6
    :returns: the new IPAddress object
    :raises ValueError: for an unknown IP version

    """
    if old_address is not None:
        msg = ("IPv%s Address of server '%s' changed from '%s' to '%s'"
               % (version, port.machine_id, old_address, new_address))
        # An unexpected address change deserves operator attention.
        log.critical(msg)

    # Remove the old IP address
    remove_nic_ips(port, version=version)

    if version == 4:
        # IPv4 addresses are pool-managed: allocate this specific address.
        ipaddress = ips.allocate_ip(port.network, userid, address=new_address)
        ipaddress.nic = port
        ipaddress.save()
    elif version == 6:
        # IPv6 addresses are not pooled; just record the new address on the
        # network's IPv6 subnet.
        subnet6 = port.network.subnet6
        ipaddress = IPAddress.objects.create(userid=userid,
                                             network=port.network,
                                             subnet=subnet6,
                                             nic=port,
                                             address=new_address)
    else:
        raise ValueError("Unknown version: %s" % version)

    # New address log
    ip_log = IPAddressLog.objects.create(server_id=port.machine_id,
                                         network_id=port.network_id,
                                         address=new_address,
                                         active=True)
    log.info("Created IP log entry '%s' for address '%s' to server '%s'",
             ip_log.id, new_address, port.machine_id)

    return ipaddress
328

    
329

    
330
def nics_are_equal(db_nic, gnt_nic):
    """Return True iff the DB NIC and the Ganeti NIC agree on NIC_FIELDS."""
    return all(getattr(db_nic, field) == gnt_nic[field]
               for field in NIC_FIELDS)
335

    
336

    
337
def process_ganeti_nics(ganeti_nics):
    """Translate Ganeti NIC dicts into Synnefo NIC info.

    Returns a dict mapping each NIC's Synnefo id (or an 'unknown-<index>'
    placeholder for NICs unknown to Synnefo) to a dict of NIC attributes.

    """
    result = {}
    for index, ganeti_nic in enumerate(ganeti_nics):
        name = ganeti_nic.get("name", None)
        if name is None:
            # Fall back to the index. An unknown NIC will be created
            # automatically by the caller.
            nic_id = UNKNOWN_NIC_PREFIX + str(index)
        else:
            nic_id = utils.id_from_nic_name(name)

        network_name = ganeti_nic.get('network', '')
        network = Network.objects.get(id=utils.id_from_network_name(
            network_name))

        # Collect the new NIC attributes.
        mac = ganeti_nic.get('mac')
        ipv4 = ganeti_nic.get('ip')
        # The IPv6 address is derived from the MAC when the network has an
        # IPv6 subnet.
        subnet6 = network.subnet6
        ipv6 = mac2eui64(mac, subnet6.cidr) if subnet6 else None

        # Decode the firewall profile from the Ganeti tag; public networks
        # default to the configured profile.
        firewall_profile = _reverse_tags.get(ganeti_nic.get('firewall'))
        if not firewall_profile and network.public:
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE

        result[nic_id] = {
            'index': index,
            'network': network,
            'mac': mac,
            'ipv4_address': ipv4,
            'ipv6_address': ipv6,
            'firewall_profile': firewall_profile,
            'state': 'ACTIVE'}

    return result
374

    
375

    
376
def remove_nic_ips(nic, version=None):
    """Remove IP addresses associated with a NetworkInterface.

    Remove all IP addresses that are associated with the NetworkInterface
    object, by returning them to the pool and deleting the IPAddress object.
    If the IP is a floating IP, then it is just disassociated from the NIC.
    If version is specified, then only IP addressses of that version will be
    removed.

    """
    for ip_address in nic.ips.all():
        if version and ip_address.ipversion != version:
            # Filtered out by the requested IP version.
            continue

        # Close the active entry in the IP address usage log.
        terminate_active_ipaddress_log(nic, ip_address)

        if ip_address.floating_ip:
            # Floating IPs survive; just detach them from this NIC.
            ip_address.nic = None
            ip_address.save()
        else:
            # Return the address to its pool and drop the DB object.
            ip_address.release_address()
            ip_address.delete()
400

    
401

    
402
def terminate_active_ipaddress_log(nic, ip):
    """Update DB logging entry for this IP address.

    Mark the active IPAddressLog entry for (server, network, address) as
    released now. Only applies to public networks and NICs attached to a
    server; otherwise it is a no-op.

    """
    # Only public-network addresses of attached NICs are logged.
    if not ip.network.public or nic.machine is None:
        return
    try:
        # get_or_create: if no active entry exists we still create one (and
        # complain below), so the release is always recorded.
        ip_log, created = \
            IPAddressLog.objects.get_or_create(server_id=nic.machine_id,
                                               network_id=ip.network_id,
                                               address=ip.address,
                                               active=True)
    except IPAddressLog.MultipleObjectsReturned:
        logmsg = ("Multiple active log entries for IP %s, Network %s,"
                  "Server %s. Cannot proceed!"
                  % (ip.address, ip.network, nic.machine))
        log.error(logmsg)
        raise

    if created:
        # The just-created entry carries 'now' as its creation timestamp,
        # which is wrong for an address that was already in use.
        logmsg = ("No log entry for IP %s, Network %s, Server %s. Created new"
                  " but with wrong creation timestamp."
                  % (ip.address, ip.network, nic.machine))
        log.error(logmsg)
    ip_log.released_at = datetime.now()
    ip_log.active = False
    ip_log.save()
427

    
428

    
429
@transaction.commit_on_success
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
    """Process a network job notification from the Ganeti backend.

    Record the job details on the BackendNetwork, adjust its operating state
    for terminal statuses, and recompute the state of the parent Network.

    :raises Network.InvalidBackendMsgError: for an unknown job status

    """
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode
    back_network.backendlogmsg = logmsg

    # Note: Network is already locked!
    network = back_network.network

    # Notifications of success change the operating state
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == rapi.JOB_STATUS_SUCCESS and state_for_success is not None:
        back_network.operstate = state_for_success

    if (status in (rapi.JOB_STATUS_CANCELED, rapi.JOB_STATUS_ERROR)
       and opcode == 'OP_NETWORK_ADD'):
        # A failed/canceled creation leaves the backend network in ERROR.
        back_network.operstate = 'ERROR'
        back_network.backendtime = etime

    if opcode == 'OP_NETWORK_REMOVE':
        # The network also counts as deleted when the removal job failed
        # because the network no longer exists in the backend (analogous to
        # the OP_INSTANCE_REMOVE special case for VMs).
        network_is_deleted = (status == rapi.JOB_STATUS_SUCCESS)
        if network_is_deleted or (status == rapi.JOB_STATUS_ERROR and not
                                  network_exists_in_backend(back_network)):
            back_network.operstate = state_for_success
            back_network.deleted = True
            back_network.backendtime = etime

    if status == rapi.JOB_STATUS_SUCCESS:
        back_network.backendtime = etime
    back_network.save()
    # Also you must update the state of the Network!!
    update_network_state(network)
465

    
466

    
467
def update_network_state(network):
    """Update the state of a Network based on BackendNetwork states.

    Update the state of a Network based on the operstate of the networks in
    the backends that network exists.

    The state of the network is:
    * ACTIVE: If it is 'ACTIVE' in at least one backend.
    * DELETED: If it is 'DELETED' in all backends that have been created.

    This function also releases the resources (MAC prefix or Bridge) and the
    quotas for the network.

    """
    if network.deleted:
        # Network has already been deleted. Just assert that state is also
        # DELETED
        if not network.state == "DELETED":
            network.state = "DELETED"
            network.save()
        return

    backend_states = [s.operstate for s in network.backend_networks.all()]
    if not backend_states and network.action != "DESTROY":
        # NOTE(review): the early return only triggers when the state
        # actually changed; with no backend networks and state already
        # ACTIVE we fall through to the deletion logic below (reduce over an
        # empty list yields "DELETED") -- confirm this is intended.
        if network.state != "ACTIVE":
            network.state = "ACTIVE"
            network.save()
            return

    # Network is deleted when all BackendNetworks go to "DELETED" operstate
    deleted = reduce(lambda x, y: x == y and "DELETED", backend_states,
                     "DELETED")

    # Release the resources on the deletion of the Network
    if deleted:
        if network.ips.filter(deleted=False, floating_ip=True).exists():
            msg = "Cannot delete network %s! Floating IPs still in use!"
            log.error(msg % network)
            raise Exception(msg % network)
        # Fix: the link and mac_prefix arguments were swapped with respect to
        # the format string, mislabeling the released resources in the log.
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
                 network.id, network.link, network.mac_prefix)
        network.deleted = True
        network.state = "DELETED"
        # Return pooled resources (MAC prefix, bridge) to their pools.
        if network.mac_prefix:
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
                release_resource(res_type="mac_prefix",
                                 value=network.mac_prefix)
        if network.link:
            if network.FLAVORS[network.flavor]["link"] == "pool":
                release_resource(res_type="bridge", value=network.link)

        # Set all subnets as deleted
        network.subnets.update(deleted=True)
        # And delete the IP pools
        for subnet in network.subnets.all():
            if subnet.ipversion == 4:
                subnet.ip_pools.all().delete()
        # And all the backend networks since there are useless
        network.backend_networks.all().delete()

        # Issue commission
        if network.userid:
            quotas.issue_and_accept_commission(network, action="DESTROY")
            # the above has already saved the object and committed;
            # a second save would override others' changes, since the
            # object is now unlocked
            return
        elif not network.public:
            log.warning("Network %s does not have an owner!", network.id)
    network.save()
537

    
538

    
539
@transaction.commit_on_success
540
def process_network_modify(back_network, etime, jobid, opcode, status,
                           job_fields):
    """Process a Ganeti OP_NETWORK_SET_PARAMS job notification.

    Record the job on the BackendNetwork and mark any externally reserved
    addresses in the parent network's IP pools.

    """
    assert (opcode == "OP_NETWORK_SET_PARAMS")
    valid_statuses = [row[0] for row in BACKEND_STATUSES]
    if status not in valid_statuses:
        raise Network.InvalidBackendMsgError(opcode, status)

    # Record the latest job info on the backend network.
    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.opcode = opcode

    # Reserve externally-held addresses in the network's pools.
    reserved_ips = job_fields.get("add_reserved_ips")
    if reserved_ips:
        network = back_network.network
        for address in reserved_ips:
            network.reserve_address(address, external=True)

    if status == rapi.JOB_STATUS_SUCCESS:
        back_network.backendtime = etime
    back_network.save()
559

    
560

    
561
@transaction.commit_on_success
562
def process_create_progress(vm, etime, progress):
    """Update the build progress of a VM from an image-copy notification.

    `progress` is the percentage of the image copied so far. Values above
    100 are clamped, because snf-image:copy-progress counts bytes read by
    the image handling processes and may overshoot. Negative values are
    rejected with ValueError.

    """
    percentage = int(progress)
    if percentage > 100:
        percentage = 100
    if percentage < 0:
        raise ValueError("Percentage cannot be negative")

    # FIXME: log a warning when the percentage decreases instead of raising,
    # see #1033.

    # NOTE: this assumes no 'ganeti-create-progress' message arrives after
    # OP_INSTANCE_CREATE has succeeded; if the success 'ganeti-op-status'
    # message is processed first by another dispatcher thread, the operstate
    # is not re-checked here. [vkoukis]

    vm.buildpercentage = percentage
    vm.backendtime = etime
    vm.save()
590

    
591

    
592
@transaction.commit_on_success
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
                               details=None):
    """
    Create virtual machine instance diagnostic entry.

    :param vm: VirtualMachine instance to create diagnostic for.
    :param message: Diagnostic message.
    :param source: Diagnostic source identifier (e.g. image-helper).
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
    :param etime: The time the message occured (if available).
    :param details: Additional details or debug information.
    """
    # Delegates to the VirtualMachineDiagnostic manager; runs inside its own
    # transaction so a diagnostic write never leaks a partial commit.
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level, source=source,
                                                   source_date=etime,
                                                   message=message,
                                                   details=details)
609

    
610

    
611
def create_instance(vm, nics, volumes, flavor, image):
    """Submit an OP_INSTANCE_CREATE job to the VM's Ganeti backend.

    `image` is a dictionary which should contain the keys:
            'backend_id', 'format' and 'metadata'

        metadata value should be a dictionary.

    :param vm: the VirtualMachine to create in the backend
    :param nics: NICs to attach; their networks must be active in the backend
    :param volumes: volumes translated into Ganeti disk dicts
    :param flavor: flavor providing cpu/ram/disk template parameters
    :param image: image dict as described above
    :returns: the Ganeti job id

    """

    # Handle arguments to CreateInstance() as a dictionary,
    # initialize it based on a deployment-specific value.
    # This enables the administrator to override deployment-specific
    # arguments, such as the disk template to use, name of os provider
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
    #
    kw = vm.backend.get_create_params()
    kw['mode'] = 'create'
    kw['name'] = vm.backend_vm_id
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS

    kw['disk_template'] = flavor.disk_template
    disks = []
    for volume in volumes:
        disk = {}
        # assumes volume.size is in GiB while Ganeti expects MiB -- TODO
        # confirm against the volume model.
        disk["size"] = volume.size * 1024
        if flavor.disk_provider is not None:
            disk["provider"] = flavor.disk_provider
            disk["origin"] = volume.source_image["checksum"]
        disks.append(disk)

    kw["disks"] = disks

    kw['nics'] = [{"name": nic.backend_uuid,
                   "network": nic.network.backend_id,
                   "ip": nic.ipv4_address}
                  for nic in nics]

    # Make sure every attached network exists in this backend; collect the
    # jobs we have to wait on before creating the instance.
    backend = vm.backend
    depend_jobs = []
    for nic in nics:
        # NOTE(review): bnet is unused here; only the job ids matter.
        bnet, job_ids = ensure_network_is_active(backend, nic.network_id)
        depend_jobs.extend(job_ids)

    kw["depends"] = create_job_dependencies(depend_jobs)

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['os'] = settings.GANETI_OS_PROVIDER
    kw['ip_check'] = False
    kw['name_check'] = False

    # Do not specific a node explicitly, have
    # Ganeti use an iallocator instead
    #kw['pnode'] = rapi.GetNodes()[0]

    kw['dry_run'] = settings.TEST

    kw['beparams'] = {
        'auto_balance': True,
        'vcpus': flavor.cpu,
        'memory': flavor.ram}

    kw['osparams'] = {
        'config_url': vm.config_url,
        # Store image id and format to Ganeti
        'img_id': image['backend_id'],
        'img_format': image['format']}

    # Use opportunistic locking
    kw['opportunistic_locking'] = settings.GANETI_USE_OPPORTUNISTIC_LOCKING

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['hvparams'] = dict(serial_console=False)

    # hide_pass() strips sensitive values before the parameters are logged.
    log.debug("Creating instance %s", utils.hide_pass(kw))
    with pooled_rapi_client(vm) as client:
        return client.CreateInstance(**kw)
685

    
686

    
687
def delete_instance(vm):
    """Submit a Ganeti job removing the instance; returns the job id."""
    with pooled_rapi_client(vm) as client:
        return client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)
690

    
691

    
692
def reboot_instance(vm, reboot_type):
    """Submit a Ganeti job rebooting the instance.

    :param vm: the VirtualMachine to reboot
    :param reboot_type: 'soft' or 'hard' (OS API semantics)
    :returns: the Ganeti job id
    :raises ValueError: for an unknown reboot type

    """
    # Validate explicitly instead of with `assert`, which is stripped when
    # Python runs with -O.
    if reboot_type not in ('soft', 'hard'):
        raise ValueError("Invalid reboot type: %s" % reboot_type)
    kwargs = {"instance": vm.backend_vm_id,
              "reboot_type": "hard"}
    # XXX: Currently shutdown_timeout parameter is not supported from the
    # Ganeti RAPI. Until supported, we will fallback for both reboot types
    # to the default shutdown timeout of Ganeti (120s). Note that reboot
    # type of Ganeti job must be always hard. The 'soft' and 'hard' type
    # of OS API is different from the one in Ganeti, and maps to
    # 'shutdown_timeout'.
    #if reboot_type == "hard":
    #    kwargs["shutdown_timeout"] = 0
    if settings.TEST:
        kwargs["dry_run"] = True
    with pooled_rapi_client(vm) as client:
        return client.RebootInstance(**kwargs)
708

    
709

    
710
def startup_instance(vm):
    """Submit a Ganeti job starting the instance; returns the job id."""
    with pooled_rapi_client(vm) as client:
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
713

    
714

    
715
def shutdown_instance(vm):
    """Submit a Ganeti job shutting down the instance; returns the job id."""
    with pooled_rapi_client(vm) as client:
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)
718

    
719

    
720
def resize_instance(vm, vcpus, memory):
    """Submit a Ganeti job changing the instance's vcpus and memory.

    Both minimum and maximum memory are set to *memory*.

    """
    memory_mb = int(memory)
    new_beparams = {"vcpus": int(vcpus),
                    "minmem": memory_mb,
                    "maxmem": memory_mb}
    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id, beparams=new_beparams)
726

    
727

    
728
def get_instance_console(vm):
    """Return VNC console info ({kind, host, port}) for *vm*.

    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
    # useless (see #783).
    #
    # Until this is fixed on the Ganeti side, construct a console info reply
    # directly.
    #
    # WARNING: This assumes that VNC runs on port network_port on
    #          the instance's primary node, and is probably
    #          hypervisor-specific.
    """
    log.debug("Getting console for vm %s", vm)

    with pooled_rapi_client(vm) as rapi_client:
        info = rapi_client.GetInstance(vm.backend_vm_id)

    # A serial console on KVM would conflict with the VNC console.
    if vm.backend.hypervisor == "kvm" and info['hvparams']['serial_console']:
        raise Exception("hv parameter serial_console cannot be true")

    return {'kind': 'vnc',
            'host': info['pnode'],
            'port': info['network_port']}
def get_instance_info(vm):
    """Return the Ganeti RAPI description of the instance backing *vm*."""
    with pooled_rapi_client(vm) as rapi_client:
        return rapi_client.GetInstance(vm.backend_vm_id)
def vm_exists_in_backend(vm):
    """Return True if the Ganeti backend knows the instance of *vm*.

    A 404 from the RAPI means the instance does not exist; any other RAPI
    error is propagated to the caller.
    """
    try:
        get_instance_info(vm)
    except rapi.GanetiApiError as err:
        if err.code == 404:
            return False
        raise err
    return True
def get_network_info(backend_network):
    """Return the Ganeti RAPI description of *backend_network*."""
    with pooled_rapi_client(backend_network) as rapi_client:
        return rapi_client.GetNetwork(backend_network.network.backend_id)
def network_exists_in_backend(backend_network):
    """Return True if the Ganeti backend knows about *backend_network*.

    A 404 from the RAPI means the network does not exist. Any other RAPI
    error is now re-raised — previously it was silently swallowed and the
    function fell through returning None, unlike the sibling
    vm_exists_in_backend() which re-raises.
    """
    try:
        get_network_info(backend_network)
        return True
    except rapi.GanetiApiError as e:
        if e.code == 404:
            return False
        raise e
def job_is_still_running(vm):
    """Return True while the last Ganeti job of *vm* has not finalized.

    RAPI lookup failures (e.g. unknown or archived job) are reported as
    "not running".
    """
    with pooled_rapi_client(vm) as client:
        try:
            status = client.GetJobStatus(vm.backendjobid)["status"]
        except rapi.GanetiApiError:
            return False
        return status not in rapi.JOB_STATUS_FINALIZED
def ensure_network_is_active(backend, network_id):
    """Ensure that a network is active in the specified backend

    Check that a network exists and is active in the specified backend. If not
    (re-)create the network. Return the corresponding BackendNetwork object
    and the IDs of the Ganeti job to create the network.

    """
    # Lock the network row for the duration of the transaction.
    network = Network.objects.select_for_update().get(id=network_id)
    backend_net, _created = BackendNetwork.objects.get_or_create(
        backend=backend, network=network)
    if backend_net.operstate == "ACTIVE":
        return backend_net, []
    # Network missing or inactive in this backend: (re-)create and connect it.
    return backend_net, create_network(network, backend, connect=True)
def create_network(network, backend, connect=True):
    """Create a network in a Ganeti backend"""
    log.debug("Creating network %s in backend %s", network, backend)

    creation_job = _create_network(network, backend)

    if not connect:
        return [creation_job]
    # Connect to all nodegroups, each job depending on the creation job.
    return connect_network(network, backend, depends=[creation_job])
def _create_network(network, backend):
    """Create a network."""
    tags = network.backend_tag
    subnet = subnet6 = gateway = gateway6 = None
    for sub in network.subnets.all():
        if sub.dhcp and "nfdhcpd" not in tags:
            tags.append("nfdhcpd")
        if sub.ipversion == 4:
            subnet, gateway = sub.cidr, sub.gateway
        elif sub.ipversion == 6:
            subnet6, gateway6 = sub.cidr, sub.gateway

    # Public networks must not hand out conflicting addresses.
    if network.public:
        conflicts_check = True
        tags.append('public')
    else:
        conflicts_check = False
        tags.append('private')

    # Use a dummy network subnet for IPv6 only networks. Currently Ganeti does
    # not support IPv6 only networks. To bypass this limitation, we create the
    # network with a dummy network subnet, and make Cyclades connect instances
    # to such networks, with address=None.
    if subnet is None:
        subnet = "10.0.0.0/29"

    try:
        backend_net = BackendNetwork.objects.get(network=network,
                                                 backend=backend)
        mac_prefix = backend_net.mac_prefix
    except BackendNetwork.DoesNotExist:
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
                        " does not exist" % (network.id, backend.id))

    with pooled_rapi_client(backend) as client:
        return client.CreateNetwork(network_name=network.backend_id,
                                    network=subnet,
                                    network6=subnet6,
                                    gateway=gateway,
                                    gateway6=gateway6,
                                    mac_prefix=mac_prefix,
                                    conflicts_check=conflicts_check,
                                    tags=tags)
def connect_network(network, backend, depends=None, group=None):
    """Connect a network to nodegroups.

    Submit one ConnectNetwork job per nodegroup (or only *group*, if given)
    and return the list of job IDs.

    :param depends: optional list of job IDs the connect jobs depend on.
        The mutable default argument ([]) was replaced with None to avoid
        sharing the list between calls.
    :param group: connect only this nodegroup instead of all groups.
    """
    log.debug("Connecting network %s to backend %s", network, backend)

    if depends is None:
        depends = []

    # Public networks must not hand out conflicting addresses.
    if network.public:
        conflicts_check = True
    else:
        conflicts_check = False

    depends = create_job_dependencies(depends)
    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for group in groups:
            job_id = client.ConnectNetwork(network.backend_id, group,
                                           network.mode, network.link,
                                           conflicts_check,
                                           depends=depends)
            job_ids.append(job_id)
    return job_ids
def delete_network(network, backend, disconnect=True):
    """Delete *network* from *backend*, optionally disconnecting it first."""
    log.debug("Deleting network %s from backend %s", network, backend)

    # The delete job must wait for the disconnect jobs, if any.
    if disconnect:
        disconnect_jobs = disconnect_network(network, backend)
    else:
        disconnect_jobs = []
    _delete_network(network, backend, depends=disconnect_jobs)
def _delete_network(network, backend, depends=None):
    """Submit the Ganeti job that deletes *network* from *backend*.

    :param depends: optional list of job IDs to depend on. The mutable
        default argument ([]) was replaced with None to avoid sharing the
        list between calls.
    """
    if depends is None:
        depends = []
    depends = create_job_dependencies(depends)
    with pooled_rapi_client(backend) as client:
        return client.DeleteNetwork(network.backend_id, depends)
def disconnect_network(network, backend, group=None):
    """Disconnect *network* from nodegroups of *backend*; return job IDs."""
    log.debug("Disconnecting network %s to backend %s", network, backend)

    job_ids = []
    with pooled_rapi_client(backend) as client:
        target_groups = [group] if group is not None else client.GetGroups()
        for nodegroup in target_groups:
            job_ids.append(client.DisconnectNetwork(network.backend_id,
                                                    nodegroup))
    return job_ids
def connect_to_network(vm, nic):
    """Add the NIC described by *nic* to the Ganeti instance backing *vm*.

    Makes sure the target network is active in the VM's backend first; the
    NIC-add job depends on any network-creation jobs that were submitted.
    """
    network = nic.network
    backend = vm.backend
    bnet, depend_jobs = ensure_network_is_active(backend, network.id)

    depends = create_job_dependencies(depend_jobs)

    # Ganeti NIC specification (the original rebound the name 'nic' here,
    # shadowing the parameter; use a distinct name for clarity).
    nic_spec = {'name': nic.backend_uuid,
                'network': network.backend_id,
                'ip': nic.ipv4_address}

    log.debug("Adding NIC %s to VM %s", nic_spec, vm)

    job_kwargs = {
        "instance": vm.backend_vm_id,
        "nics": [("add", "-1", nic_spec)],
        "depends": depends,
    }
    if vm.backend.use_hotplug():
        job_kwargs["hotplug"] = True
    if settings.TEST:
        job_kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(**job_kwargs)
def disconnect_from_network(vm, nic):
    """Remove NIC *nic* from *vm* and delete its firewall tag, if any."""
    log.debug("Removing NIC %s of VM %s", nic, vm)

    job_kwargs = {
        "instance": vm.backend_vm_id,
        "nics": [("remove", nic.backend_uuid, {})],
    }
    if vm.backend.use_hotplug():
        job_kwargs["hotplug"] = True
    if settings.TEST:
        job_kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        job_id = client.ModifyInstance(**job_kwargs)
        profile = nic.firewall_profile
        if profile and profile != "DISABLED":
            # Drop the instance tag that encoded this NIC's firewall profile.
            tag = _firewall_tags[profile] % nic.backend_uuid
            client.DeleteInstanceTags(vm.backend_vm_id, [tag],
                                      dry_run=settings.TEST)
        return job_id
def set_firewall_profile(vm, profile, nic):
    """Apply firewall *profile* to *nic* of *vm* via Ganeti instance tags.

    Removes stale firewall tags for the NIC, adds the tag for the new
    profile (unless it is "DISABLED"), and issues a no-op ModifyInstance so
    that the dispatcher re-runs process_net_status.

    :raises ValueError: if *profile* is not a known firewall profile.
        (Fixed typo in the error message: "Unsopported" -> "Unsupported".)
    """
    uuid = nic.backend_uuid
    try:
        tag = _firewall_tags[profile] % uuid
    except KeyError:
        raise ValueError("Unsupported Firewall Profile: %s" % profile)

    log.debug("Setting tag of VM %s, NIC %s, to %s", vm, nic, profile)

    with pooled_rapi_client(vm) as client:
        # Delete previous firewall tags
        old_tags = client.GetInstanceTags(vm.backend_vm_id)
        delete_tags = [(t % uuid) for t in _firewall_tags.values()
                       if (t % uuid) in old_tags]
        if delete_tags:
            client.DeleteInstanceTags(vm.backend_vm_id, delete_tags,
                                      dry_run=settings.TEST)

        if profile != "DISABLED":
            client.AddInstanceTags(vm.backend_vm_id, [tag],
                                   dry_run=settings.TEST)

        # XXX NOP ModifyInstance call to force process_net_status to run
        # on the dispatcher
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
        client.ModifyInstance(vm.backend_vm_id,
                              os_name=os_name)
    return None
def attach_volume(vm, volume, depends=None):
    """Attach *volume* to the Ganeti instance backing *vm*.

    :param depends: optional list of job IDs the attach job depends on.
        The mutable default argument ([]) was replaced with None to avoid
        sharing the list between calls.

    Also fixed the debug log: the arguments were swapped relative to the
    format string ("Attaching volume %s to vm %s" was given vm, volume).
    """
    log.debug("Attaching volume %s to vm %s", volume, vm)

    if depends is None:
        depends = []

    disk = {"size": volume.size,
            "name": volume.backend_volume_uuid,
            "volume_name": volume.backend_volume_uuid}
    # Choose the disk's data origin: another volume, a snapshot, or an image.
    if volume.source_volume_id is not None:
        disk["origin"] = volume.source_volume.backend_volume_uuid
    elif volume.source_snapshot is not None:
        disk["origin"] = volume.source_snapshot["checksum"]
    elif volume.source_image is not None:
        disk["origin"] = volume.source_image["checksum"]

    kwargs = {
        "instance": vm.backend_vm_id,
        "disks": [("add", "-1", disk)],
        "depends": depends,
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(**kwargs)
def detach_volume(vm, volume):
    """Detach *volume* from the Ganeti instance backing *vm*."""
    log.debug("Removing volume %s from vm %s", volume, vm)

    job_kwargs = {
        "instance": vm.backend_vm_id,
        "disks": [("remove", volume.backend_volume_uuid, {})],
    }
    if vm.backend.use_hotplug():
        job_kwargs["hotplug"] = True
    if settings.TEST:
        job_kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(**job_kwargs)
def snapshot_instance(vm, snapshot_name):
    """Submit a Ganeti snapshot job for the instance backing *vm*."""
    with pooled_rapi_client(vm) as rapi_client:
        return rapi_client.SnapshotInstance(instance=vm.backend_vm_id,
                                            snapshot_name=snapshot_name)
def get_instances(backend, bulk=True):
    """Return the instances of *backend* as reported by the Ganeti RAPI."""
    with pooled_rapi_client(backend) as rapi_client:
        return rapi_client.GetInstances(bulk=bulk)
def get_nodes(backend, bulk=True):
    """Return the nodes of *backend* as reported by the Ganeti RAPI."""
    with pooled_rapi_client(backend) as rapi_client:
        return rapi_client.GetNodes(bulk=bulk)
def get_jobs(backend, bulk=True):
    """Return the jobs of *backend* as reported by the Ganeti RAPI."""
    with pooled_rapi_client(backend) as rapi_client:
        return rapi_client.GetJobs(bulk=bulk)
def get_physical_resources(backend):
    """ Get the physical resources of a backend.

    Get the resources of a backend as reported by the backend (not the db).

    """
    nodes = get_nodes(backend, bulk=True)
    attrs = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
    totals = dict.fromkeys(attrs, 0)
    for node in nodes:
        # Filter out drained, offline and not vm_capable nodes since they will
        # not take part in the vm allocation process
        usable = node['vm_capable'] and not (node['drained'] or
                                             node['offline'])
        if usable and node['cnodes']:
            for attr in attrs:
                totals[attr] += int(node[attr] or 0)
    return totals
def update_backend_resources(backend, resources=None):
    """ Update the state of the backend resources in db.

    If *resources* is not given, they are fetched from the backend itself.

    """
    if not resources:
        resources = get_physical_resources(backend)

    # Copy every reported resource counter onto the backend model.
    for attr in ('mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt',
                 'ctotal'):
        setattr(backend, attr, resources[attr])
    backend.updated = datetime.now()
    backend.save()
def get_memory_from_instances(backend):
    """ Get the memory that is used from instances.

    Get the used memory of a backend. Note: This is different for
    the real memory used, due to kvm's memory de-duplication.

    """
    with pooled_rapi_client(backend) as client:
        instances = client.GetInstances(bulk=True)
    return sum(instance['oper_ram'] for instance in instances)
def get_available_disk_templates(backend):
    """Get the list of available disk templates of a Ganeti backend.

    The list contains the disk templates that are enabled in the Ganeti backend
    and also included in ipolicy-disk-templates.

    """
    with pooled_rapi_client(backend) as client:
        cluster_info = client.GetInfo()
    allowed = cluster_info["ipolicy"]["disk-templates"]
    if "enabled_disk_templates" not in cluster_info:
        # Ganeti < 2.8 does not have 'enabled_disk_templates'
        return allowed
    enabled = cluster_info["enabled_disk_templates"]
    return [template for template in enabled if template in allowed]
def update_backend_disk_templates(backend):
    """Refresh and persist the backend's available disk templates."""
    backend.disk_templates = get_available_disk_templates(backend)
    backend.save()
##
1148
## Synchronized operations for reconciliation
1149
##
1150

    
1151

    
1152
def create_network_synced(network, backend):
    """Create and connect a network, waiting for each Ganeti job to finish.

    Returns the (status, error) tuple of the first failing job, or of the
    connect phase on success.
    """
    result = _create_network_synced(network, backend)
    if result[0] == rapi.JOB_STATUS_SUCCESS:
        result = connect_network_synced(network, backend)
    return result
def _create_network_synced(network, backend):
    """Create the network and block until its Ganeti job finalizes."""
    with pooled_rapi_client(backend) as client:
        job_id = _create_network(network, backend)
        return wait_for_job(client, job_id)
def connect_network_synced(network, backend):
    """Connect *network* to every nodegroup of *backend*, job by job.

    Waits for each ConnectNetwork job and stops at the first failure,
    returning its (status, error) tuple; on success returns the tuple of
    the last job. If the backend has no nodegroups, success is reported —
    previously 'result' was left unbound and the final return raised
    NameError.
    """
    result = (rapi.JOB_STATUS_SUCCESS, None)
    with pooled_rapi_client(backend) as client:
        for group in client.GetGroups():
            job = client.ConnectNetwork(network.backend_id, group,
                                        network.mode, network.link)
            result = wait_for_job(client, job)
            if result[0] != rapi.JOB_STATUS_SUCCESS:
                return result

    return result
def wait_for_job(client, jobid):
    """Block until Ganeti job *jobid* finalizes; return (status, error).

    *error* is None on success, otherwise the job's 'opresult' field.
    """
    watched_fields = ['status', 'opresult']
    result = client.WaitForJobChange(jobid, watched_fields, None, None)
    status = result['job_info'][0]
    while status not in rapi.JOB_STATUS_FINALIZED:
        # Pass the previous result so WaitForJobChange blocks until a change.
        result = client.WaitForJobChange(jobid, watched_fields,
                                         [result], None)
        status = result['job_info'][0]

    if status == rapi.JOB_STATUS_SUCCESS:
        return (status, None)
    return (status, result['job_info'][1])
def create_job_dependencies(job_ids=None, job_states=None):
    """Transform a list of job IDs to Ganeti 'depends' attribute.

    :param job_ids: job IDs to depend on. The mutable default argument ([])
        was replaced with None to avoid sharing the list between calls.
    :param job_states: job states that satisfy each dependency; defaults to
        all finalized states.
    """
    if job_ids is None:
        job_ids = []
    if job_states is None:
        job_states = list(rapi.JOB_STATUS_FINALIZED)
    # isinstance instead of type(...) == list; same AssertionError on misuse.
    assert isinstance(job_states, list)
    return [[job_id, job_states] for job_id in job_ids]