Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ bfb3f9c2

History | View | Annotate | Download (40 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
from django.conf import settings
34
from django.db import transaction
35
from datetime import datetime, timedelta
36

    
37
from synnefo.db.models import (Backend, VirtualMachine, Network,
38
                               BackendNetwork, BACKEND_STATUSES,
39
                               pooled_rapi_client, VirtualMachineDiagnostic,
40
                               Flavor, IPAddress, IPAddressLog)
41
from synnefo.logic import utils, ips
42
from synnefo import quotas
43
from synnefo.api.util import release_resource
44
from synnefo.util.mac2eui64 import mac2eui64
45
from synnefo.logic.rapi import GanetiApiError
46

    
47
from logging import getLogger
48
log = getLogger(__name__)
49

    
50

    
51
# Mapping from Synnefo firewall profile name to the Ganeti instance tag that
# represents it (tags come from the deployment settings).
_firewall_tags = {
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}

# Inverse mapping: the fourth ':'-separated component of each firewall tag
# back to the profile name. Assumes each tag has at least four components.
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())

# Timeout in seconds for building NICs. After this period the NICs considered
# stale and removed from DB.
BUILDING_NIC_TIMEOUT = timedelta(seconds=180)

# NIC attributes that can be copied verbatim from a Ganeti NIC dict onto the
# DB NIC object.
SIMPLE_NIC_FIELDS = ["state", "mac", "network", "firewall_profile", "index"]
# NIC attributes whose update requires IP allocation/release bookkeeping.
COMPLEX_NIC_FIELDS = ["ipv4_address", "ipv6_address"]
# All fields compared by nics_are_equal().
NIC_FIELDS = SIMPLE_NIC_FIELDS + COMPLEX_NIC_FIELDS
# Prefix used to synthesize an id for Ganeti NICs that carry no name.
UNKNOWN_NIC_PREFIX = "unknown-"
66

    
67

    
68
def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
    """Handle quotas for updated VirtualMachine.

    Update quotas for the updated VirtualMachine based on the job that run on
    the Ganeti backend. If a commission has been already issued for this job,
    then this commission is just accepted or rejected based on the job status.
    Otherwise, a new commission for the given change is issued, that is also in
    force and auto-accept mode. In this case, previous commissions are
    rejected, since they reflect a previous state of the VM.

    :param vm: the VirtualMachine whose job finished.
    :param job_id: id of the Ganeti job.
    :param job_opcode: Ganeti opcode of the job.
    :param job_status: Ganeti job status string.
    :param job_fields: dict of job fields used to derive the commission.
    :returns: the (possibly modified) VirtualMachine object.

    """
    if job_status not in ["success", "error", "canceled"]:
        # Only terminal job states can affect quotas.
        return vm

    # Check successful completion of a job will trigger any quotable change in
    # the VM state.
    action = utils.get_action_from_opcode(job_opcode, job_fields)
    if action == "BUILD":
        # Quotas for new VMs are automatically accepted by the API
        return vm
    commission_info = quotas.get_commission_info(vm, action=action,
                                                 action_fields=job_fields)

    if vm.task_job_id == job_id and vm.serial is not None:
        # Commission for this change has already been issued. So just
        # accept/reject it. Special case is OP_INSTANCE_CREATE, which even
        # if fails, must be accepted, as the user must manually remove the
        # failed server
        serial = vm.serial
        if job_status == "success":
            quotas.accept_serial(serial)
        elif job_status in ["error", "canceled"]:
            log.debug("Job %s failed. Rejecting related serial %s", job_id,
                      serial)
            quotas.reject_serial(serial)
        # The serial has been finalized either way; clear it.
        vm.serial = None
    elif job_status == "success" and commission_info is not None:
        log.debug("Expected job was %s. Processing job %s. Commission for"
                  " this job: %s", vm.task_job_id, job_id, commission_info)
        # Commission for this change has not been issued, or the issued
        # commission was unaware of the current change. Reject all previous
        # commissions and create a new one in forced mode!
        commission_name = ("client: dispatcher, resource: %s, ganeti_job: %s"
                           % (vm, job_id))
        quotas.handle_resource_commission(vm, action,
                                          commission_info=commission_info,
                                          commission_name=commission_name,
                                          force=True,
                                          auto_accept=True)
        log.debug("Issued new commission: %s", vm.serial)

    return vm
