Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ 368d879e

History | View | Annotate | Download (41 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
from django.conf import settings
34
from django.db import transaction
35
from datetime import datetime, timedelta
36

    
37
from synnefo.db.models import (Backend, VirtualMachine, Network,
38
                               BackendNetwork, BACKEND_STATUSES,
39
                               pooled_rapi_client, VirtualMachineDiagnostic,
40
                               Flavor, IPAddress, IPAddressLog)
41
from synnefo.logic import utils, ips
42
from synnefo import quotas
43
from synnefo.api.util import release_resource
44
from synnefo.util.mac2eui64 import mac2eui64
45
from synnefo.logic import rapi
46

    
47
from logging import getLogger
48
log = getLogger(__name__)
49

    
50

    
51
# Map of Synnefo firewall profile names to the Ganeti instance tags that
# encode them (values come from deployment settings).
_firewall_tags = {
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}

# Reverse lookup: map the fourth ':'-separated component of each firewall
# tag back to its profile name (assumes tags follow an 'a:b:c:value' format).
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())

# Timeout in seconds for building NICs. After this period the NICs considered
# stale and removed from DB.
BUILDING_NIC_TIMEOUT = timedelta(seconds=180)

# NIC attributes that can be copied verbatim from the Ganeti NIC info dict.
SIMPLE_NIC_FIELDS = ["state", "mac", "network", "firewall_profile", "index"]
# NIC attributes that need special handling (IP allocation/release).
COMPLEX_NIC_FIELDS = ["ipv4_address", "ipv6_address"]
NIC_FIELDS = SIMPLE_NIC_FIELDS + COMPLEX_NIC_FIELDS
# Prefix used for ids of NICs that exist in Ganeti but are unknown to Synnefo.
UNKNOWN_NIC_PREFIX = "unknown-"
66

    
67

    
68
def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
    """Handle quotas for updated VirtualMachine.

    Update quotas for the updated VirtualMachine based on the job that run on
    the Ganeti backend. If a commission has been already issued for this job,
    then this commission is just accepted or rejected based on the job status.
    Otherwise, a new commission for the given change is issued, that is also in
    force and auto-accept mode. In this case, previous commissions are
    rejected, since they reflect a previous state of the VM.

    :param vm: The VirtualMachine the job ran for.
    :param job_id: ID of the Ganeti job.
    :param job_opcode: Opcode of the Ganeti job (e.g. OP_INSTANCE_REMOVE).
    :param job_status: Status of the Ganeti job.
    :param job_fields: Dict of extra job fields (e.g. 'beparams').
    :returns: The (possibly updated, not yet saved) VirtualMachine.

    """
    # Quota handling only applies once the job reaches a terminal state
    # (success, error or canceled).
    if job_status not in rapi.JOB_STATUS_FINALIZED:
        return vm

    # Check successful completion of a job will trigger any quotable change in
    # the VM state.
    action = utils.get_action_from_opcode(job_opcode, job_fields)
    if action == "BUILD":
        # Quotas for new VMs are automatically accepted by the API
        return vm

    if vm.task_job_id == job_id and vm.serial is not None:
        # Commission for this change has already been issued. So just
        # accept/reject it. Special case is OP_INSTANCE_CREATE, which even
        # if fails, must be accepted, as the user must manually remove the
        # failed server
        serial = vm.serial
        if job_status == rapi.JOB_STATUS_SUCCESS:
            quotas.accept_serial(serial)
        elif job_status in [rapi.JOB_STATUS_ERROR, rapi.JOB_STATUS_CANCELED]:
            log.debug("Job %s failed. Rejecting related serial %s", job_id,
                      serial)
            quotas.reject_serial(serial)
        vm.serial = None
    elif job_status == rapi.JOB_STATUS_SUCCESS:
        commission_info = quotas.get_commission_info(resource=vm,
                                                     action=action,
                                                     action_fields=job_fields)
        if commission_info is not None:
            # Commission for this change has not been issued, or the issued
            # commission was unaware of the current change. Reject all previous
            # commissions and create a new one in forced mode!
            log.debug("Expected job was %s. Processing job %s.",
                      vm.task_job_id, job_id)
            reason = ("client: dispatcher, resource: %s, ganeti_job: %s"
                      % (vm, job_id))
            quotas.handle_resource_commission(vm, action,
                                              action_fields=job_fields,
                                              commission_name=reason,
                                              force=True,
                                              auto_accept=True)
            log.debug("Issued new commission: %s", vm.serial)

    return vm
