Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ c82f57ad

History | View | Annotate | Download (38.1 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
from django.conf import settings
34
from django.db import transaction
35
from datetime import datetime, timedelta
36

    
37
from synnefo.db.models import (Backend, VirtualMachine, Network,
38
                               BackendNetwork, BACKEND_STATUSES,
39
                               pooled_rapi_client, VirtualMachineDiagnostic,
40
                               Flavor, IPAddressLog)
41
from synnefo.logic import utils
42
from synnefo import quotas
43
from synnefo.api.util import release_resource, allocate_ip
44
from synnefo.util.mac2eui64 import mac2eui64
45
from synnefo.logic.rapi import GanetiApiError
46

    
47
from logging import getLogger
48
log = getLogger(__name__)
49

    
50

    
51
# Map each Synnefo firewall profile name to the Ganeti instance tag that
# represents it (tag values come from the deployment settings).
_firewall_tags = dict(
    ENABLED=settings.GANETI_FIREWALL_ENABLED_TAG,
    DISABLED=settings.GANETI_FIREWALL_DISABLED_TAG,
    PROTECTED=settings.GANETI_FIREWALL_PROTECTED_TAG)

# Reverse lookup: the fourth ':'-separated component of a firewall tag
# maps back to the Synnefo profile name.
_reverse_tags = dict((tag.split(':')[3], profile)
                     for profile, tag in _firewall_tags.items())

# Timeout for building NICs. After this period the NICs are considered
# stale and are removed from the DB.
BUILDING_NIC_TIMEOUT = timedelta(seconds=180)

# NIC attributes copied verbatim from the Ganeti NIC info...
SIMPLE_NIC_FIELDS = ["state", "mac", "network", "firewall_profile", "index"]
# ...and attributes that need special handling (IP allocation/release).
COMPLEX_NIC_FIELDS = ["ipv4_address", "ipv6_address"]
NIC_FIELDS = SIMPLE_NIC_FIELDS + COMPLEX_NIC_FIELDS
# Prefix for the id of NICs that are unknown to Synnefo.
UNKNOWN_NIC_PREFIX = "unknown-"
66

    
67

    
68
def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
    """Handle quotas for updated VirtualMachine.

    Update quotas for the updated VirtualMachine based on the job that run on
    the Ganeti backend. If a commission has been already issued for this job,
    then this commission is just accepted or rejected based on the job status.
    Otherwise, a new commission for the given change is issued, that is also in
    force and auto-accept mode. In this case, previous commissions are
    rejected, since they reflect a previous state of the VM.

    :param vm: the VirtualMachine the job refers to
    :param job_id: id of the Ganeti job
    :param job_opcode: opcode of the Ganeti job
    :param job_status: status of the Ganeti job; anything other than a
                       terminal status ("success", "error", "canceled")
                       makes this function a no-op
    :param job_fields: dict of job parameters, used to derive the quotable
                       change
    :returns: the (possibly updated) VirtualMachine

    """
    # Only jobs in a terminal state can affect quotas.
    if job_status not in ["success", "error", "canceled"]:
        return vm

    # Check successful completion of a job will trigger any quotable change in
    # the VM state.
    action = utils.get_action_from_opcode(job_opcode, job_fields)
    if action == "BUILD":
        # Quotas for new VMs are automatically accepted by the API
        return vm
    commission_info = quotas.get_commission_info(vm, action=action,
                                                 action_fields=job_fields)

    if vm.task_job_id == job_id and vm.serial is not None:
        # Commission for this change has already been issued. So just
        # accept/reject it. Special case is OP_INSTANCE_CREATE, which even
        # if fails, must be accepted, as the user must manually remove the
        # failed server
        serial = vm.serial
        if job_status == "success":
            quotas.accept_serial(serial)
        elif job_status in ["error", "canceled"]:
            log.debug("Job %s failed. Rejecting related serial %s", job_id,
                      serial)
            quotas.reject_serial(serial)
        vm.serial = None
    elif job_status == "success" and commission_info is not None:
        log.debug("Expected job was %s. Processing job %s. Commission for"
                  " this job: %s", vm.task_job_id, job_id, commission_info)
        # Commission for this change has not been issued, or the issued
        # commission was unaware of the current change. Reject all previous
        # commissions and create a new one in forced mode!
        commission_name = ("client: dispatcher, resource: %s, ganeti_job: %s"
                           % (vm, job_id))
        quotas.handle_resource_commission(vm, action,
                                          commission_info=commission_info,
                                          commission_name=commission_name,
                                          force=True,
                                          auto_accept=True)
        log.debug("Issued new commission: %s", vm.serial)

    return vm
120

    
121

    
122
@transaction.commit_on_success
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None,
                      job_fields=None):
    """Process a job progress notification from the backend

    Process an incoming message from the backend (currently Ganeti).
    Job notifications with a terminating status (success, error, or canceled),
    also update the operating state of the VM.

    :param vm: the VirtualMachine the notification refers to
    :param etime: the time the event occurred, used as backendtime
    :param jobid: id of the Ganeti job
    :param opcode: Ganeti opcode of the job
    :param status: Ganeti job status; must be one of BACKEND_STATUSES
    :param logmsg: the raw log message of the notification
    :param nics: optional list of Ganeti NIC dicts to sync the VM's NICs with
    :param job_fields: optional dict of job parameters (e.g. beparams)
    :raises VirtualMachine.InvalidBackendMsgError: on an unknown status

    """
    # See #1492, #1031, #1111 why this line has been removed
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)

    # Record the latest job information on the VM.
    vm.backendjobid = jobid
    vm.backendjobstatus = status
    vm.backendopcode = opcode
    vm.backendlogmsg = logmsg

    # Non-terminal statuses only update the job bookkeeping fields.
    if status in ["queued", "waiting", "running"]:
        vm.save()
        return

    if job_fields is None:
        job_fields = {}
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode)

    # Notifications of success change the operating state
    if status == "success":
        if state_for_success is not None:
            vm.operstate = state_for_success
        beparams = job_fields.get("beparams", None)
        if beparams:
            # Change the flavor of the VM
            _process_resize(vm, beparams)
        # Update backendtime only for jobs that have been successfully
        # completed, since only these jobs update the state of the VM. Else a
        # "race condition" may occur when a successful job (e.g.
        # OP_INSTANCE_REMOVE) completes before an error job and messages arrive
        # in reversed order.
        vm.backendtime = etime

    if status in ["success", "error", "canceled"] and nics is not None:
        # Update the NICs of the VM
        _process_net_status(vm, etime, nics)

    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
    if opcode == 'OP_INSTANCE_CREATE' and status in ('canceled', 'error'):
        vm.operstate = 'ERROR'
        vm.backendtime = etime
        # Update state of associated NICs
        vm.nics.all().update(state="ERROR")
    elif opcode == 'OP_INSTANCE_REMOVE':
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
        # when no instance exists at the Ganeti backend.
        # See ticket #799 for all the details.
        if status == 'success' or (status == 'error' and
                                   not vm_exists_in_backend(vm)):
            # VM has been deleted
            for nic in vm.nics.all():
                # Release the IP
                remove_nic_ips(nic)
                # And delete the NIC.
                nic.delete()
            vm.deleted = True
            vm.operstate = state_for_success
            vm.backendtime = etime
            # Treat the job as successful for the quota handling below.
            status = "success"

    if status in ["success", "error", "canceled"]:
        # Job is finalized: Handle quotas/commissioning
        vm = handle_vm_quotas(vm, job_id=jobid, job_opcode=opcode,
                              job_status=status, job_fields=job_fields)
        # and clear task fields
        if vm.task_job_id == jobid:
            vm.task = None
            vm.task_job_id = None

    vm.save()