120

    
121

    
122
@transaction.commit_on_success
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None,
                      job_fields=None):
    """Process a job progress notification from the backend

    Process an incoming message from the backend (currently Ganeti).
    Job notifications with a terminating status (success, error, or canceled),
    also update the operating state of the VM.

    :param vm: the VirtualMachine the job refers to.
    :param etime: event time reported by the backend; stored as backendtime.
    :param jobid: Ganeti job id.
    :param opcode: Ganeti opcode of the job.
    :param status: Ganeti job status; must be one of BACKEND_STATUSES.
    :param logmsg: the job log message, stored on the VM.
    :param nics: optional list of Ganeti NIC dicts to sync into the DB.
    :param job_fields: optional dict of job fields (e.g. "beparams").
    :raises VirtualMachine.InvalidBackendMsgError: on unknown status.

    """
    # See #1492, #1031, #1111 why this line has been removed
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)

    # Record the raw job bookkeeping fields on the VM.
    vm.backendjobid = jobid
    vm.backendjobstatus = status
    vm.backendopcode = opcode
    vm.backendlogmsg = logmsg

    if status in ["queued", "waiting", "running"]:
        # Non-terminal status: only the bookkeeping fields are updated.
        vm.save()
        return

    if job_fields is None:
        job_fields = {}
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode)

    # Notifications of success change the operating state
    if status == "success":
        if state_for_success is not None:
            vm.operstate = state_for_success
        beparams = job_fields.get("beparams", None)
        if beparams:
            # Change the flavor of the VM
            _process_resize(vm, beparams)
        # Update backendtime only for jobs that have been successfully
        # completed, since only these jobs update the state of the VM. Else a
        # "race condition" may occur when a successful job (e.g.
        # OP_INSTANCE_REMOVE) completes before an error job and messages arrive
        # in reversed order.
        vm.backendtime = etime

    if status in ["success", "error", "canceled"] and nics is not None:
        # Update the NICs of the VM
        _process_net_status(vm, etime, nics)

    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
    if opcode == 'OP_INSTANCE_CREATE' and status in ('canceled', 'error'):
        vm.operstate = 'ERROR'
        vm.backendtime = etime
        # Update state of associated NICs
        vm.nics.all().update(state="ERROR")
    elif opcode == 'OP_INSTANCE_REMOVE':
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
        # when no instance exists at the Ganeti backend.
        # See ticket #799 for all the details.
        if status == 'success' or (status == 'error' and
                                   not vm_exists_in_backend(vm)):
            # VM has been deleted
            for nic in vm.nics.all():
                # Release the IP
                remove_nic_ips(nic)
                # And delete the NIC.
                nic.delete()
            vm.deleted = True
            vm.operstate = state_for_success
            vm.backendtime = etime
            # Treat the removal as successful from here on, so that quotas
            # below are handled as for a successful deletion.
            status = "success"

    if status in ["success", "error", "canceled"]:
        # Job is finalized: Handle quotas/commissioning
        vm = handle_vm_quotas(vm, job_id=jobid, job_opcode=opcode,
                              job_status=status, job_fields=job_fields)
        # and clear task fields
        if vm.task_job_id == jobid:
            vm.task = None
            vm.task_job_id = None

    vm.save()
202

    
203

    
204
def _process_resize(vm, beparams):
    """Change flavor of a VirtualMachine based on new beparams.

    Look up a Flavor matching the new vcpus/ram (keeping the current disk
    size and disk template) and assign it to the VM. No-op when neither
    value changed.

    :raises Exception: when no matching Flavor exists.
    """
    current = vm.flavor
    new_cpu = beparams.get("vcpus", current.cpu)
    new_ram = beparams.get("maxmem", current.ram)
    if (new_cpu, new_ram) == (current.cpu, current.ram):
        # Nothing quotable changed; keep the existing flavor.
        return
    try:
        matching = Flavor.objects.get(cpu=new_cpu, ram=new_ram,
                                      disk=current.disk,
                                      disk_template=current.disk_template)
    except Flavor.DoesNotExist:
        raise Exception("Cannot find flavor for VM")
    vm.flavor = matching
    vm.save()
219

    
220

    
221
@transaction.commit_on_success
def process_net_status(vm, etime, nics):
    """Wrap _process_net_status inside transaction.

    Public entry point: same arguments and behavior as _process_net_status,
    but guaranteed to run inside a DB transaction.
    """
    _process_net_status(vm, etime, nics)
225

    
226

    
227
def _process_net_status(vm, etime, nics):
    """Process a net status notification from the backend

    Process an incoming message from the Ganeti backend,
    detailing the NIC configuration of a VM instance.

    Update the state of the VM in the DB accordingly.

    Reconciles the VM's DB NICs against the NICs reported by Ganeti:
    removes stale DB NICs, logs NICs unknown to the DB, and updates
    changed NICs (including IPv4/IPv6 address re-allocation).

    """
    ganeti_nics = process_ganeti_nics(nics)
    # Map NIC id -> DB NIC, prefetching IPs/subnets to avoid per-NIC queries.
    db_nics = dict([(nic.id, nic)
                    for nic in vm.nics.prefetch_related("ips__subnet")])

    # Get X-Lock on backend before getting X-Lock on network IP pools, to
    # guarantee that no deadlock will occur with Backend allocator.
    Backend.objects.select_for_update().get(id=vm.backend_id)

    for nic_name in set(db_nics.keys()) | set(ganeti_nics.keys()):
        db_nic = db_nics.get(nic_name)
        ganeti_nic = ganeti_nics.get(nic_name)
        if ganeti_nic is None:
            # NIC exists in DB but not in Ganeti. If the NIC is in 'building'
            # state for more than 5 minutes, then we remove the NIC.
            # TODO: This is dangerous as the job may be stack in the queue, and
            # releasing the IP may lead to duplicate IP use.
            if db_nic.state != "BUILD" or\
                (db_nic.state == "BUILD" and
                 etime > db_nic.created + BUILDING_NIC_TIMEOUT):
                remove_nic_ips(db_nic)
                db_nic.delete()
            else:
                log.warning("Ignoring recent building NIC: %s", db_nic)
        elif db_nic is None:
            # NIC exists in Ganeti but not in the DB; cannot be auto-fixed.
            msg = ("NIC/%s of VM %s does not exist in DB! Cannot automatically"
                   " fix this issue!" % (nic_name, vm))
            log.error(msg)
            continue
        elif not nics_are_equal(db_nic, ganeti_nic):
            for f in SIMPLE_NIC_FIELDS:
                # Update the NIC in DB with the values from Ganeti NIC
                # NOTE(review): save() runs once per field here; presumably
                # harmless but a single save after the loop would suffice.
                setattr(db_nic, f, ganeti_nic[f])
                db_nic.save()

            # Special case where the IPv4 address has changed, because you
            # need to release the old IPv4 address and reserve the new one
            ipv4_address = ganeti_nic["ipv4_address"]
            if db_nic.ipv4_address != ipv4_address:
                change_address_of_port(db_nic, vm.userid,
                                       old_address=db_nic.ipv4_address,
                                       new_address=ipv4_address,
                                       version=4)

            # Same for the IPv6 address.
            ipv6_address = ganeti_nic["ipv6_address"]
            if db_nic.ipv6_address != ipv6_address:
                change_address_of_port(db_nic, vm.userid,
                                       old_address=db_nic.ipv6_address,
                                       new_address=ipv6_address,
                                       version=6)

    vm.backendtime = etime
    vm.save()