122

    
123

    
124
@transaction.commit_on_success
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None,
                      job_fields=None):
    """Process a job progress notification from the backend

    Process an incoming message from the backend (currently Ganeti).
    Job notifications with a terminating status (success, error, or canceled),
    also update the operating state of the VM.

    :param vm: The VirtualMachine the job refers to.
    :param etime: Event time of the notification.
    :param jobid: ID of the Ganeti job.
    :param opcode: Opcode of the Ganeti job (e.g. OP_INSTANCE_CREATE).
    :param status: Status of the Ganeti job.
    :param logmsg: Raw log message of the notification.
    :param nics: Optional list of NIC info dicts as reported by Ganeti.
    :param job_fields: Optional dict of extra job fields (e.g. 'beparams').

    """
    # See #1492, #1031, #1111 why this line has been removed
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)

    # Record job bookkeeping information on the VM.
    vm.backendjobid = jobid
    vm.backendjobstatus = status
    vm.backendopcode = opcode
    vm.backendlogmsg = logmsg

    # Non-final notifications only update the bookkeeping fields.
    if status not in rapi.JOB_STATUS_FINALIZED:
        vm.save()
        return

    if job_fields is None:
        job_fields = {}
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode)

    # Notifications of success change the operating state
    if status == rapi.JOB_STATUS_SUCCESS:
        if state_for_success is not None:
            vm.operstate = state_for_success
        beparams = job_fields.get("beparams", None)
        if beparams:
            # Change the flavor of the VM
            _process_resize(vm, beparams)
        # Update backendtime only for jobs that have been successfully
        # completed, since only these jobs update the state of the VM. Else a
        # "race condition" may occur when a successful job (e.g.
        # OP_INSTANCE_REMOVE) completes before an error job and messages arrive
        # in reversed order.
        vm.backendtime = etime

    if status in rapi.JOB_STATUS_FINALIZED and nics is not None:
        # Update the NICs of the VM
        _process_net_status(vm, etime, nics)

    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
    if opcode == 'OP_INSTANCE_CREATE' and status in (rapi.JOB_STATUS_CANCELED,
                                                     rapi.JOB_STATUS_ERROR):
        vm.operstate = 'ERROR'
        vm.backendtime = etime
        # Update state of associated NICs
        vm.nics.all().update(state="ERROR")
    elif opcode == 'OP_INSTANCE_REMOVE':
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
        # when no instance exists at the Ganeti backend.
        # See ticket #799 for all the details.
        if (status == rapi.JOB_STATUS_SUCCESS or
           (status == rapi.JOB_STATUS_ERROR and not vm_exists_in_backend(vm))):
            # VM has been deleted
            for nic in vm.nics.all():
                # Release the IP
                remove_nic_ips(nic)
                # And delete the NIC.
                nic.delete()
            vm.deleted = True
            vm.operstate = state_for_success
            vm.backendtime = etime
            # Treat the removal as successful for the quota handling below.
            status = rapi.JOB_STATUS_SUCCESS

    if status in rapi.JOB_STATUS_FINALIZED:
        # Job is finalized: Handle quotas/commissioning
        vm = handle_vm_quotas(vm, job_id=jobid, job_opcode=opcode,
                              job_status=status, job_fields=job_fields)
        # and clear task fields
        if vm.task_job_id == jobid:
            vm.task = None
            vm.task_job_id = None

    vm.save()
205

    
206

    
207
def _process_resize(vm, beparams):
208
    """Change flavor of a VirtualMachine based on new beparams."""
209
    old_flavor = vm.flavor
210
    vcpus = beparams.get("vcpus", old_flavor.cpu)
211
    ram = beparams.get("maxmem", old_flavor.ram)
212
    if vcpus == old_flavor.cpu and ram == old_flavor.ram:
213
        return
214
    try:
215
        new_flavor = Flavor.objects.get(cpu=vcpus, ram=ram,
216
                                        disk=old_flavor.disk,
217
                                        disk_template=old_flavor.disk_template)
218
    except Flavor.DoesNotExist:
219
        raise Exception("Cannot find flavor for VM")
220
    vm.flavor = new_flavor
221
    vm.save()
222

    
223

    
224
@transaction.commit_on_success
def process_net_status(vm, etime, nics):
    """Wrap _process_net_status inside a DB transaction.

    See _process_net_status for the actual handling of a Ganeti NIC
    status notification.

    """
    _process_net_status(vm, etime, nics)
228

    
229

    
230
def _process_net_status(vm, etime, nics):
    """Process a net status notification from the backend

    Process an incoming message from the Ganeti backend,
    detailing the NIC configuration of a VM instance.

    Update the state of the VM in the DB accordingly.

    :param vm: The VirtualMachine whose NICs are being synchronized.
    :param etime: Event time of the notification; stored as vm.backendtime.
    :param nics: List of NIC info dicts as reported by Ganeti.

    """
    ganeti_nics = process_ganeti_nics(nics)
    # Map NIC id -> NIC object for the NICs currently recorded in the DB.
    db_nics = dict([(nic.id, nic)
                    for nic in vm.nics.prefetch_related("ips__subnet")])

    # Get X-Lock on backend before getting X-Lock on network IP pools, to
    # guarantee that no deadlock will occur with Backend allocator.
    Backend.objects.select_for_update().get(id=vm.backend_id)

    # Walk the union of DB NICs and Ganeti NICs and reconcile each one.
    for nic_name in set(db_nics.keys()) | set(ganeti_nics.keys()):
        db_nic = db_nics.get(nic_name)
        ganeti_nic = ganeti_nics.get(nic_name)
        if ganeti_nic is None:
            # NIC exists in DB but not in Ganeti. If the NIC is in 'building'
            # state for more than BUILDING_NIC_TIMEOUT (180s), then we remove
            # the NIC.
            # TODO: This is dangerous as the job may be stack in the queue, and
            # releasing the IP may lead to duplicate IP use.
            if db_nic.state != "BUILD" or\
                (db_nic.state == "BUILD" and
                 etime > db_nic.created + BUILDING_NIC_TIMEOUT):
                remove_nic_ips(db_nic)
                db_nic.delete()
            else:
                log.warning("Ignoring recent building NIC: %s", db_nic)
        elif db_nic is None:
            # NIC exists in Ganeti but not in the DB; an operator has to
            # resolve this manually.
            msg = ("NIC/%s of VM %s does not exist in DB! Cannot automatically"
                   " fix this issue!" % (nic_name, vm))
            log.error(msg)
            continue
        elif not nics_are_equal(db_nic, ganeti_nic):
            for f in SIMPLE_NIC_FIELDS:
                # Update the NIC in DB with the values from Ganeti NIC
                setattr(db_nic, f, ganeti_nic[f])
                db_nic.save()

            # Special case where the IPv4 address has changed, because you
            # need to release the old IPv4 address and reserve the new one
            ipv4_address = ganeti_nic["ipv4_address"]
            if db_nic.ipv4_address != ipv4_address:
                change_address_of_port(db_nic, vm.userid,
                                       old_address=db_nic.ipv4_address,
                                       new_address=ipv4_address,
                                       version=4)

            # Same handling for a changed IPv6 address.
            ipv6_address = ganeti_nic["ipv6_address"]
            if db_nic.ipv6_address != ipv6_address:
                change_address_of_port(db_nic, vm.userid,
                                       old_address=db_nic.ipv6_address,
                                       new_address=ipv6_address,
                                       version=6)

    vm.backendtime = etime
    vm.save()
