Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ 3e9398b1

History | View | Annotate | Download (42.4 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
from django.conf import settings
34
from django.db import transaction
35
from datetime import datetime, timedelta
36

    
37
from synnefo.db.models import (Backend, VirtualMachine, Network,
38
                               BackendNetwork, BACKEND_STATUSES,
39
                               pooled_rapi_client, VirtualMachineDiagnostic,
40
                               Flavor, IPAddress, IPAddressLog)
41
from synnefo.logic import utils, ips
42
from synnefo import quotas
43
from synnefo.api.util import release_resource
44
from synnefo.util.mac2eui64 import mac2eui64
45
from synnefo.logic import rapi
46

    
47
from logging import getLogger
48
log = getLogger(__name__)
49

    
50

    
51
# Map Synnefo firewall profile names to the Ganeti instance tags that
# implement them (tag values come from the deployment settings).
_firewall_tags = {
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}

# Reverse mapping: the fourth ':'-separated component of each tag back to
# the profile name, used to decode firewall tags received from Ganeti.
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())

# Timeout in seconds for building NICs. After this period the NICs considered
# stale and removed from DB.
BUILDING_NIC_TIMEOUT = timedelta(seconds=180)

# NIC fields that are copied verbatim from a Ganeti notification to the DB.
SIMPLE_NIC_FIELDS = ["state", "mac", "network", "firewall_profile", "index"]
# NIC fields that need special handling (IP allocation/release) on change.
COMPLEX_NIC_FIELDS = ["ipv4_address", "ipv6_address"]
# All fields compared when deciding whether a DB NIC matches a Ganeti NIC.
NIC_FIELDS = SIMPLE_NIC_FIELDS + COMPLEX_NIC_FIELDS
# Key prefix for NICs reported by Ganeti that are unknown to Synnefo.
UNKNOWN_NIC_PREFIX = "unknown-"
66

    
67

    
68
def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
    """Handle quotas for updated VirtualMachine.

    Update quotas for the updated VirtualMachine based on the job that run on
    the Ganeti backend. If a commission has been already issued for this job,
    then this commission is just accepted or rejected based on the job status.
    Otherwise, a new commission for the given change is issued, that is also in
    force and auto-accept mode. In this case, previous commissions are
    rejected, since they reflect a previous state of the VM.

    Returns the (possibly modified) VirtualMachine object.

    """
    # Only finalized jobs (success/error/canceled) can affect quotas.
    if job_status not in rapi.JOB_STATUS_FINALIZED:
        return vm

    # Check successful completion of a job will trigger any quotable change in
    # the VM state.
    action = utils.get_action_from_opcode(job_opcode, job_fields)
    if action == "BUILD":
        # Quotas for new VMs are automatically accepted by the API
        return vm

    if vm.task_job_id == job_id and vm.serial is not None:
        # Commission for this change has already been issued. So just
        # accept/reject it. Special case is OP_INSTANCE_CREATE, which even
        # if fails, must be accepted, as the user must manually remove the
        # failed server
        serial = vm.serial
        if job_status == rapi.JOB_STATUS_SUCCESS:
            quotas.accept_serial(serial)
        elif job_status in [rapi.JOB_STATUS_ERROR, rapi.JOB_STATUS_CANCELED]:
            log.debug("Job %s failed. Rejecting related serial %s", job_id,
                      serial)
            quotas.reject_serial(serial)
        # Serial has been resolved either way; clear it from the VM.
        vm.serial = None
    elif job_status == rapi.JOB_STATUS_SUCCESS:
        commission_info = quotas.get_commission_info(resource=vm,
                                                     action=action,
                                                     action_fields=job_fields)
        if commission_info is not None:
            # Commission for this change has not been issued, or the issued
            # commission was unaware of the current change. Reject all previous
            # commissions and create a new one in forced mode!
            log.debug("Expected job was %s. Processing job %s.",
                      vm.task_job_id, job_id)
            reason = ("client: dispatcher, resource: %s, ganeti_job: %s"
                      % (vm, job_id))
            quotas.handle_resource_commission(vm, action,
                                              action_fields=job_fields,
                                              commission_name=reason,
                                              force=True,
                                              auto_accept=True)
            log.debug("Issued new commission: %s", vm.serial)

    return vm