202

    
203

    
204
def _process_resize(vm, beparams):
    """Update the flavor of a VM to match its new Ganeti beparams.

    Looks up a Flavor with the new cpu/ram but the same disk and disk
    template; raises if no such flavor exists.
    """
    old_flavor = vm.flavor
    new_vcpus = beparams.get("vcpus", old_flavor.cpu)
    new_ram = beparams.get("maxmem", old_flavor.ram)
    if (new_vcpus, new_ram) == (old_flavor.cpu, old_flavor.ram):
        # Nothing actually changed.
        return
    try:
        vm.flavor = Flavor.objects.get(cpu=new_vcpus, ram=new_ram,
                                       disk=old_flavor.disk,
                                       disk_template=old_flavor.disk_template)
    except Flavor.DoesNotExist:
        raise Exception("Can not find flavor for VM")
    vm.save()
219

    
220

    
221
@transaction.commit_on_success
def process_net_status(vm, etime, nics):
    """Run _process_net_status inside a database transaction."""
    _process_net_status(vm, etime, nics)
225

    
226

    
227
def _process_net_status(vm, etime, nics):
    """Process a net status notification from the backend

    Process an incoming message from the Ganeti backend,
    detailing the NIC configuration of a VM instance.

    Update the state of the VM in the DB accordingly.

    :param vm: the VirtualMachine whose NICs are synced
    :param etime: event time, stored as the VM's backendtime
    :param nics: list of NIC dicts as reported by Ganeti

    """
    ganeti_nics = process_ganeti_nics(nics)
    db_nics = dict([(nic.id, nic)
                    for nic in vm.nics.prefetch_related("ips__subnet")])

    # Get X-Lock on backend before getting X-Lock on network IP pools, to
    # guarantee that no deadlock will occur with Backend allocator.
    Backend.objects.select_for_update().get(id=vm.backend_id)

    for nic_name in set(db_nics.keys()) | set(ganeti_nics.keys()):
        db_nic = db_nics.get(nic_name)
        ganeti_nic = ganeti_nics.get(nic_name)
        if ganeti_nic is None:
            # NIC exists in DB but not in Ganeti. If the NIC has been in
            # 'building' state for more than BUILDING_NIC_TIMEOUT, remove it.
            # TODO: This is dangerous as the job may be stuck in the queue,
            # and releasing the IP may lead to duplicate IP use.
            # NOTE: simplified from the equivalent
            # "state != BUILD or (state == BUILD and timed out)".
            if db_nic.state != "BUILD" or \
                    etime > db_nic.created + BUILDING_NIC_TIMEOUT:
                remove_nic_ips(db_nic)
                db_nic.delete()
            else:
                log.warning("Ignoring recent building NIC: %s", db_nic)
        elif db_nic is None:
            msg = ("NIC/%s of VM %s does not exist in DB! Cannot automatically"
                   " fix this issue!" % (nic_name, vm))
            log.error(msg)
            continue
        elif not nics_are_equal(db_nic, ganeti_nic):
            # Update the NIC in DB with the values from Ganeti NIC.
            # Save once after all fields are set, instead of once per field.
            for f in SIMPLE_NIC_FIELDS:
                setattr(db_nic, f, ganeti_nic[f])
            db_nic.save()
            # Special case where the IPv4 address has changed, because you
            # need to release the old IPv4 address and reserve the new one
            ipv4_address = ganeti_nic["ipv4_address"]
            if db_nic.ipv4_address != ipv4_address:
                remove_nic_ips(db_nic)
                if ipv4_address:
                    network = ganeti_nic["network"]
                    ipaddress = allocate_ip(network, vm.userid,
                                            address=ipv4_address)
                    # Bug fix: this read 'ipaddress.nic = nic', but 'nic' is
                    # not defined in this scope (NameError); the new address
                    # must be attached to the DB NIC being updated.
                    ipaddress.nic = db_nic
                    ipaddress.save()

    vm.backendtime = etime
    vm.save()