291

    
292

    
293
def change_address_of_port(port, userid, old_address, new_address, version):
    """Replace the IPv4/IPv6 address of a port with a new one.

    The old address of the given IP version (if any) is released from the
    port, a new IPAddress is allocated (IPv4) or created (IPv6) and attached
    to it, and a new active IPAddressLog entry is recorded.

    :param port: The NetworkInterface whose address changes.
    :param userid: Owner of the newly allocated address.
    :param old_address: The address being replaced (may be None).
    :param new_address: The address to attach to the port.
    :param version: IP version, 4 or 6.
    :returns: The new IPAddress object.
    :raises ValueError: If version is neither 4 nor 6.

    """
    if old_address is not None:
        # An address change on an existing server is unexpected; log loudly.
        msg = ("IPv%s Address of server '%s' changed from '%s' to '%s'"
               % (version, port.machine_id, old_address, new_address))
        log.critical(msg)

    # Remove the old IP address
    remove_nic_ips(port, version=version)

    if version == 4:
        # IPv4 addresses come from the network's allocation pool.
        ipaddress = ips.allocate_ip(port.network, userid, address=new_address)
        ipaddress.nic = port
        ipaddress.save()
    elif version == 6:
        # IPv6 addresses are not pooled; create the DB object directly.
        subnet6 = port.network.subnet6
        ipaddress = IPAddress.objects.create(userid=userid,
                                             network=port.network,
                                             subnet=subnet6,
                                             nic=port,
                                             address=new_address)
    else:
        raise ValueError("Unknown version: %s" % version)

    # New address log
    ip_log = IPAddressLog.objects.create(server_id=port.machine_id,
                                         network_id=port.network_id,
                                         address=new_address,
                                         active=True)
    log.info("Created IP log entry '%s' for address '%s' to server '%s'",
             ip_log.id, new_address, port.machine_id)

    return ipaddress
326

    
327

    
328
def nics_are_equal(db_nic, gnt_nic):
    """Return True when the DB NIC matches the Ganeti NIC on all NIC_FIELDS.

    `db_nic` is compared attribute-by-attribute against the corresponding
    keys of the `gnt_nic` info dict.

    """
    return all(getattr(db_nic, attr) == gnt_nic[attr] for attr in NIC_FIELDS)
333

    
334

    
335
def process_ganeti_nics(ganeti_nics):
    """Process NIC dict from ganeti.

    Convert the list of NIC info dicts reported by Ganeti into a mapping of
    Synnefo NIC id -> NIC info dict (keys per NIC_FIELDS), deriving the IPv6
    address from the MAC and the network's IPv6 subnet, and mapping the
    Ganeti firewall tag back to a Synnefo firewall profile.

    """
    new_nics = []
    for index, gnic in enumerate(ganeti_nics):
        nic_name = gnic.get("name", None)
        if nic_name is not None:
            nic_id = utils.id_from_nic_name(nic_name)
        else:
            # Put as default value the index. If it is an unknown NIC to
            # synnefo it will be created automaticaly.
            nic_id = UNKNOWN_NIC_PREFIX + str(index)
        network_name = gnic.get('network', '')
        network_id = utils.id_from_network_name(network_name)
        network = Network.objects.get(id=network_id)

        # Get the new nic info
        mac = gnic.get('mac')
        ipv4 = gnic.get('ip')
        subnet6 = network.subnet6
        # Derive the IPv6 address from the MAC, if the network has an IPv6
        # subnet. NOTE(review): assumes 'mac' is present whenever subnet6
        # exists — confirm mac2eui64 tolerates a missing MAC.
        ipv6 = mac2eui64(mac, subnet6.cidr) if subnet6 else None

        firewall = gnic.get('firewall')
        firewall_profile = _reverse_tags.get(firewall)
        if not firewall_profile and network.public:
            # NICs on public networks always get a firewall profile.
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE

        nic_info = {
            'index': index,
            'network': network,
            'mac': mac,
            'ipv4_address': ipv4,
            'ipv6_address': ipv6,
            'firewall_profile': firewall_profile,
            'state': 'ACTIVE'}

        new_nics.append((nic_id, nic_info))
    return dict(new_nics)
372

    
373

    
374
def remove_nic_ips(nic, version=None):
    """Remove IP addresses associated with a NetworkInterface.

    Every IPAddress bound to the NIC is returned to its pool and deleted,
    except floating IPs, which are only disassociated from the NIC. When
    `version` is given (4 or 6), only addresses of that IP version are
    removed.

    """
    for address in nic.ips.all():
        if version and address.ipversion != version:
            # Caller asked for a specific IP version; skip the others.
            continue

        # Close the active entry in the IP address usage log.
        terminate_active_ipaddress_log(nic, address)

        if address.floating_ip:
            # Floating IPs survive: just detach them from the NIC.
            address.nic = None
            address.save()
        else:
            # Return the address to its pool and delete the DB object.
            address.release_address()
            address.delete()