122

    
123

    
124
@transaction.commit_on_success
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None,
                      job_fields=None):
    """Process a job progress notification from the backend

    Process an incoming message from the backend (currently Ganeti).
    Job notifications with a terminating status (success, error, or canceled),
    also update the operating state of the VM.

    :param vm: the VirtualMachine the notification refers to
    :param etime: event time reported by the backend
    :param jobid: the Ganeti job id
    :param opcode: the Ganeti opcode (e.g. 'OP_INSTANCE_CREATE')
    :param status: the Ganeti job status; must be one of BACKEND_STATUSES
    :param logmsg: raw log message from the backend
    :param nics: optional NIC configuration accompanying the notification
    :param job_fields: optional dict with job parameters (e.g. 'beparams')

    """
    # See #1492, #1031, #1111 why this line has been removed
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)

    # Record the raw job information on the VM.
    vm.backendjobid = jobid
    vm.backendjobstatus = status
    vm.backendopcode = opcode
    vm.backendlogmsg = logmsg

    # Nothing more to do for a job that is still running.
    if status not in rapi.JOB_STATUS_FINALIZED:
        vm.save()
        return

    if job_fields is None:
        job_fields = {}
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode)

    # Notifications of success change the operating state
    if status == rapi.JOB_STATUS_SUCCESS:
        if state_for_success is not None:
            vm.operstate = state_for_success
        beparams = job_fields.get("beparams", None)
        if beparams:
            # Change the flavor of the VM
            _process_resize(vm, beparams)
        # Update backendtime only for jobs that have been successfully
        # completed, since only these jobs update the state of the VM. Else a
        # "race condition" may occur when a successful job (e.g.
        # OP_INSTANCE_REMOVE) completes before an error job and messages arrive
        # in reversed order.
        vm.backendtime = etime

    if status in rapi.JOB_STATUS_FINALIZED and nics is not None:
        # Update the NICs of the VM
        _process_net_status(vm, etime, nics)

    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
    if opcode == 'OP_INSTANCE_CREATE' and status in (rapi.JOB_STATUS_CANCELED,
                                                     rapi.JOB_STATUS_ERROR):
        vm.operstate = 'ERROR'
        vm.backendtime = etime
        # Update state of associated NICs
        vm.nics.all().update(state="ERROR")
    elif opcode == 'OP_INSTANCE_REMOVE':
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
        # when no instance exists at the Ganeti backend.
        # See ticket #799 for all the details.
        if (status == rapi.JOB_STATUS_SUCCESS or
           (status == rapi.JOB_STATUS_ERROR and not vm_exists_in_backend(vm))):
            # VM has been deleted
            for nic in vm.nics.all():
                # Release the IP
                remove_nic_ips(nic)
                # And delete the NIC.
                nic.delete()
            vm.deleted = True
            vm.operstate = state_for_success
            vm.backendtime = etime
            # Treat "already gone from backend" as a successful removal so
            # that quota handling below sees a success.
            status = rapi.JOB_STATUS_SUCCESS

    if status in rapi.JOB_STATUS_FINALIZED:
        # Job is finalized: Handle quotas/commissioning
        vm = handle_vm_quotas(vm, job_id=jobid, job_opcode=opcode,
                              job_status=status, job_fields=job_fields)
        # and clear task fields
        if vm.task_job_id == jobid:
            vm.task = None
            vm.task_job_id = None

    vm.save()
205

    
206

    
207
def _process_resize(vm, beparams):
    """Update the flavor of *vm* to match new Ganeti backend parameters.

    Looks up a Flavor with the new CPU/RAM but the same disk and disk
    template; a no-op when neither vcpus nor maxmem changed.
    """
    current = vm.flavor
    new_vcpus = beparams.get("vcpus", current.cpu)
    new_ram = beparams.get("maxmem", current.ram)
    unchanged = (new_vcpus == current.cpu) and (new_ram == current.ram)
    if unchanged:
        return
    try:
        vm.flavor = Flavor.objects.get(disk=current.disk,
                                       disk_template=current.disk_template,
                                       cpu=new_vcpus,
                                       ram=new_ram)
    except Flavor.DoesNotExist:
        raise Exception("Cannot find flavor for VM")
    vm.save()
222

    
223

    
224
@transaction.commit_on_success
def process_net_status(vm, etime, nics):
    """Transactional entry point around _process_net_status."""
    _process_net_status(vm, etime, nics)
228

    
229

    
230
def _process_net_status(vm, etime, nics):
    """Process a net status notification from the backend

    Process an incoming message from the Ganeti backend,
    detailing the NIC configuration of a VM instance.

    Update the state of the VM in the DB accordingly.

    """
    ganeti_nics = process_ganeti_nics(nics)
    db_nics = dict([(nic.id, nic)
                    for nic in vm.nics.prefetch_related("ips__subnet")])

    # Get X-Lock on backend before getting X-Lock on network IP pools, to
    # guarantee that no deadlock will occur with Backend allocator.
    Backend.objects.select_for_update().get(id=vm.backend_id)

    for nic_name in set(db_nics.keys()) | set(ganeti_nics.keys()):
        db_nic = db_nics.get(nic_name)
        ganeti_nic = ganeti_nics.get(nic_name)
        if ganeti_nic is None:
            # NIC exists in DB but not in Ganeti. If the NIC has not been in
            # 'building' state for more than BUILDING_NIC_TIMEOUT (180s),
            # then we remove the NIC and release its addresses.
            # TODO: This is dangerous as the job may be stuck in the queue,
            # and releasing the IP may lead to duplicate IP use.
            if db_nic.state != "BUILD" or \
               etime > db_nic.created + BUILDING_NIC_TIMEOUT:
                remove_nic_ips(db_nic)
                db_nic.delete()
            else:
                log.warning("Ignoring recent building NIC: %s", db_nic)
        elif db_nic is None:
            msg = ("NIC/%s of VM %s does not exist in DB! Cannot automatically"
                   " fix this issue!" % (nic_name, vm))
            log.error(msg)
            continue
        elif not nics_are_equal(db_nic, ganeti_nic):
            # Update the NIC in DB with the values from Ganeti NIC.
            # Save once after all simple fields have been copied, instead
            # of issuing one DB save per field.
            for f in SIMPLE_NIC_FIELDS:
                setattr(db_nic, f, ganeti_nic[f])
            db_nic.save()

            # Special case where the IPv4 address has changed, because you
            # need to release the old IPv4 address and reserve the new one
            ipv4_address = ganeti_nic["ipv4_address"]
            if db_nic.ipv4_address != ipv4_address:
                change_address_of_port(db_nic, vm.userid,
                                       old_address=db_nic.ipv4_address,
                                       new_address=ipv4_address,
                                       version=4)

            # Same for the (derived) IPv6 address.
            ipv6_address = ganeti_nic["ipv6_address"]
            if db_nic.ipv6_address != ipv6_address:
                change_address_of_port(db_nic, vm.userid,
                                       old_address=db_nic.ipv6_address,
                                       new_address=ipv6_address,
                                       version=6)

    vm.backendtime = etime
    vm.save()