283

    
284

    
285
def nics_are_equal(db_nic, gnt_nic):
    """Return True if the DB NIC and the Ganeti NIC agree on all NIC_FIELDS."""
    return all(getattr(db_nic, field) == gnt_nic[field]
               for field in NIC_FIELDS)
290

    
291

    
292
def process_ganeti_nics(ganeti_nics):
    """Process NIC dict from ganeti.

    Returns a mapping of NIC id to a dict carrying the NIC_FIELDS values
    derived from the Ganeti NIC information.
    """
    nics = {}
    for index, gnic in enumerate(ganeti_nics):
        nic_name = gnic.get("name", None)
        if nic_name is None:
            # Put as default value the index. If it is an unknown NIC to
            # synnefo it will be created automatically.
            nic_id = UNKNOWN_NIC_PREFIX + str(index)
        else:
            nic_id = utils.id_from_nic_name(nic_name)

        network_id = utils.id_from_network_name(gnic.get('network', ''))
        network = Network.objects.get(id=network_id)

        # Collect the new NIC info.
        mac = gnic.get('mac')
        subnet6 = network.subnet6

        firewall_profile = _reverse_tags.get(gnic.get('firewall'))
        if network.public and not firewall_profile:
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE

        nics[nic_id] = {
            'index': index,
            'network': network,
            'mac': mac,
            'ipv4_address': gnic.get('ip'),
            'ipv6_address': mac2eui64(mac, subnet6) if subnet6 else None,
            'firewall_profile': firewall_profile,
            'state': 'ACTIVE'}
    return nics
329

    
330

    
331
def remove_nic_ips(nic):
    """Remove IP addresses associated with a NetworkInterface.

    Remove all IP addresses that are associated with the NetworkInterface
    object, by returning them to the pool and deleting the IPAddress object. If
    the IP is a floating IP, then it is just disassociated from the NIC.

    """
    for ip in nic.ips.all():
        # Update the DB table holding the logging of all IP addresses
        update_ip_address_log(nic, ip)

        if ip.floating_ip:
            # Floating IPs survive: just detach IPv4 ones from the NIC.
            if ip.ipversion == 4:
                ip.nic = None
                ip.save()
            continue

        # Non-floating: return IPv4 addresses to the pool, then delete.
        if ip.ipversion == 4:
            ip.release_address()
        ip.delete()
352

    
353

    
354
def update_ip_address_log(nic, ip):
    """Update DB logging entry for this IP address.

    Mark the active IPAddressLog entry for (server, network, address) as
    released. Only public-network IPs on NICs attached to a machine are
    logged; everything else is a no-op.

    :param nic: the NetworkInterface the address was attached to
    :param ip: the IPAddress being released
    :raises IPAddressLog.MultipleObjectsReturned: if more than one active
        entry exists for the same (server, network, address)
    """
    if not ip.network.public or nic.machine is None:
        return
    try:
        ip_log, created = \
            IPAddressLog.objects.get_or_create(server_id=nic.machine_id,
                                               network_id=ip.network_id,
                                               address=ip.address,
                                               active=True)
    except IPAddressLog.MultipleObjectsReturned:
        logmsg = ("Multiple active log entries for IP %s, Network %s,"
                  "Server %s. Can not proceed!"
                  % (ip.address, ip.network, nic.machine))
        log.error(logmsg)
        raise

    if created:
        # A missing entry means the allocation was never logged; create one
        # now so the release is at least recorded, and complain loudly.
        logmsg = ("No log entry for IP %s, Network %s, Server %s. Created new"
                  " but with wrong creation timestamp."
                  % (ip.address, ip.network, nic.machine))
        log.error(logmsg)
    # NOTE(review): datetime.now() is timezone-naive local time — confirm
    # this matches how the rest of the app stores timestamps.
    ip_log.released_at = datetime.now()
    ip_log.active = False
    ip_log.save()