288

    
289

    
290
def change_address_of_port(port, userid, old_address, new_address, version):
    """Change the IPv4/IPv6 address of a port (NIC).

    Release the old address of the given IP version from the port, allocate
    or create an IPAddress for the new one, and record the change in the
    IPAddressLog table.

    :param port: the NIC whose address changes.
    :param userid: owner of the new IPAddress.
    :param old_address: previously assigned address (may be None).
    :param new_address: address reported by the backend.
    :param version: 4 or 6.
    :returns: the new IPAddress object.
    :raises ValueError: for any other version value.
    """
    if old_address is not None:
        # An in-place address change is unexpected; make it very visible.
        msg = ("IPv%s Address of server '%s' changed from '%s' to '%s'"
               % (version, port.machine_id, old_address, new_address))
        log.critical(msg)

    # Remove the old IP address
    remove_nic_ips(port, version=version)

    if version == 4:
        # IPv4 addresses are allocated from the network's pool.
        ipaddress = ips.allocate_ip(port.network, userid, address=new_address)
        ipaddress.nic = port
        ipaddress.save()
    elif version == 6:
        # IPv6 addresses are not pooled; create the record directly.
        subnet6 = port.network.subnet6
        ipaddress = IPAddress.objects.create(userid=userid,
                                             network=port.network,
                                             subnet=subnet6,
                                             nic=port,
                                             address=new_address)
    else:
        raise ValueError("Unknown version: %s" % version)

    # New address log
    ip_log = IPAddressLog.objects.create(server_id=port.machine_id,
                                         network_id=port.network_id,
                                         address=new_address,
                                         active=True)
    log.info("Created IP log entry '%s' for address '%s' to server '%s'",
             ip_log.id, new_address, port.machine_id)

    return ipaddress
323

    
324

    
325
def nics_are_equal(db_nic, gnt_nic):
    """Return True when the DB NIC matches the Ganeti NIC on all NIC_FIELDS.

    *db_nic* is compared attribute-by-attribute against the corresponding
    keys of the *gnt_nic* dict.
    """
    return all(getattr(db_nic, attr) == gnt_nic[attr] for attr in NIC_FIELDS)
330

    
331

    
332
def process_ganeti_nics(ganeti_nics):
    """Process NIC dict from ganeti

    Convert the list of NIC dicts reported by Ganeti into a mapping of
    {nic_id: nic_info}, where nic_info holds the fields tracked in the DB
    (index, network, mac, addresses, firewall profile, state).
    """
    new_nics = []
    for index, gnic in enumerate(ganeti_nics):
        nic_name = gnic.get("name", None)
        if nic_name is not None:
            nic_id = utils.id_from_nic_name(nic_name)
        else:
            # Put as default value the index. If it is an unknown NIC to
            # synnefo it will be created automaticaly.
            nic_id = UNKNOWN_NIC_PREFIX + str(index)
        network_name = gnic.get('network', '')
        network_id = utils.id_from_network_name(network_name)
        network = Network.objects.get(id=network_id)

        # Get the new nic info
        mac = gnic.get('mac')
        ipv4 = gnic.get('ip')
        subnet6 = network.subnet6
        # The IPv6 address is derived from the MAC (EUI-64) when the network
        # has an IPv6 subnet.
        ipv6 = mac2eui64(mac, subnet6.cidr) if subnet6 else None

        # Map the Ganeti firewall tag back to a Synnefo profile name.
        firewall = gnic.get('firewall')
        firewall_profile = _reverse_tags.get(firewall)
        if not firewall_profile and network.public:
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE

        nic_info = {
            'index': index,
            'network': network,
            'mac': mac,
            'ipv4_address': ipv4,
            'ipv6_address': ipv6,
            'firewall_profile': firewall_profile,
            'state': 'ACTIVE'}

        new_nics.append((nic_id, nic_info))
    return dict(new_nics)
369

    
370

    
371
def remove_nic_ips(nic, version=None):
    """Remove IP addresses associated with a NetworkInterface.

    Remove all IP addresses that are associated with the NetworkInterface
    object, by returning them to the pool and deleting the IPAddress object. If
    the IP is a floating IP, then it is just disassociated from the NIC.
    If version is specified, then only IP addressses of that version will be
    removed.

    :param nic: the NetworkInterface whose addresses are removed.
    :param version: optional IP version (4 or 6) to restrict removal.
    """
    for ip in nic.ips.all():
        if version and ip.ipversion != version:
            # Caller asked for a specific version only; skip the rest.
            continue

        # Update the DB table holding the logging of all IP addresses
        terminate_active_ipaddress_log(nic, ip)

        if ip.floating_ip:
            # Floating IPs survive; just detach them from the NIC.
            ip.nic = None
            ip.save()
        else:
            # Release the IPv4 address
            ip.release_address()
            ip.delete()