291

    
292

    
293
def change_address_of_port(port, userid, old_address, new_address, version):
    """Replace the IPv4 or IPv6 address of a port with a new one.

    Releases the port's current address of the given IP version, attaches
    the new address, records the change in the IP address log, and returns
    the new IPAddress object.
    """
    if old_address is not None:
        msg = ("IPv%s Address of server '%s' changed from '%s' to '%s'"
               % (version, port.machine_id, old_address, new_address))
        log.critical(msg)

    # Remove the old IP address
    remove_nic_ips(port, version=version)

    if version == 4:
        # IPv4 addresses are taken from the network's pool.
        ipaddress = ips.allocate_ip(port.network, userid, address=new_address)
        ipaddress.nic = port
        ipaddress.save()
    elif version == 6:
        # IPv6 addresses are derived, so only a DB row is created.
        ipaddress = IPAddress.objects.create(userid=userid,
                                             network=port.network,
                                             subnet=port.network.subnet6,
                                             nic=port,
                                             address=new_address)
    else:
        raise ValueError("Unknown version: %s" % version)

    # New address log
    ip_log = IPAddressLog.objects.create(server_id=port.machine_id,
                                         network_id=port.network_id,
                                         address=new_address,
                                         active=True)
    log.info("Created IP log entry '%s' for address '%s' to server '%s'",
             ip_log.id, new_address, port.machine_id)

    return ipaddress
326

    
327

    
328
def nics_are_equal(db_nic, gnt_nic):
    """Return True when the DB NIC and the Ganeti NIC agree on NIC_FIELDS."""
    return all(getattr(db_nic, field) == gnt_nic[field]
               for field in NIC_FIELDS)
333

    
334

    
335
def process_ganeti_nics(ganeti_nics):
    """Convert Ganeti NIC dicts to a {nic_id: nic_info} mapping for the DB."""
    nics = {}
    for index, gnic in enumerate(ganeti_nics):
        nic_name = gnic.get("name", None)
        if nic_name is None:
            # Put as default value the index. If it is an unknown NIC to
            # synnefo it will be created automaticaly.
            nic_id = UNKNOWN_NIC_PREFIX + str(index)
        else:
            nic_id = utils.id_from_nic_name(nic_name)

        network_id = utils.id_from_network_name(gnic.get('network', ''))
        network = Network.objects.get(id=network_id)

        # Gather the new NIC info.
        mac = gnic.get('mac')
        subnet6 = network.subnet6
        # IPv6 address is derived from the MAC (EUI-64) when a v6 subnet
        # exists.
        ipv6 = mac2eui64(mac, subnet6.cidr) if subnet6 else None

        firewall_profile = _reverse_tags.get(gnic.get('firewall'))
        if not firewall_profile and network.public:
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE

        nics[nic_id] = {'index': index,
                        'network': network,
                        'mac': mac,
                        'ipv4_address': gnic.get('ip'),
                        'ipv6_address': ipv6,
                        'firewall_profile': firewall_profile,
                        'state': 'ACTIVE'}
    return nics
372

    
373

    
374
def remove_nic_ips(nic, version=None):
    """Remove IP addresses associated with a NetworkInterface.

    Remove all IP addresses that are associated with the NetworkInterface
    object, by returning them to the pool and deleting the IPAddress object. If
    the IP is a floating IP, then it is just disassociated from the NIC.
    If version is specified, then only IP addressses of that version will be
    removed.

    """
    for ip in nic.ips.all():
        if version and ip.ipversion != version:
            continue

        # Update the DB table holding the logging of all IP addresses
        terminate_active_ipaddress_log(nic, ip)

        if not ip.floating_ip:
            # Return the address to the pool and drop the row.
            ip.release_address()
            ip.delete()
        else:
            # Floating IPs survive; only detach them from the NIC.
            ip.nic = None
            ip.save()
398

    
399

    
400
def terminate_active_ipaddress_log(nic, ip):
    """Update DB logging entry for this IP address.

    Mark the active IPAddressLog entry for (server, network, address) as
    released at the current time. Only addresses on public networks that are
    attached to a server are logged.
    """
    if not ip.network.public or nic.machine is None:
        return
    try:
        ip_log, created = \
            IPAddressLog.objects.get_or_create(server_id=nic.machine_id,
                                               network_id=ip.network_id,
                                               address=ip.address,
                                               active=True)
    except IPAddressLog.MultipleObjectsReturned:
        # Fixed missing space between the concatenated literals
        # ("...Network %s,Server %s...").
        logmsg = ("Multiple active log entries for IP %s, Network %s,"
                  " Server %s. Cannot proceed!"
                  % (ip.address, ip.network, nic.machine))
        log.error(logmsg)
        raise

    if created:
        # There should have been an entry from when the address was attached;
        # create one now, but note its creation timestamp is wrong.
        logmsg = ("No log entry for IP %s, Network %s, Server %s. Created new"
                  " but with wrong creation timestamp."
                  % (ip.address, ip.network, nic.machine))
        log.error(logmsg)
    ip_log.released_at = datetime.now()
    ip_log.active = False
    ip_log.save()