379

    
380

    
381
@transaction.commit_on_success
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
    """Process a Ganeti network job notification.

    Update the BackendNetwork's job bookkeeping and operating state from the
    job result, then recompute the state of the parent Network.

    :param back_network: the BackendNetwork the job refers to
    :param etime: event time, stored as backendtime on success/failure
    :param jobid: id of the Ganeti job
    :param opcode: Ganeti opcode of the job
    :param status: Ganeti job status; must be one of BACKEND_STATUSES
    :param logmsg: raw log message of the notification
    :raises Network.InvalidBackendMsgError: on an unknown status
    """
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    # Record the latest job information.
    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode
    back_network.backendlogmsg = logmsg

    # Note: Network is already locked!
    network = back_network.network

    # Notifications of success change the operating state
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == 'success' and state_for_success is not None:
        back_network.operstate = state_for_success

    # A failed/canceled OP_NETWORK_ADD leaves the backend network in ERROR.
    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_ADD':
        back_network.operstate = 'ERROR'
        back_network.backendtime = etime

    if opcode == 'OP_NETWORK_REMOVE':
        # A failed remove still counts as deleted when the network no longer
        # exists at the Ganeti backend.
        network_is_deleted = (status == "success")
        if network_is_deleted or (status == "error" and not
                                  network_exists_in_backend(back_network)):
            back_network.operstate = state_for_success
            back_network.deleted = True
            back_network.backendtime = etime

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()
    # Also you must update the state of the Network!!
    update_network_state(network)
416

    
417

    
418
def update_network_state(network):
    """Update the state of a Network based on BackendNetwork states.

    Update the state of a Network based on the operstate of the networks in the
    backends that network exists.

    The state of the network is:
    * ACTIVE: If it is 'ACTIVE' in at least one backend.
    * DELETED: If it is 'DELETED' in all backends that have been created.

    This function also releases the resources (MAC prefix or Bridge) and the
    quotas for the network.

    """
    if network.deleted:
        # Network has already been deleted. Just assert that state is also
        # DELETED
        if not network.state == "DELETED":
            network.state = "DELETED"
            network.save()
        return

    backend_states = [s.operstate for s in network.backend_networks.all()]
    if not backend_states and network.action != "DESTROY":
        if network.state != "ACTIVE":
            network.state = "ACTIVE"
            network.save()
        # Bug fix: return unconditionally. Previously the return was nested
        # inside the inner 'if', so an already-ACTIVE network with no backend
        # networks fell through to the all-DELETED check below, which is
        # vacuously true for an empty list, wrongly deleting the network.
        return

    # Network is deleted when all BackendNetworks go to "DELETED" operstate.
    # (Vacuously true for an empty list, which at this point can only happen
    # when the network's action is DESTROY.)
    deleted = all(state == "DELETED" for state in backend_states)

    # Release the resources on the deletion of the Network
    if deleted:
        if network.ips.filter(deleted=False, floating_ip=True).exists():
            msg = "Can not delete network %s! Floating IPs still in use!"
            log.error(msg % network)
            raise Exception(msg % network)
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
                 network.id, network.mac_prefix, network.link)
        network.deleted = True
        network.state = "DELETED"
        # Return pooled resources (MAC prefix / bridge) to their pools.
        if network.mac_prefix:
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
                release_resource(res_type="mac_prefix",
                                 value=network.mac_prefix)
        if network.link:
            if network.FLAVORS[network.flavor]["link"] == "pool":
                release_resource(res_type="bridge", value=network.link)

        # Set all subnets as deleted
        network.subnets.update(deleted=True)
        # And delete the IP pools
        for subnet in network.subnets.all():
            if subnet.ipversion == 4:
                subnet.ip_pools.all().delete()

        # Issue commission
        if network.userid:
            quotas.issue_and_accept_commission(network, delete=True)
            # the above has already saved the object and committed;
            # a second save would override others' changes, since the
            # object is now unlocked
            return
        elif not network.public:
            log.warning("Network %s does not have an owner!", network.id)
    network.save()
486

    
487

    
488
@transaction.commit_on_success
def process_network_modify(back_network, etime, jobid, opcode, status,
                           job_fields):
    """Process a Ganeti OP_NETWORK_SET_PARAMS job notification.

    Update the BackendNetwork's job bookkeeping and reserve any externally
    reserved IPs that the job added.

    :param back_network: the BackendNetwork the job refers to
    :param etime: event time, stored as backendtime on success
    :param jobid: id of the Ganeti job
    :param opcode: must be "OP_NETWORK_SET_PARAMS"
    :param status: Ganeti job status; must be one of BACKEND_STATUSES
    :param job_fields: dict of job parameters (e.g. "add_reserved_ips")
    :raises Network.InvalidBackendMsgError: on an unknown status
    """
    assert (opcode == "OP_NETWORK_SET_PARAMS")
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    # Bug fix: this was 'back_network.opcode = opcode', which set a
    # non-model attribute and never persisted the opcode. Every sibling
    # handler (e.g. process_network_status) sets 'backendopcode'.
    back_network.backendopcode = opcode

    add_reserved_ips = job_fields.get("add_reserved_ips")
    if add_reserved_ips:
        network = back_network.network
        for ip in add_reserved_ips:
            network.reserve_address(ip, external=True)

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()
508

    
509

    
510
@transaction.commit_on_success
def process_create_progress(vm, etime, progress):
    """Record the build progress of a VM from a create-progress message."""
    percentage = int(progress)

    # The percentage may exceed 100%, due to the way
    # snf-image:copy-progress tracks bytes read by image handling processes,
    # so clamp it to 100.
    if percentage > 100:
        percentage = 100
    if percentage < 0:
        raise ValueError("Percentage cannot be negative")

    # FIXME: log a warning here, see #1033
    #   if last_update > percentage:
    #       raise ValueError("Build percentage should increase monotonically "
    #                        "(old = %d, new = %d)" % (last_update, percentage))

    # This assumes that no message of type 'ganeti-create-progress' is going to
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
    # the instance is STARTED.  What if the two messages are processed by two
    # separate dispatcher threads, and the 'ganeti-op-status' message for
    # successful creation gets processed before the 'ganeti-create-progress'
    # message? [vkoukis]
    #
    #if not vm.operstate == 'BUILD':
    #    raise VirtualMachine.IllegalState("VM is not in building state")

    vm.buildpercentage = percentage
    vm.backendtime = etime
    vm.save()