398

    
399

    
400
def terminate_active_ipaddress_log(nic, ip):
    """Update DB logging entry for this IP address.

    Mark the active IPAddressLog entry for (server, network, address) as
    released now. Only addresses of public networks that are bound to a
    server are logged.

    :raises IPAddressLog.MultipleObjectsReturned: If more than one active
        log entry exists for the same (server, network, address).

    """
    if not ip.network.public or nic.machine is None:
        # Private networks and unattached NICs are not logged.
        return
    try:
        ip_log, created = \
            IPAddressLog.objects.get_or_create(server_id=nic.machine_id,
                                               network_id=ip.network_id,
                                               address=ip.address,
                                               active=True)
    except IPAddressLog.MultipleObjectsReturned:
        logmsg = ("Multiple active log entries for IP %s, Network %s,"
                  "Server %s. Cannot proceed!"
                  % (ip.address, ip.network, nic.machine))
        log.error(logmsg)
        raise

    if created:
        # The entry should already have existed; record the anomaly but
        # still close the (freshly created) entry below.
        logmsg = ("No log entry for IP %s, Network %s, Server %s. Created new"
                  " but with wrong creation timestamp."
                  % (ip.address, ip.network, nic.machine))
        log.error(logmsg)
    ip_log.released_at = datetime.now()
    ip_log.active = False
    ip_log.save()
425

    
426

    
427
@transaction.commit_on_success
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
    """Process a Ganeti job notification for a BackendNetwork.

    Update the job bookkeeping fields and the operating state of the
    BackendNetwork according to the job's opcode and status, then refresh
    the state of the parent Network.

    :param back_network: The BackendNetwork the job ran for (already locked).
    :param etime: Event time of the notification.
    :param jobid: ID of the Ganeti job.
    :param opcode: Opcode of the Ganeti job (e.g. OP_NETWORK_ADD).
    :param status: Status of the Ganeti job.
    :param logmsg: Raw log message of the notification.

    """
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode
    back_network.backendlogmsg = logmsg

    # Note: Network is already locked!
    network = back_network.network

    # Notifications of success change the operating state
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == rapi.JOB_STATUS_SUCCESS and state_for_success is not None:
        back_network.operstate = state_for_success

    # Special case: a failed/canceled OP_NETWORK_ADD leaves the backend
    # network in ERROR state.
    if (status in (rapi.JOB_STATUS_CANCELED, rapi.JOB_STATUS_ERROR)
       and opcode == 'OP_NETWORK_ADD'):
        back_network.operstate = 'ERROR'
        back_network.backendtime = etime

    if opcode == 'OP_NETWORK_REMOVE':
        # A failed OP_NETWORK_REMOVE also counts as a deletion if the network
        # no longer exists at the Ganeti backend.
        network_is_deleted = (status == rapi.JOB_STATUS_SUCCESS)
        if network_is_deleted or (status == rapi.JOB_STATUS_ERROR and not
                                  network_exists_in_backend(back_network)):
            back_network.operstate = state_for_success
            back_network.deleted = True
            back_network.backendtime = etime

    if status == rapi.JOB_STATUS_SUCCESS:
        back_network.backendtime = etime
    back_network.save()
    # Also you must update the state of the Network!!
    update_network_state(network)
463

    
464

    
465
def update_network_state(network):
    """Update the state of a Network based on BackendNetwork states.

    Update the state of a Network based on the operstate of the networks in
    the backends that network exists.

    The state of the network is:
    * ACTIVE: If it is 'ACTIVE' in at least one backend.
    * DELETED: If it is 'DELETED' in all backends that have been created.

    This function also releases the resources (MAC prefix or Bridge) and the
    quotas for the network.

    """
    if network.deleted:
        # Network has already been deleted. Just assert that state is also
        # DELETED
        if not network.state == "DELETED":
            network.state = "DELETED"
            network.save()
        return

    backend_states = [s.operstate for s in network.backend_networks.all()]
    if not backend_states and network.action != "DESTROY":
        # No backend networks yet and no destroy requested: the network is
        # considered ACTIVE.
        if network.state != "ACTIVE":
            network.state = "ACTIVE"
            network.save()
            return

    # Network is deleted when all BackendNetworks go to "DELETED" operstate.
    # The reduce yields "DELETED" only if every state equals "DELETED"
    # (and for an empty list); any other state makes the result False.
    deleted = reduce(lambda x, y: x == y and "DELETED", backend_states,
                     "DELETED")

    # Release the resources on the deletion of the Network
    if deleted:
        if network.ips.filter(deleted=False, floating_ip=True).exists():
            msg = "Cannot delete network %s! Floating IPs still in use!"
            log.error(msg % network)
            raise Exception(msg % network)
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
                 network.id, network.mac_prefix, network.link)
        network.deleted = True
        network.state = "DELETED"
        # Return pooled resources (MAC prefix / bridge) to their pools.
        if network.mac_prefix:
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
                release_resource(res_type="mac_prefix",
                                 value=network.mac_prefix)
        if network.link:
            if network.FLAVORS[network.flavor]["link"] == "pool":
                release_resource(res_type="bridge", value=network.link)

        # Set all subnets as deleted
        network.subnets.update(deleted=True)
        # And delete the IP pools
        for subnet in network.subnets.all():
            if subnet.ipversion == 4:
                subnet.ip_pools.all().delete()
        # And all the backend networks since there are useless
        network.backend_networks.all().delete()

        # Issue commission
        if network.userid:
            quotas.issue_and_accept_commission(network, action="DESTROY")
            # the above has already saved the object and committed;
            # a second save would override others' changes, since the
            # object is now unlocked
            return
        elif not network.public:
            log.warning("Network %s does not have an owner!", network.id)
    network.save()