395

    
396

    
397
def terminate_active_ipaddress_log(nic, ip):
    """Update DB logging entry for this IP address.

    Mark the active IPAddressLog entry for (server, network, address) as
    released. Only public-network IPs attached to a machine are logged.

    :raises IPAddressLog.MultipleObjectsReturned: if more than one active
        entry exists for the same address (DB inconsistency).
    """
    if not ip.network.public or nic.machine is None:
        # Only public addresses bound to a server are tracked.
        return
    try:
        ip_log, created = \
            IPAddressLog.objects.get_or_create(server_id=nic.machine_id,
                                               network_id=ip.network_id,
                                               address=ip.address,
                                               active=True)
    except IPAddressLog.MultipleObjectsReturned:
        logmsg = ("Multiple active log entries for IP %s, Network %s,"
                  "Server %s. Cannot proceed!"
                  % (ip.address, ip.network, nic.machine))
        log.error(logmsg)
        raise
422

    
423

    
424
@transaction.commit_on_success
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
    """Process a Ganeti job notification for a BackendNetwork.

    Update the BackendNetwork's job bookkeeping fields and operating state
    from the job outcome, handle deletion via OP_NETWORK_REMOVE, and finally
    recompute the state of the parent Network.

    :raises Network.InvalidBackendMsgError: on unknown job status.
    """
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    # Record the raw job bookkeeping fields.
    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode
    back_network.backendlogmsg = logmsg

    # Note: Network is already locked!
    network = back_network.network

    # Notifications of success change the operating state
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == 'success' and state_for_success is not None:
        back_network.operstate = state_for_success

    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_ADD':
        # A failed creation leaves the backend network in ERROR.
        back_network.operstate = 'ERROR'
        back_network.backendtime = etime

    if opcode == 'OP_NETWORK_REMOVE':
        # Treat an errored removal as success when the network no longer
        # exists on the Ganeti side (mirrors the VM-removal special case).
        network_is_deleted = (status == "success")
        if network_is_deleted or (status == "error" and not
                                  network_exists_in_backend(back_network)):
            back_network.operstate = state_for_success
            back_network.deleted = True
            back_network.backendtime = etime

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()
    # Also you must update the state of the Network!!
    update_network_state(network)
459

    
460

    
461
def update_network_state(network):
    """Update the state of a Network based on BackendNetwork states.

    Update the state of a Network based on the operstate of the networks in the
    backends that network exists.

    The state of the network is:
    * ACTIVE: If it is 'ACTIVE' in at least one backend.
    * DELETED: If it is is 'DELETED' in all backends that have been created.

    This function also releases the resources (MAC prefix or Bridge) and the
    quotas for the network.

    :param network: the (already locked) Network to recompute.
    :raises Exception: when the network is deletable but floating IPs of it
        are still in use.
    """
    if network.deleted:
        # Network has already been deleted. Just assert that state is also
        # DELETED
        if not network.state == "DELETED":
            network.state = "DELETED"
            network.save()
        return

    backend_states = [s.operstate for s in network.backend_networks.all()]
    if not backend_states and network.action != "DESTROY":
        # No backend networks exist and the network is not being destroyed:
        # it counts as ACTIVE.
        if network.state != "ACTIVE":
            network.state = "ACTIVE"
            network.save()
        # BUGFIX: return unconditionally here. Previously the return was
        # inside the inner 'if', so a network that was already ACTIVE fell
        # through to the reduce() below, which over an empty backend_states
        # list yields "DELETED" and wrongly deleted the network.
        return

    # Network is deleted when all BackendNetworks go to "DELETED" operstate
    deleted = reduce(lambda x, y: x == y and "DELETED", backend_states,
                     "DELETED")

    # Release the resources on the deletion of the Network
    if deleted:
        if network.ips.filter(deleted=False, floating_ip=True).exists():
            msg = "Cannot delete network %s! Floating IPs still in use!"
            log.error(msg % network)
            raise Exception(msg % network)
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
                 network.id, network.mac_prefix, network.link)
        network.deleted = True
        network.state = "DELETED"
        # Return pooled resources (MAC prefix / bridge) to their pools.
        if network.mac_prefix:
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
                release_resource(res_type="mac_prefix",
                                 value=network.mac_prefix)
        if network.link:
            if network.FLAVORS[network.flavor]["link"] == "pool":
                release_resource(res_type="bridge", value=network.link)

        # Set all subnets as deleted
        network.subnets.update(deleted=True)
        # And delete the IP pools
        for subnet in network.subnets.all():
            if subnet.ipversion == 4:
                subnet.ip_pools.all().delete()
        # And all the backend networks since there are useless
        network.backend_networks.all().delete()

        # Issue commission
        if network.userid:
            quotas.issue_and_accept_commission(network, delete=True)
            # the above has already saved the object and committed;
            # a second save would override others' changes, since the
            # object is now unlocked
            return
        elif not network.public:
            log.warning("Network %s does not have an owner!", network.id)
    network.save()