539

    
540

    
541
@transaction.commit_on_success
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
                               details=None):
    """Create a diagnostic entry for a virtual machine instance.

    :param vm: VirtualMachine instance to create diagnostic for.
    :param message: Diagnostic message.
    :param source: Diagnostic source identifier (e.g. image-helper).
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
    :param etime: The time the message occured (if available).
    :param details: Additional details or debug information.
    """
    VirtualMachineDiagnostic.objects.create_for_vm(
        vm, level, source=source, source_date=etime, message=message,
        details=details)
558

    
559

    
560
def create_instance(vm, nics, flavor, image):
    """Issue an OP_INSTANCE_CREATE job to the VM's Ganeti backend.

    `image` is a dictionary which should contain the keys:
            'backend_id', 'format' and 'metadata'

        metadata value should be a dictionary.

    :param vm: the VirtualMachine to create an instance for
    :param nics: NICs to attach; networks not yet ACTIVE in the backend are
                 created first and the job depends on those jobs
    :param flavor: the Flavor providing disk/cpu/ram parameters
    :returns: the Ganeti job id of the submitted CreateInstance job
    """

    # Handle arguments to CreateInstance() as a dictionary,
    # initialize it based on a deployment-specific value.
    # This enables the administrator to override deployment-specific
    # arguments, such as the disk template to use, name of os provider
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
    #
    kw = vm.backend.get_create_params()
    kw['mode'] = 'create'
    kw['name'] = vm.backend_vm_id
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS

    # Disk layout comes from the flavor; sizes are in MiB for Ganeti.
    kw['disk_template'] = flavor.disk_template
    kw['disks'] = [{"size": flavor.disk * 1024}]
    provider = flavor.disk_provider
    if provider:
        kw['disks'][0]['provider'] = provider
        kw['disks'][0]['origin'] = flavor.disk_origin

    kw['nics'] = [{"name": nic.backend_uuid,
                   "network": nic.network.backend_id,
                   "ip": nic.ipv4_address}
                  for nic in nics]
    backend = vm.backend
    depend_jobs = []
    for nic in nics:
        network = Network.objects.select_for_update().get(id=nic.network_id)
        bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
                                                             network=network)
        if bnet.operstate != "ACTIVE":
            # NOTE(review): depend_jobs is overwritten, not extended; with
            # more than one non-ACTIVE network only the last one's jobs are
            # recorded as dependencies — looks like a bug, confirm.
            depend_jobs = create_network(network, backend, connect=True)
    kw["depends"] = [[job, ["success", "error", "canceled"]]
                     for job in depend_jobs]

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['os'] = settings.GANETI_OS_PROVIDER
    kw['ip_check'] = False
    kw['name_check'] = False

    # Do not specify a node explicitly, have
    # Ganeti use an iallocator instead
    #kw['pnode'] = rapi.GetNodes()[0]

    kw['dry_run'] = settings.TEST

    kw['beparams'] = {
        'auto_balance': True,
        'vcpus': flavor.cpu,
        'memory': flavor.ram}

    kw['osparams'] = {
        'config_url': vm.config_url,
        # Store image id and format to Ganeti
        'img_id': image['backend_id'],
        'img_format': image['format']}

    # Use opportunistic locking
    kw['opportunistic_locking'] = True

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['hvparams'] = dict(serial_console=False)

    log.debug("Creating instance %s", utils.hide_pass(kw))
    with pooled_rapi_client(vm) as client:
        return client.CreateInstance(**kw)
631

    
632

    
633
def delete_instance(vm):
    """Remove the VM's instance via the Ganeti RAPI; returns the job id."""
    dry_run = settings.TEST
    with pooled_rapi_client(vm) as client:
        return client.DeleteInstance(vm.backend_vm_id, dry_run=dry_run)
636

    
637

    
638
def reboot_instance(vm, reboot_type):
    """Reboot the VM's instance via the Ganeti RAPI; returns the job id.

    XXX: Currently shutdown_timeout parameter is not supported from the
    Ganeti RAPI. Until supported, we will fallback for both reboot types
    to the default shutdown timeout of Ganeti (120s). Note that reboot
    type of Ganeti job must be always hard. The 'soft' and 'hard' type
    of OS API is different from the one in Ganeti, and maps to
    'shutdown_timeout'.
    """
    assert reboot_type in ('soft', 'hard')
    kwargs = dict(instance=vm.backend_vm_id, reboot_type="hard")
    #if reboot_type == "hard":
    #    kwargs["shutdown_timeout"] = 0
    if settings.TEST:
        kwargs["dry_run"] = True
    with pooled_rapi_client(vm) as client:
        return client.RebootInstance(**kwargs)