535

    
536

    
537
@transaction.commit_on_success
def process_network_modify(back_network, etime, jobid, opcode, status,
                           job_fields):
    """Process an OP_NETWORK_SET_PARAMS notification from the backend.

    Update the BackendNetwork's job bookkeeping fields and reserve any
    externally-reserved IP addresses that the job added.

    :param back_network: The BackendNetwork the job ran for.
    :param etime: Event time of the notification.
    :param jobid: ID of the Ganeti job.
    :param opcode: Opcode of the Ganeti job; must be OP_NETWORK_SET_PARAMS.
    :param status: Status of the Ganeti job.
    :param job_fields: Dict of job fields; 'add_reserved_ips' is used.

    """
    assert (opcode == "OP_NETWORK_SET_PARAMS")
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.opcode = opcode

    # Reserve in the DB pools any addresses that were externally reserved
    # by the Ganeti job.
    add_reserved_ips = job_fields.get("add_reserved_ips")
    if add_reserved_ips:
        network = back_network.network
        for ip in add_reserved_ips:
            network.reserve_address(ip, external=True)

    if status == rapi.JOB_STATUS_SUCCESS:
        back_network.backendtime = etime
    back_network.save()
557

    
558

    
559
@transaction.commit_on_success
def process_create_progress(vm, etime, progress):
    """Store the build progress of a VM as reported by snf-image.

    The reported value may exceed 100%, due to the way
    snf-image:copy-progress tracks bytes read by image handling processes,
    so it is clamped to 100. Negative values raise ValueError.

    """
    percentage = int(progress)
    if percentage < 0:
        raise ValueError("Percentage cannot be negative")
    # Clamp overshooting progress reports to 100%.
    percentage = min(percentage, 100)

    # FIXME: log a warning here, see #1033
    # (a monotonicity check against the previous percentage was considered
    # but never enabled).

    # NOTE: We deliberately do NOT require vm.operstate == 'BUILD' here:
    # the 'ganeti-op-status' message for a successful OP_INSTANCE_CREATE
    # may be processed by another dispatcher thread before this
    # 'ganeti-create-progress' message arrives. [vkoukis]

    vm.buildpercentage = percentage
    vm.backendtime = etime
    vm.save()
588

    
589

    
590
@transaction.commit_on_success
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
                               details=None):
    """
    Create virtual machine instance diagnostic entry.

    :param vm: VirtualMachine instance to create diagnostic for.
    :param message: Diagnostic message.
    :param source: Diagnostic source identifier (e.g. image-helper).
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
    :param etime: The time the message occurred (if available).
    :param details: Additional details or debug information.
    """
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level, source=source,
                                                   source_date=etime,
                                                   message=message,
                                                   details=details)
607

    
608

    
609
def create_instance(vm, nics, flavor, image):
    """Create a new instance at the VM's Ganeti backend.

    `image` is a dictionary which should contain the keys:
            'backend_id', 'format' and 'metadata'

        metadata value should be a dictionary.

    :param vm: The VirtualMachine to create at the backend.
    :param nics: The NICs to attach to the new instance.
    :param flavor: Flavor providing CPU/RAM/disk specs for the instance.
    :returns: The result of the RAPI CreateInstance() call.

    """
    # Handle arguments to CreateInstance() as a dictionary,
    # initialize it based on a deployment-specific value.
    # This enables the administrator to override deployment-specific
    # arguments, such as the disk template to use, name of os provider
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
    #
    kw = vm.backend.get_create_params()
    kw['mode'] = 'create'
    kw['name'] = vm.backend_vm_id
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS

    kw['disk_template'] = flavor.disk_template
    # The *1024 converts the flavor's disk size — presumably in GiB — to
    # the MiB that Ganeti expects; confirm against the Flavor model.
    kw['disks'] = [{"size": flavor.disk * 1024}]
    provider = flavor.disk_provider
    if provider:
        kw['disks'][0]['provider'] = provider
        kw['disks'][0]['origin'] = flavor.disk_origin

    kw['nics'] = [{"name": nic.backend_uuid,
                   "network": nic.network.backend_id,
                   "ip": nic.ipv4_address}
                  for nic in nics]

    backend = vm.backend
    depend_jobs = []
    # Collect jobs that activate the NICs' networks at this backend; the
    # creation job must depend on them.
    for nic in nics:
        bnet, job_ids = ensure_network_is_active(backend, nic.network_id)
        depend_jobs.extend(job_ids)

    kw["depends"] = create_job_dependencies(depend_jobs)

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['os'] = settings.GANETI_OS_PROVIDER
    kw['ip_check'] = False
    kw['name_check'] = False

    # Do not specific a node explicitly, have
    # Ganeti use an iallocator instead
    #kw['pnode'] = rapi.GetNodes()[0]

    kw['dry_run'] = settings.TEST

    kw['beparams'] = {
        'auto_balance': True,
        'vcpus': flavor.cpu,
        'memory': flavor.ram}

    kw['osparams'] = {
        'config_url': vm.config_url,
        # Store image id and format to Ganeti
        'img_id': image['backend_id'],
        'img_format': image['format']}

    # Use opportunistic locking
    kw['opportunistic_locking'] = settings.GANETI_USE_OPPORTUNISTIC_LOCKING

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['hvparams'] = dict(serial_console=False)

    log.debug("Creating instance %s", utils.hide_pass(kw))
    with pooled_rapi_client(vm) as client:
        return client.CreateInstance(**kw)
678

    
679

    
680
def delete_instance(vm):
    """Request removal of the VM's instance from its Ganeti backend."""
    with pooled_rapi_client(vm) as client:
        return client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)