531

    
532

    
533
@transaction.commit_on_success
def process_network_modify(back_network, etime, jobid, opcode, status,
                           job_fields):
    """Process a Ganeti notification for an OP_NETWORK_SET_PARAMS job.

    Record the job bookkeeping fields on the BackendNetwork and reserve any
    externally-reserved IP addresses the job added.

    :raises Network.InvalidBackendMsgError: on unknown job status.
    """
    assert (opcode == "OP_NETWORK_SET_PARAMS")
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    # BUGFIX: was 'back_network.opcode = opcode'. Every sibling handler
    # (process_op_status, process_network_status) stores the opcode in the
    # 'backendopcode' field; assigning to '.opcode' set a transient
    # attribute that was never persisted.
    back_network.backendopcode = opcode

    # Mark IPs reserved by the backend as externally reserved in our pools.
    add_reserved_ips = job_fields.get("add_reserved_ips")
    if add_reserved_ips:
        network = back_network.network
        for ip in add_reserved_ips:
            network.reserve_address(ip, external=True)

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()
553

    
554

    
555
@transaction.commit_on_success
def process_create_progress(vm, etime, progress):
    """Record a build-progress notification for a VM being created.

    :param vm: the VirtualMachine under construction.
    :param etime: event time reported by the backend.
    :param progress: progress value; coerced to int and clamped to 100.
    :raises ValueError: if the progress value is negative.
    """
    percentage = int(progress)
    if percentage < 0:
        raise ValueError("Percentage cannot be negative")
    # The reported percentage may exceed 100% because of the way
    # snf-image:copy-progress tracks bytes read by image handling processes,
    # so clamp it.
    percentage = min(percentage, 100)

    # FIXME: log a warning here, see #1033
    #   if last_update > percentage:
    #       raise ValueError("Build percentage should increase monotonically "
    #                        "(old = %d, new = %d)" % (last_update, percentage))

    # This assumes that no message of type 'ganeti-create-progress' is going
    # to arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance
    # and the instance is STARTED. What if the 'ganeti-op-status' message for
    # successful creation gets processed by another dispatcher thread before
    # this 'ganeti-create-progress' message? [vkoukis]
    #
    #if not vm.operstate == 'BUILD':
    #    raise VirtualMachine.IllegalState("VM is not in building state")

    vm.buildpercentage = percentage
    vm.backendtime = etime
    vm.save()
584

    
585

    
586
@transaction.commit_on_success
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
                               details=None):
    """
    Create virtual machine instance diagnostic entry.

    :param vm: VirtualMachine instance to create diagnostic for.
    :param message: Diagnostic message.
    :param source: Diagnostic source identifier (e.g. image-helper).
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
    :param etime: The time the message occured (if available).
    :param details: Additional details or debug information.
    """
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level, source=source,
                                                   source_date=etime,
                                                   message=message,
                                                   details=details)
603

    
604

    
605
def create_instance(vm, nics, flavor, image):
    """Create a Ganeti instance for the given VM.

    `image` is a dictionary which should contain the keys:
            'backend_id', 'format' and 'metadata'

        metadata value should be a dictionary.

    :param vm: the VirtualMachine to create on its backend.
    :param nics: NICs to attach to the instance.
    :param flavor: Flavor providing CPU/RAM/disk parameters.
    :returns: the Ganeti job id returned by CreateInstance.
    """

    # Handle arguments to CreateInstance() as a dictionary,
    # initialize it based on a deployment-specific value.
    # This enables the administrator to override deployment-specific
    # arguments, such as the disk template to use, name of os provider
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
    #
    kw = vm.backend.get_create_params()
    kw['mode'] = 'create'
    kw['name'] = vm.backend_vm_id
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS

    # Disk: flavor disk size is in GB; Ganeti expects MB.
    kw['disk_template'] = flavor.disk_template
    kw['disks'] = [{"size": flavor.disk * 1024}]
    provider = flavor.disk_provider
    if provider:
        kw['disks'][0]['provider'] = provider
        kw['disks'][0]['origin'] = flavor.disk_origin

    kw['nics'] = [{"name": nic.backend_uuid,
                   "network": nic.network.backend_id,
                   "ip": nic.ipv4_address}
                  for nic in nics]
    backend = vm.backend
    depend_jobs = []
    # Make sure every referenced network exists in this backend; creation
    # jobs become dependencies of the instance-creation job.
    for nic in nics:
        network = Network.objects.select_for_update().get(id=nic.network_id)
        bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
                                                             network=network)
        if bnet.operstate != "ACTIVE":
            # NOTE(review): this overwrites depend_jobs on each iteration;
            # presumably fine when at most one network is inactive, but
            # looks like it should accumulate — verify against callers.
            depend_jobs = create_network(network, backend, connect=True)
    kw["depends"] = [[job, ["success", "error", "canceled"]]
                     for job in depend_jobs]

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['os'] = settings.GANETI_OS_PROVIDER
    kw['ip_check'] = False
    kw['name_check'] = False

    # Do not specific a node explicitly, have
    # Ganeti use an iallocator instead
    #kw['pnode'] = rapi.GetNodes()[0]

    kw['dry_run'] = settings.TEST

    kw['beparams'] = {
        'auto_balance': True,
        'vcpus': flavor.cpu,
        'memory': flavor.ram}

    kw['osparams'] = {
        'config_url': vm.config_url,
        # Store image id and format to Ganeti
        'img_id': image['backend_id'],
        'img_format': image['format']}

    # Use opportunistic locking
    kw['opportunistic_locking'] = True

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['hvparams'] = dict(serial_console=False)

    log.debug("Creating instance %s", utils.hide_pass(kw))
    with pooled_rapi_client(vm) as client:
        return client.CreateInstance(**kw)
676

    
677

    
678
def delete_instance(vm):
    """Submit a Ganeti job to delete the instance; return the job id."""
    with pooled_rapi_client(vm) as client:
        return client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)