425

    
426

    
427
@transaction.commit_on_success
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
    """Process a job notification for a BackendNetwork.

    Record the job metadata on the BackendNetwork, update its operating
    state according to the opcode/status, handle the OP_NETWORK_ADD and
    OP_NETWORK_REMOVE special cases, and finally recompute the state of the
    parent Network.
    """
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode
    back_network.backendlogmsg = logmsg

    # Note: Network is already locked!
    network = back_network.network

    # Notifications of success change the operating state
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == rapi.JOB_STATUS_SUCCESS and state_for_success is not None:
        back_network.operstate = state_for_success

    # A failed or canceled OP_NETWORK_ADD leaves the backend network in ERROR.
    if (status in (rapi.JOB_STATUS_CANCELED, rapi.JOB_STATUS_ERROR)
       and opcode == 'OP_NETWORK_ADD'):
        back_network.operstate = 'ERROR'
        back_network.backendtime = etime

    if opcode == 'OP_NETWORK_REMOVE':
        # A remove that failed because the network no longer exists in the
        # backend is treated like a successful removal.
        network_is_deleted = (status == rapi.JOB_STATUS_SUCCESS)
        if network_is_deleted or (status == rapi.JOB_STATUS_ERROR and not
                                  network_exists_in_backend(back_network)):
            back_network.operstate = state_for_success
            back_network.deleted = True
            back_network.backendtime = etime

    if status == rapi.JOB_STATUS_SUCCESS:
        back_network.backendtime = etime
    back_network.save()
    # Also you must update the state of the Network!!
    update_network_state(network)
463

    
464

    
465
def update_network_state(network):
    """Update the state of a Network based on BackendNetwork states.

    Update the state of a Network based on the operstate of the networks in the
    backends that network exists.

    The state of the network is:
    * ACTIVE: If it is 'ACTIVE' in at least one backend.
    * DELETED: If it is 'DELETED' in all backends that have been created.

    This function also releases the resources (MAC prefix or Bridge) and the
    quotas for the network.

    """
    if network.deleted:
        # Network has already been deleted. Just assert that state is also
        # DELETED
        if not network.state == "DELETED":
            network.state = "DELETED"
            network.save()
        return

    backend_states = [s.operstate for s in network.backend_networks.all()]
    if not backend_states and network.action != "DESTROY":
        # No backend networks exist and the network is not being destroyed:
        # report it ACTIVE and stop. Returning unconditionally here (not only
        # when the state changed) is essential; otherwise the empty
        # backend_states list below would be treated as "all DELETED" and the
        # network would be wrongly destroyed.
        if network.state != "ACTIVE":
            network.state = "ACTIVE"
            network.save()
        return

    # Network is deleted when all BackendNetworks go to "DELETED" operstate
    deleted = all(state == "DELETED" for state in backend_states)

    # Release the resources on the deletion of the Network
    if deleted:
        if network.ips.filter(deleted=False, floating_ip=True).exists():
            msg = "Cannot delete network %s! Floating IPs still in use!"
            log.error(msg % network)
            raise Exception(msg % network)
        # NOTE: argument order fixed to match the placeholders
        # (link before mac_prefix).
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
                 network.id, network.link, network.mac_prefix)
        network.deleted = True
        network.state = "DELETED"
        # Return pooled resources (MAC prefix / bridge) for pooled flavors.
        if network.mac_prefix:
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
                release_resource(res_type="mac_prefix",
                                 value=network.mac_prefix)
        if network.link:
            if network.FLAVORS[network.flavor]["link"] == "pool":
                release_resource(res_type="bridge", value=network.link)

        # Set all subnets as deleted
        network.subnets.update(deleted=True)
        # And delete the IP pools
        for subnet in network.subnets.all():
            if subnet.ipversion == 4:
                subnet.ip_pools.all().delete()
        # And all the backend networks since there are useless
        network.backend_networks.all().delete()

        # Issue commission
        if network.userid:
            quotas.issue_and_accept_commission(network, action="DESTROY")
            # the above has already saved the object and committed;
            # a second save would override others' changes, since the
            # object is now unlocked
            return
        elif not network.public:
            log.warning("Network %s does not have an owner!", network.id)
    network.save()
535

    
536

    
537
@transaction.commit_on_success
def process_network_modify(back_network, etime, jobid, opcode, status,
                           job_fields):
    """Process an OP_NETWORK_SET_PARAMS notification for a BackendNetwork.

    Records the job metadata, marks any externally reserved IPs in the DB
    pool, and updates the backend time on success.
    """
    assert (opcode == "OP_NETWORK_SET_PARAMS")
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    # Store the opcode in 'backendopcode', consistently with
    # process_network_status (the previous code assigned to 'opcode').
    back_network.backendopcode = opcode

    # IPs reserved at the Ganeti level must also be reserved in the DB pool.
    add_reserved_ips = job_fields.get("add_reserved_ips")
    if add_reserved_ips:
        network = back_network.network
        for ip in add_reserved_ips:
            network.reserve_address(ip, external=True)

    if status == rapi.JOB_STATUS_SUCCESS:
        back_network.backendtime = etime
    back_network.save()
557

    
558

    
559
@transaction.commit_on_success
def process_create_progress(vm, etime, progress):
    """Record a build-progress notification (snf-image) for *vm*."""
    percentage = int(progress)

    if percentage < 0:
        raise ValueError("Percentage cannot be negative")

    # The percentage may exceed 100%, due to the way
    # snf-image:copy-progress tracks bytes read by image handling processes
    percentage = min(percentage, 100)

    # FIXME: log a warning here, see #1033
#   if last_update > percentage:
#       raise ValueError("Build percentage should increase monotonically " \
#                        "(old = %d, new = %d)" % (last_update, percentage))

    # This assumes that no message of type 'ganeti-create-progress' is going to
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
    # the instance is STARTED.  What if the two messages are processed by two
    # separate dispatcher threads, and the 'ganeti-op-status' message for
    # successful creation gets processed before the 'ganeti-create-progress'
    # message? [vkoukis]
    #
    #if not vm.operstate == 'BUILD':
    #    raise VirtualMachine.IllegalState("VM is not in building state")

    vm.buildpercentage = percentage
    vm.backendtime = etime
    vm.save()