683

    
684

    
685
def reboot_instance(vm, reboot_type):
    """Reboot the VM's instance at the Ganeti backend.

    :param vm: The VirtualMachine to reboot.
    :param reboot_type: OS API reboot type, either 'soft' or 'hard'.
    :raises ValueError: If reboot_type is not 'soft' or 'hard'.

    """
    # Validate explicitly instead of with 'assert', which is stripped when
    # Python runs with -O and would then silently accept any reboot type.
    if reboot_type not in ('soft', 'hard'):
        raise ValueError("Invalid reboot type: %r" % (reboot_type,))
    # XXX: Currently shutdown_timeout parameter is not supported from the
    # Ganeti RAPI. Until supported, we will fallback for both reboot types
    # to the default shutdown timeout of Ganeti (120s). Note that reboot
    # type of Ganeti job must be always hard. The 'soft' and 'hard' type
    # of OS API is different from the one in Ganeti, and maps to
    # 'shutdown_timeout'.
    #if reboot_type == "hard":
    #    kwargs["shutdown_timeout"] = 0
    kwargs = {"instance": vm.backend_vm_id,
              "reboot_type": "hard"}
    if settings.TEST:
        kwargs["dry_run"] = True
    with pooled_rapi_client(vm) as client:
        return client.RebootInstance(**kwargs)
701

    
702

    
703
def startup_instance(vm):
    """Start up the VM's instance at the Ganeti backend."""
    with pooled_rapi_client(vm) as client:
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
706

    
707

    
708
def shutdown_instance(vm):
    """Shut down the VM's instance at the Ganeti backend."""
    with pooled_rapi_client(vm) as client:
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)
711

    
712

    
713
def resize_instance(vm, vcpus, memory):
    """Modify the CPU/RAM backend parameters of the VM's Ganeti instance.

    Both minmem and maxmem are set to the same value, so the instance gets
    a fixed memory size.

    """
    memory_mb = int(memory)
    new_beparams = {"vcpus": int(vcpus),
                    "minmem": memory_mb,
                    "maxmem": memory_mb}
    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id, beparams=new_beparams)
719

    
720

    
721
def get_instance_console(vm):
    """Build a VNC console info dict for *vm*.

    RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
    which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
    useless (see #783).

    Until this is fixed on the Ganeti side, construct a console info reply
    directly.

    WARNING: This assumes that VNC runs on port network_port on
             the instance's primary node, and is probably
             hypervisor-specific.
    """
    log.debug("Getting console for vm %s", vm)

    with pooled_rapi_client(vm) as rapi_client:
        instance = rapi_client.GetInstance(vm.backend_vm_id)

    serial_console = instance['hvparams']['serial_console']
    if vm.backend.hypervisor == "kvm" and serial_console:
        raise Exception("hv parameter serial_console cannot be true")

    return {'kind': 'vnc',
            'host': instance['pnode'],
            'port': instance['network_port']}


def get_instance_info(vm):
    """Fetch the Ganeti instance description for *vm*."""
    with pooled_rapi_client(vm) as rapi_client:
        return rapi_client.GetInstance(vm.backend_vm_id)


def vm_exists_in_backend(vm):
    """Return True if *vm*'s instance exists in its Ganeti backend.

    A 404 from the RAPI means "not found"; any other RAPI error is
    propagated to the caller.
    """
    try:
        get_instance_info(vm)
    except rapi.GanetiApiError as err:
        if err.code == 404:
            return False
        raise err
    return True


def get_network_info(backend_network):
    """Fetch the Ganeti network description for *backend_network*."""
    with pooled_rapi_client(backend_network) as rapi_client:
        return rapi_client.GetNetwork(backend_network.network.backend_id)


def network_exists_in_backend(backend_network):
    """Return True if the network exists in the Ganeti backend.

    A 404 from the RAPI means "not found". Any other RAPI error is
    re-raised, matching the behaviour of vm_exists_in_backend().
    """
    try:
        get_network_info(backend_network)
        return True
    except rapi.GanetiApiError as e:
        if e.code == 404:
            return False
        # BUGFIX: previously non-404 errors were silently swallowed and the
        # function fell through, returning None. Propagate them instead.
        raise e


def job_is_still_running(vm):
    """Return True if *vm*'s last Ganeti job has not finalized yet.

    Any RAPI error (e.g. the job is unknown) is treated as "not running".
    """
    with pooled_rapi_client(vm) as rapi_client:
        try:
            status = rapi_client.GetJobStatus(vm.backendjobid)["status"]
            return status not in rapi.JOB_STATUS_FINALIZED
        except rapi.GanetiApiError:
            return False


def ensure_network_is_active(backend, network_id):
    """Ensure that a network is active in the specified backend

    Check that a network exists and is active in the specified backend. If not
    (re-)create the network. Return the corresponding BackendNetwork object
    and the IDs of the Ganeti job to create the network.

    """
    network = Network.objects.select_for_update().get(id=network_id)
    bnet, _created = BackendNetwork.objects.get_or_create(backend=backend,
                                                          network=network)
    # Nothing to do when the backend network is already active.
    if bnet.operstate == "ACTIVE":
        return bnet, []
    return bnet, create_network(network, backend, connect=True)


def create_network(network, backend, connect=True):
    """Create a network in a Ganeti backend"""
    log.debug("Creating network %s in backend %s", network, backend)

    creation_job = _create_network(network, backend)

    if not connect:
        return [creation_job]
    # Chain the connect jobs onto the creation job.
    return connect_network(network, backend, depends=[creation_job])