681

    
682

    
683
def reboot_instance(vm, reboot_type):
    """Submit a Ganeti reboot job for the instance; return the job id.

    :param reboot_type: 'soft' or 'hard' (OS API semantics). Both are
        deliberately submitted as Ganeti type 'hard' — see comment below.
    """
    assert reboot_type in ('soft', 'hard')
    kwargs = {"instance": vm.backend_vm_id,
              "reboot_type": "hard"}
    # XXX: Currently shutdown_timeout parameter is not supported from the
    # Ganeti RAPI. Until supported, we will fallback for both reboot types
    # to the default shutdown timeout of Ganeti (120s). Note that reboot
    # type of Ganeti job must be always hard. The 'soft' and 'hard' type
    # of OS API is different from the one in Ganeti, and maps to
    # 'shutdown_timeout'.
    #if reboot_type == "hard":
    #    kwargs["shutdown_timeout"] = 0
    if settings.TEST:
        kwargs["dry_run"] = True
    with pooled_rapi_client(vm) as client:
        return client.RebootInstance(**kwargs)
699

    
700

    
701
def startup_instance(vm):
    """Submit a Ganeti job to start the instance; return the job id."""
    with pooled_rapi_client(vm) as client:
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
704

    
705

    
706
def shutdown_instance(vm):
    """Submit a Ganeti job to shut the instance down; return the job id."""
    with pooled_rapi_client(vm) as client:
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)
709

    
710

    
711
def resize_instance(vm, vcpus, memory):
    """Submit a Ganeti job changing the instance's vcpus/memory.

    minmem and maxmem are both set to *memory*; returns the job id.
    """
    mem = int(memory)
    beparams = {"vcpus": int(vcpus),
                "minmem": mem,
                "maxmem": mem}
    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id, beparams=beparams)
717

    
718

    
719
def get_instance_console(vm):
    """Return VNC console info ({'kind', 'host', 'port'}) for `vm`."""
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
    # useless (see #783).
    #
    # Until this is fixed on the Ganeti side, construct a console info reply
    # directly.
    #
    # WARNING: This assumes that VNC runs on port network_port on
    #          the instance's primary node, and is probably
    #          hypervisor-specific.
    log.debug("Getting console for vm %s", vm)

    with pooled_rapi_client(vm) as rapi:
        instance = rapi.GetInstance(vm.backend_vm_id)

    # A KVM serial console would conflict with the VNC setup below.
    if vm.backend.hypervisor == "kvm" and instance['hvparams']['serial_console']:
        raise Exception("hv parameter serial_console cannot be true")

    return {'kind': 'vnc',
            'host': instance['pnode'],
            'port': instance['network_port']}
745

    
746

    
747
def get_instance_info(vm):
    """Fetch the Ganeti instance record that backs `vm`."""
    with pooled_rapi_client(vm) as rapi:
        return rapi.GetInstance(vm.backend_vm_id)
750

    
751

    
752
def vm_exists_in_backend(vm):
    """Return True iff `vm`'s instance exists in its Ganeti backend.

    A 404 from the RAPI means the instance is missing; any other
    GanetiApiError is propagated to the caller.
    """
    try:
        get_instance_info(vm)
    except GanetiApiError as err:
        if err.code == 404:
            return False
        raise err
    return True
760

    
761

    
762
def get_network_info(backend_network):
    """Fetch the Ganeti network record for `backend_network`."""
    with pooled_rapi_client(backend_network) as rapi:
        return rapi.GetNetwork(backend_network.network.backend_id)
765

    
766

    
767
def network_exists_in_backend(backend_network):
    """Return True iff the network exists in the Ganeti backend.

    A 404 from the RAPI means the network is missing.  Any other
    GanetiApiError (e.g. a transient backend failure) is re-raised:
    previously it fell through and implicitly returned None, which
    callers would interpret (falsy) as "network does not exist".
    This now matches the behavior of vm_exists_in_backend().
    """
    try:
        get_network_info(backend_network)
        return True
    except GanetiApiError as e:
        if e.code == 404:
            return False
        raise e
774

    
775

    
776
def create_network(network, backend, connect=True):
    """Create a network in a Ganeti backend"""
    log.debug("Creating network %s in backend %s", network, backend)

    creation_job = _create_network(network, backend)

    # Optionally chain ConnectNetwork jobs onto the creation job.
    if not connect:
        return [creation_job]
    return connect_network(network, backend, depends=[creation_job])
787

    
788

    
789
def _create_network(network, backend):
    """Create a network in a Ganeti backend (without connecting it).

    Derives subnet/gateway parameters from the network's IPv4/IPv6
    subnets, picks tags and conflict checking based on the network's
    visibility, and submits a CreateNetwork job.  Returns the job id.

    Raises Exception if no BackendNetwork row links `network` to
    `backend`.
    """
    # Fix: copy the tag list.  The original aliased `network.backend_tag`
    # and appended to it, mutating shared state in place on every call.
    tags = list(network.backend_tag)
    subnet = None
    subnet6 = None
    gateway = None
    gateway6 = None
    for _subnet in network.subnets.all():
        if _subnet.ipversion == 4:
            if _subnet.dhcp:
                tags.append('nfdhcpd')
            subnet = _subnet.cidr
            gateway = _subnet.gateway
        elif _subnet.ipversion == 6:
            subnet6 = _subnet.cidr
            gateway6 = _subnet.gateway

    # Conflict checking (IP collision detection) only for public networks.
    if network.public:
        conflicts_check = True
        tags.append('public')
    else:
        conflicts_check = False
        tags.append('private')

    # Use a dummy network subnet for IPv6 only networks. Currently Ganeti does
    # not support IPv6 only networks. To bypass this limitation, we create the
    # network with a dummy network subnet, and make Cyclades connect instances
    # to such networks, with address=None.
    if subnet is None:
        subnet = "10.0.0.0/24"

    try:
        bn = BackendNetwork.objects.get(network=network, backend=backend)
        mac_prefix = bn.mac_prefix
    except BackendNetwork.DoesNotExist:
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
                        " does not exist" % (network.id, backend.id))

    with pooled_rapi_client(backend) as client:
        return client.CreateNetwork(network_name=network.backend_id,
                                    network=subnet,
                                    network6=subnet6,
                                    gateway=gateway,
                                    gateway6=gateway6,
                                    mac_prefix=mac_prefix,
                                    conflicts_check=conflicts_check,
                                    tags=tags)