588

    
589

    
590
@transaction.commit_on_success
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
                               details=None):
    """Record a diagnostic entry for a virtual machine instance.

    :param vm: VirtualMachine instance to create diagnostic for.
    :param message: Diagnostic message.
    :param source: Diagnostic source identifier (e.g. image-helper).
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
    :param etime: The time the message occured (if available).
    :param details: Additional details or debug information.
    """
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level,
                                                   source=source,
                                                   source_date=etime,
                                                   message=message,
                                                   details=details)
607

    
608

    
609
def create_instance(vm, nics, volumes, flavor, image):
    """Create the Ganeti instance for *vm* and return the Ganeti job id.

    `image` is a dictionary which should contain the keys:
        'backend_id', 'format' and 'metadata'

    metadata value should be a dictionary.
    """

    # Handle arguments to CreateInstance() as a dictionary,
    # initialize it based on a deployment-specific value.
    # This enables the administrator to override deployment-specific
    # arguments, such as the disk template to use, name of os provider
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
    #
    kw = vm.backend.get_create_params()
    kw['mode'] = 'create'
    kw['name'] = vm.backend_vm_id
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS

    kw['disk_template'] = flavor.disk_template
    disks = []
    for volume in volumes:
        # Ganeti expects disk sizes in MiB; volume.size is in GiB. Assign a
        # plain int: the previous trailing comma made "size" a 1-tuple.
        disk = {"size": volume.size * 1024}
        if flavor.disk_provider is not None:
            # provider/origin are parameters of the Ganeti disk dict; the
            # previous code wrote them to the Volume object by mistake.
            disk["provider"] = flavor.disk_provider
            disk["origin"] = volume.source_image["checksum"]
        disks.append(disk)

    kw["disks"] = disks

    kw['nics'] = [{"name": nic.backend_uuid,
                   "network": nic.network.backend_id,
                   "ip": nic.ipv4_address}
                  for nic in nics]

    backend = vm.backend
    depend_jobs = []
    for nic in nics:
        # Make sure each NIC's network exists in the backend before the
        # creation job runs; collect the jobs to depend on.
        bnet, job_ids = ensure_network_is_active(backend, nic.network_id)
        depend_jobs.extend(job_ids)

    kw["depends"] = create_job_dependencies(depend_jobs)

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['os'] = settings.GANETI_OS_PROVIDER
    kw['ip_check'] = False
    kw['name_check'] = False

    # Do not specific a node explicitly, have
    # Ganeti use an iallocator instead
    #kw['pnode'] = rapi.GetNodes()[0]

    kw['dry_run'] = settings.TEST

    kw['beparams'] = {
        'auto_balance': True,
        'vcpus': flavor.cpu,
        'memory': flavor.ram}

    kw['osparams'] = {
        'config_url': vm.config_url,
        # Store image id and format to Ganeti
        'img_id': image['backend_id'],
        'img_format': image['format']}

    # Use opportunistic locking
    kw['opportunistic_locking'] = settings.GANETI_USE_OPPORTUNISTIC_LOCKING

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['hvparams'] = dict(serial_console=False)

    log.debug("Creating instance %s", utils.hide_pass(kw))
    with pooled_rapi_client(vm) as client:
        return client.CreateInstance(**kw)
683

    
684

    
685
def delete_instance(vm):
    """Submit an instance-removal job for *vm*; returns the Ganeti job id."""
    with pooled_rapi_client(vm) as client:
        return client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)
688

    
689

    
690
def reboot_instance(vm, reboot_type):
    """Submit a reboot job for *vm*; returns the Ganeti job id."""
    assert reboot_type in ('soft', 'hard')
    # XXX: Currently shutdown_timeout parameter is not supported from the
    # Ganeti RAPI. Until supported, we will fallback for both reboot types
    # to the default shutdown timeout of Ganeti (120s). Note that reboot
    # type of Ganeti job must be always hard. The 'soft' and 'hard' type
    # of OS API is different from the one in Ganeti, and maps to
    # 'shutdown_timeout'.
    #if reboot_type == "hard":
    #    kwargs["shutdown_timeout"] = 0
    kwargs = {"instance": vm.backend_vm_id, "reboot_type": "hard"}
    if settings.TEST:
        kwargs["dry_run"] = True
    with pooled_rapi_client(vm) as client:
        return client.RebootInstance(**kwargs)
706

    
707

    
708
def startup_instance(vm):
    """Submit a startup job for *vm*; returns the Ganeti job id."""
    with pooled_rapi_client(vm) as client:
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
711

    
712

    
713
def shutdown_instance(vm):
    """Submit a shutdown job for *vm*; returns the Ganeti job id."""
    with pooled_rapi_client(vm) as client:
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)
716

    
717

    
718
def resize_instance(vm, vcpus, memory):
    """Modify the CPU/RAM backend parameters of the Ganeti instance."""
    new_beparams = {"vcpus": int(vcpus),
                    "minmem": int(memory),
                    "maxmem": int(memory)}
    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id, beparams=new_beparams)
724

    
725

    
726
def get_instance_console(vm):
    """Build VNC console connection info for a VM.

    RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
    which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and
    pretty useless (see #783). Until this is fixed on the Ganeti side,
    construct a console info reply directly.

    WARNING: This assumes that VNC runs on port network_port on the
    instance's primary node, and is probably hypervisor-specific.
    """
    log.debug("Getting console for vm %s", vm)

    with pooled_rapi_client(vm) as client:
        info = client.GetInstance(vm.backend_vm_id)

    if vm.backend.hypervisor == "kvm" and info['hvparams']['serial_console']:
        raise Exception("hv parameter serial_console cannot be true")

    return {'kind': 'vnc',
            'host': info['pnode'],
            'port': info['network_port']}
752

    
753

    
754
def get_instance_info(vm):
    """Fetch the Ganeti view of the instance that backs `vm`."""
    with pooled_rapi_client(vm) as rapi_client:
        return rapi_client.GetInstance(vm.backend_vm_id)