654

    
655

    
656
def startup_instance(vm):
    """Start the VM's instance via the Ganeti RAPI; returns the job id."""
    dry_run = settings.TEST
    with pooled_rapi_client(vm) as client:
        return client.StartupInstance(vm.backend_vm_id, dry_run=dry_run)
659

    
660

    
661
def shutdown_instance(vm):
    """Shut down the VM's instance via the Ganeti RAPI; returns the job id."""
    dry_run = settings.TEST
    with pooled_rapi_client(vm) as client:
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=dry_run)
664

    
665

    
666
def resize_instance(vm, vcpus, memory):
    """Modify the vcpus/memory beparams of the VM's instance.

    Sets minmem and maxmem to the same value; returns the Ganeti job id.
    """
    mem = int(memory)
    beparams = dict(vcpus=int(vcpus), minmem=mem, maxmem=mem)
    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id, beparams=beparams)
672

    
673

    
674
def get_instance_console(vm):
    """Build VNC console connection info for a VM.

    RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
    which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
    useless (see #783). Until this is fixed on the Ganeti side, construct a
    console info reply directly.

    WARNING: This assumes that VNC runs on port network_port on
             the instance's primary node, and is probably
             hypervisor-specific.
    """
    log.debug("Getting console for vm %s", vm)

    with pooled_rapi_client(vm) as client:
        info = client.GetInstance(vm.backend_vm_id)

    if vm.backend.hypervisor == "kvm" and info['hvparams']['serial_console']:
        raise Exception("hv parameter serial_console cannot be true")

    return {'kind': 'vnc',
            'host': info['pnode'],
            'port': info['network_port']}
700

    
701

    
702
def get_instance_info(vm):
    """Fetch the VM's instance description from the Ganeti RAPI."""
    instance_name = vm.backend_vm_id
    with pooled_rapi_client(vm) as client:
        return client.GetInstance(instance_name)
705

    
706

    
707
def vm_exists_in_backend(vm):
    """Return True if the VM's instance is known to its Ganeti backend.

    A 404 from the RAPI means the instance does not exist; any other
    error is propagated to the caller.
    """
    try:
        get_instance_info(vm)
    except GanetiApiError as e:
        if e.code == 404:
            return False
        raise e
    return True
715

    
716

    
717
def get_network_info(backend_network):
    """Fetch the network's description from the Ganeti RAPI."""
    network_name = backend_network.network.backend_id
    with pooled_rapi_client(backend_network) as client:
        return client.GetNetwork(network_name)
720

    
721

    
722
def network_exists_in_backend(backend_network):
    """Return True if the network is known to the Ganeti backend.

    A 404 from the RAPI means the network does not exist; any other error
    is re-raised (previously it was silently swallowed and the function
    implicitly returned None, hiding backend failures — cf. the sibling
    vm_exists_in_backend, which re-raises).
    """
    try:
        get_network_info(backend_network)
        return True
    except GanetiApiError as e:
        if e.code == 404:
            return False
        raise e
729

    
730

    
731
def create_network(network, backend, connect=True):
    """Create a network in a Ganeti backend.

    Returns the list of Ganeti job ids; when connect is True this includes
    the jobs connecting the network to the backend's nodegroups.
    """
    log.debug("Creating network %s in backend %s", network, backend)

    job_id = _create_network(network, backend)

    if not connect:
        return [job_id]
    return connect_network(network, backend, depends=[job_id])
742

    
743

    
744
def _create_network(network, backend):
    """Create a network in a Ganeti backend (without connecting it).

    Returns the Ganeti job id of the network-creation job.
    """
    # Copy the tag list: we append to it below, and appending to the
    # attribute's own list object would mutate shared state across calls
    # if backend_tag returns a shared list.
    tags = list(network.backend_tag)
    subnet = None
    subnet6 = None
    gateway = None
    gateway6 = None
    for _subnet in network.subnets.all():
        if _subnet.ipversion == 4:
            if _subnet.dhcp:
                tags.append('nfdhcpd')
            subnet = _subnet.cidr
            gateway = _subnet.gateway
        elif _subnet.ipversion == 6:
            subnet6 = _subnet.cidr
            gateway6 = _subnet.gateway

    # Check for conflicting IPs only on public networks.
    if network.public:
        conflicts_check = True
        tags.append('public')
    else:
        conflicts_check = False
        tags.append('private')

    # Use a dummy network subnet for IPv6 only networks. Currently Ganeti does
    # not support IPv6 only networks. To bypass this limitation, we create the
    # network with a dummy network subnet, and make Cyclades connect instances
    # to such networks, with address=None.
    if subnet is None:
        subnet = "10.0.0.0/24"

    try:
        bn = BackendNetwork.objects.get(network=network, backend=backend)
        mac_prefix = bn.mac_prefix
    except BackendNetwork.DoesNotExist:
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
                        " does not exist" % (network.id, backend.id))

    with pooled_rapi_client(backend) as client:
        return client.CreateNetwork(network_name=network.backend_id,
                                    network=subnet,
                                    network6=subnet6,
                                    gateway=gateway,
                                    gateway6=gateway6,
                                    mac_prefix=mac_prefix,
                                    conflicts_check=conflicts_check,
                                    tags=tags)

    