837

    
838

    
839
def connect_network(network, backend, depends=None, group=None):
    """Connect a network to nodegroups.

    Submits one ConnectNetwork job per nodegroup (or just `group` if
    given) and returns the list of job ids.  `depends` is an optional
    list of job ids the new jobs must wait on (in any terminal state).

    Fix: the mutable default argument `depends=[]` was replaced with the
    None sentinel to avoid the shared-default pitfall; behavior for all
    existing callers is unchanged.
    """
    log.debug("Connecting network %s to backend %s", network, backend)

    # Conflict checking is only enabled for public networks.
    if network.public:
        conflicts_check = True
    else:
        conflicts_check = False

    if depends is None:
        depends = []
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for nodegroup in groups:
            job_id = client.ConnectNetwork(network.backend_id, nodegroup,
                                           network.mode, network.link,
                                           conflicts_check,
                                           depends=depends)
            job_ids.append(job_id)
    return job_ids
859

    
860

    
861
def delete_network(network, backend, disconnect=True):
    """Delete a network from a Ganeti backend, disconnecting it first."""
    log.debug("Deleting network %s from backend %s", network, backend)

    # Make the deletion job depend on the disconnect jobs, if any.
    depends = disconnect_network(network, backend) if disconnect else []
    _delete_network(network, backend, depends=depends)
868

    
869

    
870
def _delete_network(network, backend, depends=None):
    """Submit a DeleteNetwork job, optionally depending on given job ids.

    Fix: the mutable default argument `depends=[]` was replaced with the
    None sentinel; behavior for all existing callers is unchanged.
    """
    if depends is None:
        depends = []
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
    with pooled_rapi_client(backend) as client:
        return client.DeleteNetwork(network.backend_id, depends)
874

    
875

    
876
def disconnect_network(network, backend, group=None):
    """Disconnect a network from nodegroups.

    Submits one DisconnectNetwork job per nodegroup (or just `group` if
    given) and returns the list of job ids.
    """
    # Fix: log message said "to backend" for a disconnect operation.
    log.debug("Disconnecting network %s from backend %s", network, backend)

    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for nodegroup in groups:
            job_id = client.DisconnectNetwork(network.backend_id, nodegroup)
            job_ids.append(job_id)
    return job_ids