757

    
758

    
759
def vm_exists_in_backend(vm):
    """Return whether the Ganeti backend knows about `vm`'s instance.

    A 404 from the RAPI means the instance does not exist; any other
    Ganeti API error is propagated to the caller.
    """
    try:
        get_instance_info(vm)
    except rapi.GanetiApiError as err:
        if err.code == 404:
            return False
        raise
    return True
767

    
768

    
769
def get_network_info(backend_network):
    """Fetch the Ganeti view of a network from its backend."""
    ganeti_network_id = backend_network.network.backend_id
    with pooled_rapi_client(backend_network) as client:
        return client.GetNetwork(ganeti_network_id)
772

    
773

    
774
def network_exists_in_backend(backend_network):
    """Return whether the Ganeti backend knows about this network.

    A 404 from the RAPI means the network does not exist. Any other
    Ganeti API error is re-raised: previously it was silently swallowed
    and the function fell through returning None, masking backend
    failures as "network absent" (and diverging from
    vm_exists_in_backend, which re-raises).
    """
    try:
        get_network_info(backend_network)
        return True
    except rapi.GanetiApiError as e:
        if e.code == 404:
            return False
        raise
781

    
782

    
783
def job_is_still_running(vm):
    """Return True if the VM's last backend job has not finalized yet.

    If the job status cannot be retrieved, assume it is not running.
    """
    with pooled_rapi_client(vm) as client:
        try:
            status = client.GetJobStatus(vm.backendjobid)["status"]
        except rapi.GanetiApiError:
            return False
        return status not in rapi.JOB_STATUS_FINALIZED
790

    
791

    
792
def ensure_network_is_active(backend, network_id):
    """Ensure that a network is active in the specified backend.

    Check that a network exists and is active in the specified backend.
    If not, (re-)create the network. Return the corresponding
    BackendNetwork object and the IDs of the Ganeti jobs that create it
    (an empty list when the network is already active).
    """
    network = Network.objects.select_for_update().get(id=network_id)
    bnet, _created = BackendNetwork.objects.get_or_create(backend=backend,
                                                          network=network)
    if bnet.operstate != "ACTIVE":
        return bnet, create_network(network, backend, connect=True)
    return bnet, []
808

    
809

    
810
def create_network(network, backend, connect=True):
    """Create a network in a Ganeti backend.

    Optionally chain ConnectNetwork jobs on the creation job.
    Returns the list of Ganeti job IDs.
    """
    log.debug("Creating network %s in backend %s", network, backend)

    creation_job = _create_network(network, backend)
    if not connect:
        return [creation_job]
    return connect_network(network, backend, depends=[creation_job])
821

    
822

    
823
def _create_network(network, backend):
    """Create a network in Ganeti and return the creation job ID."""
    tags = network.backend_tag
    subnet = subnet6 = gateway = gateway6 = None
    for sub in network.subnets.all():
        if sub.dhcp and "nfdhcpd" not in tags:
            tags.append("nfdhcpd")
        if sub.ipversion == 4:
            subnet, gateway = sub.cidr, sub.gateway
        elif sub.ipversion == 6:
            subnet6, gateway6 = sub.cidr, sub.gateway

    if network.public:
        conflicts_check = True
        tags.append('public')
    else:
        conflicts_check = False
        tags.append('private')

    # Use a dummy network subnet for IPv6 only networks. Currently Ganeti
    # does not support IPv6 only networks. To bypass this limitation, we
    # create the network with a dummy network subnet, and make Cyclades
    # connect instances to such networks, with address=None.
    if subnet is None:
        subnet = "10.0.0.0/29"

    try:
        backend_net = BackendNetwork.objects.get(network=network,
                                                 backend=backend)
    except BackendNetwork.DoesNotExist:
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
                        " does not exist" % (network.id, backend.id))

    with pooled_rapi_client(backend) as client:
        return client.CreateNetwork(network_name=network.backend_id,
                                    network=subnet,
                                    network6=subnet6,
                                    gateway=gateway,
                                    gateway6=gateway6,
                                    mac_prefix=backend_net.mac_prefix,
                                    conflicts_check=conflicts_check,
                                    tags=tags)
871

    
872

    
873
def connect_network(network, backend, depends=None, group=None):
    """Connect a network to nodegroups.

    If `group` is given, connect only to that nodegroup; otherwise
    connect to every nodegroup of the backend. `depends` is an optional
    list of Ganeti job IDs the connect jobs must wait for. Returns the
    list of ConnectNetwork job IDs.

    Note: the default for `depends` is None instead of the previous
    mutable [] default (a Python anti-pattern); behavior is unchanged.
    """
    log.debug("Connecting network %s to backend %s", network, backend)

    # Conflict checking is only meaningful for public networks.
    if network.public:
        conflicts_check = True
    else:
        conflicts_check = False

    depends = create_job_dependencies(depends if depends is not None else [])
    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for grp in groups:
            job_id = client.ConnectNetwork(network.backend_id, grp,
                                           network.mode, network.link,
                                           conflicts_check,
                                           depends=depends)
            job_ids.append(job_id)
    return job_ids
893

    
894

    
895
def delete_network(network, backend, disconnect=True):
    """Delete a network from a backend, optionally disconnecting it
    from all nodegroups first."""
    log.debug("Deleting network %s from backend %s", network, backend)

    depend_jobs = disconnect_network(network, backend) if disconnect else []
    _delete_network(network, backend, depends=depend_jobs)