793

    
794
def connect_network(network, backend, depends=None, group=None):
    """Connect a network to nodegroups.

    Issues one ConnectNetwork job per nodegroup of the backend (or only
    for `group`, if given) and returns the list of job ids.  `depends`
    is an optional list of job ids the new jobs must wait on.
    """
    log.debug("Connecting network %s to backend %s", network, backend)

    # Avoid the mutable-default-argument pitfall of `depends=[]`.
    if depends is None:
        depends = []

    # Check for conflicting IPs only on public networks.
    conflicts_check = bool(network.public)

    # Each dependency is satisfied once the job reaches any final state.
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for grp in groups:
            job_id = client.ConnectNetwork(network.backend_id, grp,
                                           network.mode, network.link,
                                           conflicts_check,
                                           depends=depends)
            job_ids.append(job_id)
    return job_ids
814

    
815

    
816
def delete_network(network, backend, disconnect=True):
    """Delete a network from a Ganeti backend, optionally first
    disconnecting it from all nodegroups."""
    log.debug("Deleting network %s from backend %s", network, backend)

    depends = disconnect_network(network, backend) if disconnect else []
    _delete_network(network, backend, depends=depends)
823

    
824

    
825
def _delete_network(network, backend, depends=None):
    """Issue the DeleteNetwork job, optionally depending on other jobs.

    Returns the Ganeti job id.
    """
    # Avoid the mutable-default-argument pitfall of `depends=[]`.
    if depends is None:
        depends = []
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
    with pooled_rapi_client(backend) as client:
        return client.DeleteNetwork(network.backend_id, depends)
829

    
830

    
831
def disconnect_network(network, backend, group=None):
    """Disconnect a network from nodegroups of a backend.

    Issues one DisconnectNetwork job per nodegroup (or only for
    `group`, if given) and returns the list of job ids.
    """
    # Log message fixed: it read "to backend" while the operation
    # disconnects the network *from* the backend.
    log.debug("Disconnecting network %s from backend %s", network, backend)

    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for grp in groups:
            job_id = client.DisconnectNetwork(network.backend_id, grp)
            job_ids.append(job_id)
    return job_ids