def _create_network(network, backend):
    """Create a network in a Ganeti backend.

    Collect subnet/gateway information and tags from the Cyclades network
    and submit a CreateNetwork job. Returns the Ganeti job ID.

    Raises an Exception if the corresponding BackendNetwork row is missing.
    """
    # Work on a copy: appending to the object returned by 'backend_tag'
    # directly could mutate shared state if it is not a fresh list.
    tags = list(network.backend_tag)
    subnet = None
    subnet6 = None
    gateway = None
    gateway6 = None
    for _subnet in network.subnets.all():
        if _subnet.dhcp and "nfdhcpd" not in tags:
            tags.append("nfdhcpd")
        if _subnet.ipversion == 4:
            subnet = _subnet.cidr
            gateway = _subnet.gateway
        elif _subnet.ipversion == 6:
            subnet6 = _subnet.cidr
            gateway6 = _subnet.gateway

    if network.public:
        conflicts_check = True
        tags.append('public')
    else:
        conflicts_check = False
        tags.append('private')

    # Use a dummy network subnet for IPv6 only networks. Currently Ganeti does
    # not support IPv6 only networks. To bypass this limitation, we create the
    # network with a dummy network subnet, and make Cyclades connect instances
    # to such networks, with address=None.
    if subnet is None:
        subnet = "10.0.0.0/29"

    try:
        bn = BackendNetwork.objects.get(network=network, backend=backend)
        mac_prefix = bn.mac_prefix
    except BackendNetwork.DoesNotExist:
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
                        " does not exist" % (network.id, backend.id))

    with pooled_rapi_client(backend) as client:
        return client.CreateNetwork(network_name=network.backend_id,
                                    network=subnet,
                                    network6=subnet6,
                                    gateway=gateway,
                                    gateway6=gateway6,
                                    mac_prefix=mac_prefix,
                                    conflicts_check=conflicts_check,
                                    tags=tags)


def connect_network(network, backend, depends=None, group=None):
    """Connect a network to nodegroups.

    If *group* is given, connect only to that nodegroup; otherwise connect
    to every nodegroup of the backend. *depends* is an optional list of job
    IDs the connect jobs must wait for. Returns the list of job IDs.
    """
    log.debug("Connecting network %s to backend %s", network, backend)

    conflicts_check = bool(network.public)

    # Avoid a mutable default argument; None means "no dependencies".
    depends = create_job_dependencies(depends or [])
    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for grp in groups:
            job_id = client.ConnectNetwork(network.backend_id, grp,
                                           network.mode, network.link,
                                           conflicts_check,
                                           depends=depends)
            job_ids.append(job_id)
    return job_ids


def delete_network(network, backend, disconnect=True):
    """Delete a network from a backend, optionally disconnecting it first."""
    log.debug("Deleting network %s from backend %s", network, backend)

    # The delete job must wait for the disconnect jobs, if any.
    depend_jobs = disconnect_network(network, backend) if disconnect else []
    _delete_network(network, backend, depends=depend_jobs)


def _delete_network(network, backend, depends=None):
    """Submit a DeleteNetwork job, depending on the given job IDs."""
    # Avoid a mutable default argument; None means "no dependencies".
    depends = create_job_dependencies(depends or [])
    with pooled_rapi_client(backend) as client:
        return client.DeleteNetwork(network.backend_id, depends)


def disconnect_network(network, backend, group=None):
    """Disconnect a network from nodegroups of a backend.

    If *group* is given, disconnect only from that nodegroup; otherwise
    disconnect from every nodegroup of the backend. Returns the job IDs.
    """
    # BUGFIX: message previously read "to backend" although this function
    # disconnects the network *from* the backend.
    log.debug("Disconnecting network %s from backend %s", network, backend)

    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        # 'grp' avoids shadowing the 'group' parameter.
        for grp in groups:
            job_ids.append(client.DisconnectNetwork(network.backend_id, grp))
    return job_ids