902

    
903

    
904
def _delete_network(network, backend, depends=None):
    """Send the Ganeti job deleting `network`, after the `depends` jobs.

    `depends` now defaults to None instead of a shared mutable []
    (Python anti-pattern); behavior is unchanged.
    """
    if depends is None:
        depends = []
    depends = create_job_dependencies(depends)
    with pooled_rapi_client(backend) as client:
        return client.DeleteNetwork(network.backend_id, depends)
908

    
909

    
910
def disconnect_network(network, backend, group=None):
    """Disconnect a network from nodegroups of a backend.

    If `group` is given, disconnect only from that nodegroup; otherwise
    disconnect from every nodegroup. Returns the list of Ganeti job IDs.
    """
    # Fixed log wording: the network is disconnected *from* the backend.
    log.debug("Disconnecting network %s from backend %s", network, backend)

    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for grp in groups:
            job_id = client.DisconnectNetwork(network.backend_id, grp)
            job_ids.append(job_id)
    return job_ids
920

    
921

    
922
def connect_to_network(vm, nic):
    """Add the NIC described by `nic` to the Ganeti instance of `vm`.

    Ensures the NIC's network is active in the VM's backend first and
    chains the ModifyInstance job on the network creation jobs, if any.
    Returns the Ganeti job ID.
    """
    network = nic.network
    backend = vm.backend
    bnet, depend_jobs = ensure_network_is_active(backend, network.id)

    depends = create_job_dependencies(depend_jobs)

    # Do not shadow the `nic` model object (the original re-bound the
    # parameter name); build a separate dict with the NIC parameters
    # that Ganeti expects.
    nic_info = {'name': nic.backend_uuid,
                'network': network.backend_id,
                'ip': nic.ipv4_address}

    log.debug("Adding NIC %s to VM %s", nic_info, vm)

    kwargs = {
        "instance": vm.backend_vm_id,
        "nics": [("add", "-1", nic_info)],
        "depends": depends,
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(**kwargs)
947

    
948

    
949
def disconnect_from_network(vm, nic):
    """Remove `nic` from the Ganeti instance of `vm`.

    Also deletes the NIC's firewall tag, if one is set and not
    "DISABLED". Returns the ModifyInstance job ID.
    """
    log.debug("Removing NIC %s of VM %s", nic, vm)

    kwargs = {
        "instance": vm.backend_vm_id,
        "nics": [("remove", nic.backend_uuid, {})],
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        job_id = client.ModifyInstance(**kwargs)
        profile = nic.firewall_profile
        if profile and profile != "DISABLED":
            tag = _firewall_tags[profile] % nic.backend_uuid
            client.DeleteInstanceTags(vm.backend_vm_id, [tag],
                                      dry_run=settings.TEST)
        return job_id
970

    
971

    
972
def set_firewall_profile(vm, profile, nic):
    """Set the firewall profile of a NIC by tagging the Ganeti instance.

    Removes any existing firewall tags for this NIC, adds the tag for
    `profile` (unless it is "DISABLED"), and issues a NOP ModifyInstance
    so the dispatcher's process_net_status picks the change up.

    Raises ValueError for an unknown profile. (Fixed error-message typo:
    "Unsopported" -> "Unsupported".)
    """
    uuid = nic.backend_uuid
    try:
        tag = _firewall_tags[profile] % uuid
    except KeyError:
        raise ValueError("Unsupported Firewall Profile: %s" % profile)

    log.debug("Setting tag of VM %s, NIC %s, to %s", vm, nic, profile)

    with pooled_rapi_client(vm) as client:
        # Delete previous firewall tags
        old_tags = client.GetInstanceTags(vm.backend_vm_id)
        delete_tags = [(t % uuid) for t in _firewall_tags.values()
                       if (t % uuid) in old_tags]
        if delete_tags:
            client.DeleteInstanceTags(vm.backend_vm_id, delete_tags,
                                      dry_run=settings.TEST)

        if profile != "DISABLED":
            client.AddInstanceTags(vm.backend_vm_id, [tag],
                                   dry_run=settings.TEST)

        # XXX NOP ModifyInstance call to force process_net_status to run
        # on the dispatcher
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
        client.ModifyInstance(vm.backend_vm_id,
                              os_name=os_name)
    return None
1000

    
1001

    
1002
def attach_volume(vm, volume, depends=None):
    """Attach `volume` as a new disk of the Ganeti instance of `vm`.

    The disk origin is taken from the source volume, source snapshot or
    source image checksum, if any. `depends` is an optional list of
    Ganeti job dependencies (default changed from a mutable [] to None;
    behavior unchanged). Returns the ModifyInstance job ID.
    """
    # Fixed log arguments: they were swapped w.r.t. the format string.
    log.debug("Attaching volume %s to vm %s", volume, vm)

    disk = {"size": volume.size,
            "name": volume.backend_volume_uuid,
            "volume_name": volume.backend_volume_uuid}
    if volume.source_volume_id is not None:
        disk["origin"] = volume.source_volume.backend_volume_uuid
    elif volume.source_snapshot is not None:
        disk["origin"] = volume.source_snapshot["checksum"]
    elif volume.source_image is not None:
        disk["origin"] = volume.source_image["checksum"]

    kwargs = {
        "instance": vm.backend_vm_id,
        "disks": [("add", "-1", disk)],
        "depends": depends if depends is not None else [],
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(**kwargs)
1027

    
1028

    
1029
def detach_volume(vm, volume):
    """Detach `volume` from the Ganeti instance of `vm`.

    Returns the ModifyInstance job ID.
    """
    log.debug("Removing volume %s from vm %s", volume, vm)
    kwargs = {
        "instance": vm.backend_vm_id,
        "disks": [("remove", volume.backend_volume_uuid, {})],
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug"] = True
    if settings.TEST:
        kwargs["dry_run"] = True
    with pooled_rapi_client(vm) as rapi_client:
        return rapi_client.ModifyInstance(**kwargs)
1042

    
1043

    
1044
def get_instances(backend, bulk=True):
    """List the instances of a Ganeti backend."""
    with pooled_rapi_client(backend) as client:
        return client.GetInstances(bulk=bulk)
1047

    
1048

    
1049
def get_nodes(backend, bulk=True):
    """List the nodes of a Ganeti backend."""
    with pooled_rapi_client(backend) as client:
        return client.GetNodes(bulk=bulk)
1052

    
1053

    
1054
def get_jobs(backend, bulk=True):
    """List the jobs of a Ganeti backend."""
    with pooled_rapi_client(backend) as client:
        return client.GetJobs(bulk=bulk)
1057

    
1058

    
1059
def get_physical_resources(backend):
    """Get the physical resources of a backend.

    Get the resources of a backend as reported by the backend (not the
    db). Returns a dict summing each attribute over the usable nodes.
    """
    attrs = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
    totals = dict.fromkeys(attrs, 0)
    for node in get_nodes(backend, bulk=True):
        # Filter out drained, offline and not vm_capable nodes since they
        # will not take part in the vm allocation process.
        usable = node['vm_capable'] and not (node['drained'] or
                                             node['offline'])
        if usable and node['cnodes']:
            for attr in attrs:
                totals[attr] += int(node[attr] or 0)
    return totals
1078

    
1079

    
1080
def update_backend_resources(backend, resources=None):
    """Update the state of the backend resources in the db.

    If `resources` is not given, query the backend for its physical
    resources first.
    """
    if not resources:
        resources = get_physical_resources(backend)

    for attr in ('mfree', 'mtotal', 'dfree', 'dtotal',
                 'pinst_cnt', 'ctotal'):
        setattr(backend, attr, resources[attr])
    backend.updated = datetime.now()
    backend.save()
1096

    
1097

    
1098
def get_memory_from_instances(backend):
    """Get the memory that is used from instances.

    Note: this is different from the real memory used, due to kvm's
    memory de-duplication.
    """
    with pooled_rapi_client(backend) as client:
        instances = client.GetInstances(bulk=True)
    return sum(i['oper_ram'] for i in instances)
1111

    
1112

    
1113
def get_available_disk_templates(backend):
    """Get the list of available disk templates of a Ganeti backend.

    The list contains the disk templates that are enabled in the Ganeti
    backend and also included in ipolicy-disk-templates.
    """
    with pooled_rapi_client(backend) as client:
        info = client.GetInfo()
    ipolicy_templates = info["ipolicy"]["disk-templates"]
    if "enabled_disk_templates" not in info:
        # Ganeti < 2.8 does not have 'enabled_disk_templates'
        return ipolicy_templates
    return [tmpl for tmpl in info["enabled_disk_templates"]
            if tmpl in ipolicy_templates]
1130

    
1131

    
1132
def update_backend_disk_templates(backend):
    """Refresh and persist the backend's available disk templates."""
    backend.disk_templates = get_available_disk_templates(backend)
    backend.save()
1136

    
1137

    
1138
##
1139
## Synchronized operations for reconciliation
1140
##
1141

    
1142

    
1143
def create_network_synced(network, backend):
    """Create and connect a network, waiting for the Ganeti jobs.

    Returns the (status, error) tuple of the first failing step, or of
    the final connect step on success.
    """
    result = _create_network_synced(network, backend)
    if result[0] != rapi.JOB_STATUS_SUCCESS:
        return result
    return connect_network_synced(network, backend)
1149

    
1150

    
1151
def _create_network_synced(network, backend):
    """Create a network and block until the Ganeti job finalizes."""
    with pooled_rapi_client(backend) as client:
        job_id = _create_network(network, backend)
        return wait_for_job(client, job_id)
1156

    
1157

    
1158
def connect_network_synced(network, backend):
    """Connect `network` to all nodegroups of `backend`, synchronously.

    Waits for each ConnectNetwork job and stops at the first failure.
    Returns a (status, error) tuple; succeeds trivially when the backend
    has no nodegroups (previously `result` was unbound in that case and
    the function raised UnboundLocalError).
    """
    result = (rapi.JOB_STATUS_SUCCESS, None)
    with pooled_rapi_client(backend) as client:
        for group in client.GetGroups():
            job = client.ConnectNetwork(network.backend_id, group,
                                        network.mode, network.link)
            result = wait_for_job(client, job)
            if result[0] != rapi.JOB_STATUS_SUCCESS:
                return result

    return result
1168

    
1169

    
1170
def wait_for_job(client, jobid):
    """Block until Ganeti job `jobid` reaches a finalized status.

    Returns (status, None) on success, (status, opresult) on failure.
    """
    fields = ['status', 'opresult']
    result = client.WaitForJobChange(jobid, fields, None, None)
    while result['job_info'][0] not in rapi.JOB_STATUS_FINALIZED:
        result = client.WaitForJobChange(jobid, fields, [result], None)

    status = result['job_info'][0]
    if status == rapi.JOB_STATUS_SUCCESS:
        return (status, None)
    return (status, result['job_info'][1])
1183

    
1184

    
1185
def create_job_dependencies(job_ids=None, job_states=None):
    """Transform a list of job IDs to Ganeti 'depends' attribute.

    Each job ID is paired with `job_states`, the list of job states that
    satisfy the dependency (defaults to all finalized states).

    Note: `job_ids` now defaults to None instead of a shared mutable []
    (a Python anti-pattern); behavior is unchanged.
    """
    if job_ids is None:
        job_ids = []
    if job_states is None:
        job_states = list(rapi.JOB_STATUS_FINALIZED)
    assert isinstance(job_states, list)
    return [[job_id, job_states] for job_id in job_ids]