841

    
842

    
843
def connect_to_network(vm, nic):
    """Add a NIC of a VM to its network on the Ganeti backend.

    Ensures the network exists in the VM's backend (creating and
    connecting it if needed), then submits a ModifyInstance job that
    adds the NIC; returns the job id.
    """
    backend = vm.backend
    # Lock the network row while we inspect/create its backend state.
    network = Network.objects.select_for_update().get(id=nic.network.id)
    bnet, _ = BackendNetwork.objects.get_or_create(backend=backend,
                                                   network=network)
    depend_jobs = []
    if bnet.operstate != "ACTIVE":
        # Network not (yet) active in this backend: create and connect
        # it, and make the NIC addition wait on those jobs.
        depend_jobs = create_network(network, backend, connect=True)

    depends = [[job, ["success", "error", "canceled"]]
               for job in depend_jobs]

    nic_def = {'name': nic.backend_uuid,
               'network': network.backend_id,
               'ip': nic.ipv4_address}

    log.debug("Adding NIC %s to VM %s", nic_def, vm)

    kwargs = {
        "instance": vm.backend_vm_id,
        "nics": [("add", "-1", nic_def)],
        "depends": depends,
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(**kwargs)
873

    
874

    
875
def disconnect_from_network(vm, nic):
    """Remove a NIC from a VM and delete its firewall tag, if any.

    Returns the id of the ModifyInstance job that removes the NIC.
    """
    log.debug("Removing NIC %s of VM %s", nic, vm)

    kwargs = {
        "instance": vm.backend_vm_id,
        "nics": [("remove", nic.backend_uuid, {})],
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        job_id = client.ModifyInstance(**kwargs)
        # Also drop the firewall tag of this NIC, unless disabled.
        profile = nic.firewall_profile
        if profile and profile != "DISABLED":
            tag = _firewall_tags[profile] % nic.backend_uuid
            client.DeleteInstanceTags(vm.backend_vm_id, [tag],
                                      dry_run=settings.TEST)

        return job_id
896

    
897

    
898
def set_firewall_profile(vm, profile, nic):
    """Apply a firewall profile to a NIC by (re)setting instance tags.

    Deletes any previous firewall tags of the NIC, adds the new one
    (unless the profile is "DISABLED"), and issues a no-op
    ModifyInstance so the dispatcher re-processes network status.

    Raises ValueError for an unknown profile. Returns None.
    """
    uuid = nic.backend_uuid
    try:
        tag = _firewall_tags[profile] % uuid
    except KeyError:
        # Fixed typo in the error message ("Unsopported" -> "Unsupported").
        raise ValueError("Unsupported Firewall Profile: %s" % profile)

    log.debug("Setting tag of VM %s, NIC %s, to %s", vm, nic, profile)

    with pooled_rapi_client(vm) as client:
        # Delete previous firewall tags
        old_tags = client.GetInstanceTags(vm.backend_vm_id)
        delete_tags = [(t % uuid) for t in _firewall_tags.values()
                       if (t % uuid) in old_tags]
        if delete_tags:
            client.DeleteInstanceTags(vm.backend_vm_id, delete_tags,
                                      dry_run=settings.TEST)

        if profile != "DISABLED":
            client.AddInstanceTags(vm.backend_vm_id, [tag],
                                   dry_run=settings.TEST)

        # XXX NOP ModifyInstance call to force process_net_status to run
        # on the dispatcher
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
        client.ModifyInstance(vm.backend_vm_id,
                              os_name=os_name)
    return None
926

    
927

    
928
def get_instances(backend, bulk=True):
    """List the instances of a Ganeti backend."""
    with pooled_rapi_client(backend) as rapi:
        return rapi.GetInstances(bulk=bulk)
931

    
932

    
933
def get_nodes(backend, bulk=True):
    """List the nodes of a Ganeti backend."""
    with pooled_rapi_client(backend) as rapi:
        return rapi.GetNodes(bulk=bulk)
936

    
937

    
938
def get_jobs(backend, bulk=True):
    """List the jobs of a Ganeti backend."""
    with pooled_rapi_client(backend) as rapi:
        return rapi.GetJobs(bulk=bulk)
941

    
942

    
943
def get_physical_resources(backend):
    """ Get the physical resources of a backend.

    Get the resources of a backend as reported by the backend (not the db).

    """
    attrs = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
    totals = dict.fromkeys(attrs, 0)
    for node in get_nodes(backend, bulk=True):
        # Filter out drained, offline and not vm_capable nodes since they will
        # not take part in the vm allocation process
        if not node['vm_capable'] or node['drained'] or node['offline']:
            continue
        if not node['cnodes']:
            continue
        for a in attrs:
            totals[a] += int(node[a] or 0)
    return totals
962

    
963

    
964
def update_backend_resources(backend, resources=None):
    """ Update the state of the backend resources in db.

    """
    if not resources:
        resources = get_physical_resources(backend)

    # Copy each reported resource onto the backend model and persist.
    for field in ('mfree', 'mtotal', 'dfree', 'dtotal',
                  'pinst_cnt', 'ctotal'):
        setattr(backend, field, resources[field])
    backend.updated = datetime.now()
    backend.save()
980

    
981

    
982
def get_memory_from_instances(backend):
    """ Get the memory that is used from instances.

    Get the used memory of a backend. Note: This is different for
    the real memory used, due to kvm's memory de-duplication.

    """
    with pooled_rapi_client(backend) as client:
        instances = client.GetInstances(bulk=True)
    return sum(instance['oper_ram'] for instance in instances)
995

    
996

    
997
def get_available_disk_templates(backend):
    """Get the list of available disk templates of a Ganeti backend.

    The list contains the disk templates that are enabled in the Ganeti backend
    and also included in ipolicy-disk-templates.

    """
    with pooled_rapi_client(backend) as rapi:
        info = rapi.GetInfo()
    ipolicy_templates = info["ipolicy"]["disk-templates"]
    if "enabled_disk_templates" not in info:
        # Ganeti < 2.8 does not have 'enabled_disk_templates'
        return ipolicy_templates
    return [tmpl for tmpl in info["enabled_disk_templates"]
            if tmpl in ipolicy_templates]
1014

    
1015

    
1016
def update_backend_disk_templates(backend):
    """Refresh and persist the backend's available disk templates."""
    backend.disk_templates = get_available_disk_templates(backend)
    backend.save()
1020

    
1021

    
1022
##
1023
## Synchronized operations for reconciliation
1024
##
1025

    
1026

    
1027
def create_network_synced(network, backend):
    """Synchronously create and connect a network in a backend.

    Returns the (status, error) tuple of the first failing step, or of
    the final connect step on success.
    """
    status = _create_network_synced(network, backend)
    if status[0] == 'success':
        status = connect_network_synced(network, backend)
    return status
1033

    
1034

    
1035
def _create_network_synced(network, backend):
    """Create a network in a backend and wait for the job to finish."""
    with pooled_rapi_client(backend) as rapi:
        job_id = _create_network(network, backend)
        return wait_for_job(rapi, job_id)
1040

    
1041

    
1042
def connect_network_synced(network, backend):
    """Synchronously connect a network to all nodegroups of a backend.

    Stops at the first failing job and returns its (status, error)
    tuple; otherwise returns the result of the last job.  Previously,
    if the backend reported no nodegroups, `result` was never bound and
    the final `return result` raised NameError; now ('success', None)
    is returned in that case, since there was nothing to connect.
    """
    result = ('success', None)
    with pooled_rapi_client(backend) as client:
        for group in client.GetGroups():
            job = client.ConnectNetwork(network.backend_id, group,
                                        network.mode, network.link)
            result = wait_for_job(client, job)
            if result[0] != 'success':
                return result

    return result
1052

    
1053

    
1054
def wait_for_job(client, jobid):
    """Block until a Ganeti job reaches a final state.

    Polls the job's 'status' and 'opresult' fields via
    WaitForJobChange. Returns a (status, error) tuple where error is
    None on success and the job's opresult otherwise.
    """
    final_states = ('success', 'error', 'cancel')
    reply = client.WaitForJobChange(jobid, ['status', 'opresult'],
                                    None, None)
    while reply['job_info'][0] not in final_states:
        reply = client.WaitForJobChange(jobid, ['status', 'opresult'],
                                        [reply], None)

    status = reply['job_info'][0]
    if status == 'success':
        return (status, None)
    return (status, reply['job_info'][1])
        return (status, error)