def connect_to_network(vm, nic):
    """Add the NIC described by *nic* to the Ganeti instance of *vm*.

    Makes sure the NIC's network is active in the backend first, and chains
    the ModifyInstance job on any network-creation jobs. Returns the job ID.
    """
    network = nic.network
    backend = vm.backend
    _bnet, depend_jobs = ensure_network_is_active(backend, network.id)

    depends = create_job_dependencies(depend_jobs)

    # Ganeti NIC specification; named separately so the 'nic' model object
    # is not shadowed.
    nic_spec = {'name': nic.backend_uuid,
                'network': network.backend_id,
                'ip': nic.ipv4_address}

    log.debug("Adding NIC %s to VM %s", nic_spec, vm)

    kwargs = {
        "instance": vm.backend_vm_id,
        "nics": [("add", "-1", nic_spec)],
        "depends": depends,
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(**kwargs)


def disconnect_from_network(vm, nic):
    """Remove *nic* from the Ganeti instance of *vm*.

    Also deletes any firewall tag attached to the NIC. Returns the
    ModifyInstance job ID.
    """
    log.debug("Removing NIC %s of VM %s", nic, vm)

    kwargs = {
        "instance": vm.backend_vm_id,
        "nics": [("remove", nic.backend_uuid, {})],
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        job_id = client.ModifyInstance(**kwargs)
        profile = nic.firewall_profile
        if profile and profile != "DISABLED":
            # Remove the firewall tag corresponding to this NIC.
            tag = _firewall_tags[profile] % nic.backend_uuid
            client.DeleteInstanceTags(vm.backend_vm_id, [tag],
                                      dry_run=settings.TEST)

        return job_id


def set_firewall_profile(vm, profile, nic):
    """Apply firewall *profile* to *nic* of *vm* via Ganeti instance tags.

    Removes any previous firewall tags for the NIC, adds the tag for the
    requested profile (unless it is "DISABLED"), and issues a NOP
    ModifyInstance to trigger reprocessing. Raises ValueError for an
    unknown profile.
    """
    uuid = nic.backend_uuid
    try:
        tag = _firewall_tags[profile] % uuid
    except KeyError:
        # BUGFIX: corrected the typo "Unsopported" in the error message.
        raise ValueError("Unsupported Firewall Profile: %s" % profile)

    log.debug("Setting tag of VM %s, NIC %s, to %s", vm, nic, profile)

    with pooled_rapi_client(vm) as client:
        # Delete previous firewall tags
        old_tags = client.GetInstanceTags(vm.backend_vm_id)
        delete_tags = [(t % uuid) for t in _firewall_tags.values()
                       if (t % uuid) in old_tags]
        if delete_tags:
            client.DeleteInstanceTags(vm.backend_vm_id, delete_tags,
                                      dry_run=settings.TEST)

        if profile != "DISABLED":
            client.AddInstanceTags(vm.backend_vm_id, [tag],
                                   dry_run=settings.TEST)

        # XXX NOP ModifyInstance call to force process_net_status to run
        # on the dispatcher
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
        client.ModifyInstance(vm.backend_vm_id,
                              os_name=os_name)
    return None


def get_instances(backend, bulk=True):
    """List the instances of a Ganeti backend."""
    with pooled_rapi_client(backend) as rapi_client:
        return rapi_client.GetInstances(bulk=bulk)


def get_nodes(backend, bulk=True):
    """List the nodes of a Ganeti backend."""
    with pooled_rapi_client(backend) as rapi_client:
        return rapi_client.GetNodes(bulk=bulk)


def get_jobs(backend, bulk=True):
    """List the jobs of a Ganeti backend."""
    with pooled_rapi_client(backend) as rapi_client:
        return rapi_client.GetJobs(bulk=bulk)


def get_physical_resources(backend):
    """ Get the physical resources of a backend.

    Get the resources of a backend as reported by the backend (not the db).

    """
    attrs = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
    totals = dict.fromkeys(attrs, 0)
    for node in get_nodes(backend, bulk=True):
        # Filter out drained, offline and not vm_capable nodes since they will
        # not take part in the vm allocation process
        if node['drained'] or node['offline'] or not node['vm_capable']:
            continue
        if not node['cnodes']:
            continue
        for a in attrs:
            totals[a] += int(node[a] or 0)
    return totals


def update_backend_resources(backend, resources=None):
    """ Update the state of the backend resources in db.

    """
    if not resources:
        resources = get_physical_resources(backend)

    # Copy each reported resource onto the backend row.
    for field in ('mfree', 'mtotal', 'dfree', 'dtotal',
                  'pinst_cnt', 'ctotal'):
        setattr(backend, field, resources[field])
    backend.updated = datetime.now()
    backend.save()


def get_memory_from_instances(backend):
    """ Get the memory that is used from instances.

    Get the used memory of a backend. Note: This is different for
    the real memory used, due to kvm's memory de-duplication.

    """
    with pooled_rapi_client(backend) as client:
        instances = client.GetInstances(bulk=True)
    return sum(i['oper_ram'] for i in instances)


def get_available_disk_templates(backend):
    """Get the list of available disk templates of a Ganeti backend.

    The list contains the disk templates that are enabled in the Ganeti backend
    and also included in ipolicy-disk-templates.

    """
    with pooled_rapi_client(backend) as client:
        info = client.GetInfo()
    ipolicy_templates = info["ipolicy"]["disk-templates"]
    if "enabled_disk_templates" not in info:
        # Ganeti < 2.8 does not have 'enabled_disk_templates'
        return ipolicy_templates
    return [tmpl for tmpl in info["enabled_disk_templates"]
            if tmpl in ipolicy_templates]


def update_backend_disk_templates(backend):
    """Refresh the backend's cached disk templates in the DB."""
    backend.disk_templates = get_available_disk_templates(backend)
    backend.save()


##
## Synchronized operations for reconciliation
##


def create_network_synced(network, backend):
    """Create and connect a network, waiting for each Ganeti job to finish.

    Returns the (status, error) tuple of the first failing step, or of the
    final connect step on success.
    """
    creation_result = _create_network_synced(network, backend)
    if creation_result[0] != rapi.JOB_STATUS_SUCCESS:
        return creation_result
    return connect_network_synced(network, backend)


def _create_network_synced(network, backend):
    """Create a network and block until its Ganeti job finalizes."""
    with pooled_rapi_client(backend) as client:
        job_id = _create_network(network, backend)
        return wait_for_job(client, job_id)


def connect_network_synced(network, backend):
    """Connect a network to all nodegroups, waiting for each job.

    Stops and returns the (status, error) tuple of the first unsuccessful
    job; otherwise returns the result of the last one.
    """
    # BUGFIX: default to success so a backend with zero nodegroups does not
    # raise NameError on the final 'return result'.
    result = (rapi.JOB_STATUS_SUCCESS, None)
    with pooled_rapi_client(backend) as client:
        for group in client.GetGroups():
            job = client.ConnectNetwork(network.backend_id, group,
                                        network.mode, network.link)
            result = wait_for_job(client, job)
            if result[0] != rapi.JOB_STATUS_SUCCESS:
                return result

    return result


def wait_for_job(client, jobid):
    """Block until job *jobid* finalizes; return (status, error_or_None)."""
    fields = ['status', 'opresult']
    # First call with prev_job_info=None returns the current job values.
    reply = client.WaitForJobChange(jobid, fields, None, None)
    status = reply['job_info'][0]
    while status not in rapi.JOB_STATUS_FINALIZED:
        # Pass the previous reply so the call blocks until something changes.
        reply = client.WaitForJobChange(jobid, fields, [reply], None)
        status = reply['job_info'][0]

    if status == rapi.JOB_STATUS_SUCCESS:
        return (status, None)
    # On failure, 'opresult' carries the error details.
    return (status, reply['job_info'][1])


def create_job_dependencies(job_ids=None, job_states=None):
    """Transform a list of job IDs to Ganeti 'depends' attribute.

    Each entry of the result is [job_id, job_states]: the dependent job
    waits until the referenced job reaches one of *job_states* (by default
    any finalized state).
    """
    # Avoid a mutable default argument; None means "no job IDs".
    if job_ids is None:
        job_ids = []
    if job_states is None:
        job_states = list(rapi.JOB_STATUS_FINALIZED)
    # isinstance is the idiomatic type check (was: type(x) == list).
    assert isinstance(job_states, list)
    return [[job_id, job_states] for job_id in job_ids]