886

    
887

    
888
def connect_to_network(vm, nic):
    """Attach `nic` to `vm` in the Ganeti backend.

    If the network is not yet ACTIVE in the VM's backend, network
    creation/connection jobs are submitted first and the ModifyInstance
    job is made to depend on them.  Returns the ModifyInstance job id.
    """
    network = nic.network
    backend = vm.backend
    # Lock the network row to serialize concurrent backend-network setup.
    network = Network.objects.select_for_update().get(id=network.id)
    bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
                                                         network=network)
    depend_jobs = []
    if bnet.operstate != "ACTIVE":
        depend_jobs = create_network(network, backend, connect=True)

    depends = [[job, ["success", "error", "canceled"]] for job in depend_jobs]

    # Fix: use a distinct name for the RAPI nic specification instead of
    # shadowing the `nic` model parameter.
    nic_spec = {'name': nic.backend_uuid,
                'network': network.backend_id,
                'ip': nic.ipv4_address}

    log.debug("Adding NIC %s to VM %s", nic_spec, vm)

    kwargs = {
        "instance": vm.backend_vm_id,
        "nics": [("add", "-1", nic_spec)],
        "depends": depends,
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(**kwargs)
918

    
919

    
920
def disconnect_from_network(vm, nic):
    """Submit a job removing `nic` from `vm`, cleaning its firewall tag."""
    log.debug("Removing NIC %s of VM %s", nic, vm)

    modify_kwargs = {"instance": vm.backend_vm_id,
                     "nics": [("remove", nic.backend_uuid, {})]}
    if vm.backend.use_hotplug():
        modify_kwargs["hotplug"] = True
    if settings.TEST:
        modify_kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as rapi:
        job_id = rapi.ModifyInstance(**modify_kwargs)
        # Drop the firewall tag of the NIC being removed, if it has one.
        profile = nic.firewall_profile
        if profile and profile != "DISABLED":
            tag = _firewall_tags[profile] % nic.backend_uuid
            rapi.DeleteInstanceTags(vm.backend_vm_id, [tag],
                                    dry_run=settings.TEST)
        return job_id
941

    
942

    
943
def set_firewall_profile(vm, profile, nic):
    """Apply firewall `profile` to `nic` of `vm` via Ganeti instance tags.

    Removes any previous firewall tags for the NIC, adds the tag for the
    requested profile (unless it is "DISABLED"), and issues a no-op
    ModifyInstance so the dispatcher processes the change.

    Raises ValueError for unknown profiles.
    """
    uuid = nic.backend_uuid
    try:
        tag = _firewall_tags[profile] % uuid
    except KeyError:
        # Fix: corrected misspelled error message ("Unsopported").
        raise ValueError("Unsupported Firewall Profile: %s" % profile)

    log.debug("Setting tag of VM %s, NIC %s, to %s", vm, nic, profile)

    with pooled_rapi_client(vm) as client:
        # Delete previous firewall tags
        old_tags = client.GetInstanceTags(vm.backend_vm_id)
        delete_tags = [(t % uuid) for t in _firewall_tags.values()
                       if (t % uuid) in old_tags]
        if delete_tags:
            client.DeleteInstanceTags(vm.backend_vm_id, delete_tags,
                                      dry_run=settings.TEST)

        if profile != "DISABLED":
            client.AddInstanceTags(vm.backend_vm_id, [tag],
                                   dry_run=settings.TEST)

        # XXX NOP ModifyInstance call to force process_net_status to run
        # on the dispatcher
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
        client.ModifyInstance(vm.backend_vm_id,
                              os_name=os_name)
    return None
971

    
972

    
973
def get_instances(backend, bulk=True):
    """List the backend's instances, by default with full (bulk) info."""
    with pooled_rapi_client(backend) as rapi:
        return rapi.GetInstances(bulk=bulk)
976

    
977

    
978
def get_nodes(backend, bulk=True):
    """List the backend's nodes, by default with full (bulk) info."""
    with pooled_rapi_client(backend) as rapi:
        return rapi.GetNodes(bulk=bulk)
981

    
982

    
983
def get_jobs(backend, bulk=True):
    """List the backend's jobs, by default with full (bulk) info."""
    with pooled_rapi_client(backend) as rapi:
        return rapi.GetJobs(bulk=bulk)
986

    
987

    
988
def get_physical_resources(backend):
    """ Get the physical resources of a backend.

    Get the resources of a backend as reported by the backend (not the db).

    """
    attrs = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
    totals = dict.fromkeys(attrs, 0)
    for node in get_nodes(backend, bulk=True):
        # Filter out drained, offline and not vm_capable nodes since they will
        # not take part in the vm allocation process
        usable = node['vm_capable'] and not (node['drained'] or
                                             node['offline'])
        if usable and node['cnodes']:
            for attr in attrs:
                totals[attr] += int(node[attr] or 0)
    return totals
1007

    
1008

    
1009
def update_backend_resources(backend, resources=None):
    """ Update the state of the backend resources in db.

    If `resources` is not given, they are fetched live from the backend.
    """
    if not resources:
        resources = get_physical_resources(backend)

    for field in ('mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt',
                  'ctotal'):
        setattr(backend, field, resources[field])
    backend.updated = datetime.now()
    backend.save()
1025

    
1026

    
1027
def get_memory_from_instances(backend):
    """ Get the memory that is used from instances.

    Get the used memory of a backend. Note: This is different for
    the real memory used, due to kvm's memory de-duplication.

    """
    with pooled_rapi_client(backend) as client:
        instances = client.GetInstances(bulk=True)
    return sum(instance['oper_ram'] for instance in instances)
1040

    
1041

    
1042
def get_available_disk_templates(backend):
    """Get the list of available disk templates of a Ganeti backend.

    The list contains the disk templates that are enabled in the Ganeti backend
    and also included in ipolicy-disk-templates.

    """
    with pooled_rapi_client(backend) as client:
        info = client.GetInfo()
    allowed = info["ipolicy"]["disk-templates"]
    if "enabled_disk_templates" not in info:
        # Ganeti < 2.8 does not have 'enabled_disk_templates'
        return allowed
    return [tmpl for tmpl in info["enabled_disk_templates"]
            if tmpl in allowed]
1059

    
1060

    
1061
def update_backend_disk_templates(backend):
    """Refresh the backend's stored list of usable disk templates."""
    backend.disk_templates = get_available_disk_templates(backend)
    backend.save()
1065

    
1066

    
1067
##
1068
## Synchronized operations for reconciliation
1069
##
1070

    
1071

    
1072
def create_network_synced(network, backend):
    """Create and then connect a network, waiting on each job.

    Returns the first failing (status, error) tuple, or the result of
    the connect phase.
    """
    result = _create_network_synced(network, backend)
    if result[0] == 'success':
        result = connect_network_synced(network, backend)
    return result
1078

    
1079

    
1080
def _create_network_synced(network, backend):
    """Create the network and block until the creation job finishes."""
    with pooled_rapi_client(backend) as client:
        job_id = _create_network(network, backend)
        return wait_for_job(client, job_id)
1085

    
1086

    
1087
def connect_network_synced(network, backend):
    """Connect `network` to every nodegroup of `backend`, synchronously.

    Waits for each ConnectNetwork job and stops at the first non-success
    result.  Returns a (status, error) tuple.

    Fix: `result` is initialized so a backend with no nodegroups returns
    ('success', None) instead of raising UnboundLocalError.
    """
    result = ('success', None)
    with pooled_rapi_client(backend) as client:
        for group in client.GetGroups():
            job = client.ConnectNetwork(network.backend_id, group,
                                        network.mode, network.link)
            result = wait_for_job(client, job)
            if result[0] != 'success':
                return result

    return result
1097

    
1098

    
1099
def wait_for_job(client, jobid):
    """Poll `jobid` via WaitForJobChange until it reaches a final state.

    Returns ('success', None) on success, otherwise (status, opresult).

    NOTE(review): the terminal set below includes 'cancel', while Ganeti
    reports canceled jobs as 'canceled' -- confirm against the RAPI docs.
    """
    fields = ['status', 'opresult']
    result = client.WaitForJobChange(jobid, fields, None, None)
    status = result['job_info'][0]
    while status not in ('success', 'error', 'cancel'):
        result = client.WaitForJobChange(jobid, fields, [result], None)
        status = result['job_info'][0]

    if status == 'success':
        return (status, None)
    return (status, result['job_info'][1])