# snf-cyclades-app/synnefo/logic/backend.py (revision 3278725f)
# Copyright 2011-2013 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
from django.conf import settings
from django.db import transaction
from datetime import datetime, timedelta

from synnefo.db.models import (Backend, VirtualMachine, Network,
                               BackendNetwork, BACKEND_STATUSES,
                               pooled_rapi_client, VirtualMachineDiagnostic,
                               Flavor)
from synnefo.logic import utils
from synnefo import quotas
from synnefo.api.util import release_resource, allocate_ip
from synnefo.util.mac2eui64 import mac2eui64
from synnefo.logic.rapi import GanetiApiError

from logging import getLogger
log = getLogger(__name__)


_firewall_tags = {
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}

_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())

# Timeout (in seconds) for building NICs. After this period a NIC that is
# still in BUILDING state is considered stale and is removed from the DB.
BUILDING_NIC_TIMEOUT = timedelta(seconds=180)

SIMPLE_NIC_FIELDS = ["state", "mac", "network", "firewall_profile", "index"]
COMPLEX_NIC_FIELDS = ["ipv4_address", "ipv6_address"]
NIC_FIELDS = SIMPLE_NIC_FIELDS + COMPLEX_NIC_FIELDS
UNKNOWN_NIC_PREFIX = "unknown-"
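
# Illustrative note (comment only): _reverse_tags maps the last ':'-separated
# field of each firewall tag back to its profile name. Assuming a setting such
# as GANETI_FIREWALL_ENABLED_TAG = 'synnefo:network:%s:protected' (a
# hypothetical value), then
#
#   'synnefo:network:%s:protected'.split(':')[3]  ->  'protected'
#   _reverse_tags  ->  {'protected': 'ENABLED', ...}
#
# which lets process_ganeti_nics() below translate a tag reported by Ganeti
# back into a firewall profile.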


def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
    """Handle quotas for an updated VirtualMachine.

    Update the quotas of the VirtualMachine based on the job that ran on the
    Ganeti backend. If a commission has already been issued for this job, it
    is simply accepted or rejected according to the job status. Otherwise, a
    new commission is issued for the change, in forced and auto-accept mode.
    In that case any previous commissions are rejected, since they reflect a
    previous state of the VM.

    """
    if job_status not in ["success", "error", "canceled"]:
        return vm

    # Check whether successful completion of the job will trigger any
    # quotable change in the VM state.
    action = utils.get_action_from_opcode(job_opcode, job_fields)
    if action == "BUILD":
        # Quotas for new VMs are automatically accepted by the API
        return vm
    commission_info = quotas.get_commission_info(vm, action=action,
                                                 action_fields=job_fields)

    if vm.task_job_id == job_id and vm.serial is not None:
        # A commission for this change has already been issued, so just
        # accept or reject it. Special case: OP_INSTANCE_CREATE must be
        # accepted even if it fails, because the user has to remove the
        # failed server manually.
        serial = vm.serial
        if job_status == "success":
            quotas.accept_serial(serial)
        elif job_status in ["error", "canceled"]:
            log.debug("Job %s failed. Rejecting related serial %s", job_id,
                      serial)
            quotas.reject_serial(serial)
        vm.serial = None
    elif job_status == "success" and commission_info is not None:
        log.debug("Expected job was %s. Processing job %s. Commission for"
                  " this job: %s", vm.task_job_id, job_id, commission_info)
        # A commission for this change has not been issued, or the issued
        # commission was unaware of the current change. Reject all previous
        # commissions and create a new one in forced mode.
        commission_name = ("client: dispatcher, resource: %s, ganeti_job: %s"
                           % (vm, job_id))
        quotas.handle_resource_commission(vm, action,
                                          commission_info=commission_info,
                                          commission_name=commission_name,
                                          force=True,
                                          auto_accept=True)
        log.debug("Issued new commission: %s", vm.serial)

    return vm
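
# Illustrative sketch (comment only, not called here): a terminal job
# notification, e.g. a successful OP_INSTANCE_REMOVE, ends up being handled
# roughly as
#
#   vm = handle_vm_quotas(vm, job_id=1234, job_opcode="OP_INSTANCE_REMOVE",
#                         job_status="success", job_fields={})
#
# If vm.task_job_id == 1234 and vm.serial is set, the pending commission is
# simply accepted; otherwise a new forced, auto-accepted commission is issued.
# The job id is a made-up value for illustration.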


@transaction.commit_on_success
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None,
                      job_fields=None):
    """Process a job progress notification from the backend.

    Process an incoming message from the backend (currently Ganeti).
    Job notifications with a terminating status (success, error, or canceled)
    also update the operating state of the VM.

    """
    # See #1492, #1031, #1111 for why this line has been removed
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)

    vm.backendjobid = jobid
    vm.backendjobstatus = status
    vm.backendopcode = opcode
    vm.backendlogmsg = logmsg

    if status in ["queued", "waiting", "running"]:
        vm.save()
        return

    if job_fields is None:
        job_fields = {}
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode)

    # Notifications of success change the operating state
    if status == "success":
        if state_for_success is not None:
            vm.operstate = state_for_success
        beparams = job_fields.get("beparams", None)
        if beparams:
            # Change the flavor of the VM
            _process_resize(vm, beparams)
        # Update backendtime only for jobs that have been successfully
        # completed, since only these jobs update the state of the VM.
        # Otherwise a "race condition" may occur when a successful job (e.g.
        # OP_INSTANCE_REMOVE) completes before an error job and the messages
        # arrive in reversed order.
        vm.backendtime = etime

    if status in ["success", "error", "canceled"] and nics is not None:
        # Update the NICs of the VM
        _process_net_status(vm, etime, nics)

    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
    if opcode == 'OP_INSTANCE_CREATE' and status in ('canceled', 'error'):
        vm.operstate = 'ERROR'
        vm.backendtime = etime
    elif opcode == 'OP_INSTANCE_REMOVE':
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
        # when no instance exists at the Ganeti backend.
        # See ticket #799 for all the details.
        if status == 'success' or (status == 'error' and
                                   not vm_exists_in_backend(vm)):
            # VM has been deleted
            for nic in vm.nics.all():
                # Release the IP
                remove_nic_ips(nic)
                # And delete the NIC.
                nic.delete()
            vm.deleted = True
            vm.operstate = state_for_success
            vm.backendtime = etime
            status = "success"

    if status in ["success", "error", "canceled"]:
        # Job is finalized: handle quotas/commissioning
        vm = handle_vm_quotas(vm, job_id=jobid, job_opcode=opcode,
                              job_status=status, job_fields=job_fields)
        # and clear task fields
        if vm.task_job_id == jobid:
            vm.task = None
            vm.task_job_id = None

    vm.save()
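
# Illustrative sketch (comment only): the dispatcher delivers a terminal
# "ganeti-op-status" message roughly like
#
#   process_op_status(vm, etime=datetime.now(), jobid=4321,
#                     opcode="OP_INSTANCE_STARTUP", status="success",
#                     logmsg="", nics=None, job_fields={})
#
# which sets vm.operstate from VirtualMachine.OPER_STATE_FROM_OPCODE and
# finalizes quotas through handle_vm_quotas(). The job id and field values are
# assumptions for the example; the real ones come from the AMQP message.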


def _process_resize(vm, beparams):
    """Change the flavor of a VirtualMachine based on new beparams."""
    old_flavor = vm.flavor
    vcpus = beparams.get("vcpus", old_flavor.cpu)
    ram = beparams.get("maxmem", old_flavor.ram)
    if vcpus == old_flavor.cpu and ram == old_flavor.ram:
        return
    try:
        new_flavor = Flavor.objects.get(cpu=vcpus, ram=ram,
                                        disk=old_flavor.disk,
                                        disk_template=old_flavor.disk_template)
    except Flavor.DoesNotExist:
        raise Exception("Cannot find flavor for VM")
    vm.flavor = new_flavor
    vm.save()
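
# Illustrative example (comment only): a successful resize job carries
# beparams such as
#
#   _process_resize(vm, beparams={"vcpus": 2, "maxmem": 2048})
#
# which switches vm.flavor to the Flavor with cpu=2, ram=2048 and the same
# disk/disk_template, provided such a flavor exists. The numbers are
# hypothetical.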


@transaction.commit_on_success
def process_net_status(vm, etime, nics):
    """Wrap _process_net_status inside transaction."""
    _process_net_status(vm, etime, nics)


def _process_net_status(vm, etime, nics):
    """Process a net status notification from the backend.

    Process an incoming message from the Ganeti backend,
    detailing the NIC configuration of a VM instance.

    Update the state of the VM in the DB accordingly.

    """
    ganeti_nics = process_ganeti_nics(nics)
    db_nics = dict([(nic.id, nic)
                    for nic in vm.nics.prefetch_related("ips__subnet")])

    # Get X-Lock on backend before getting X-Lock on network IP pools, to
    # guarantee that no deadlock will occur with the backend allocator.
    Backend.objects.select_for_update().get(id=vm.backend_id)

    for nic_name in set(db_nics.keys()) | set(ganeti_nics.keys()):
        db_nic = db_nics.get(nic_name)
        ganeti_nic = ganeti_nics.get(nic_name)
        if ganeti_nic is None:
            # The NIC exists in the DB but not in Ganeti. If the NIC has been
            # in 'BUILDING' state for longer than BUILDING_NIC_TIMEOUT, remove
            # it from the DB.
            # TODO: This is dangerous, as the job may be stuck in the queue,
            # and releasing the IP may lead to duplicate IP use.
            if db_nic.state != "BUILDING" or\
                (db_nic.state == "BUILDING" and
                 etime > db_nic.created + BUILDING_NIC_TIMEOUT):
                remove_nic_ips(db_nic)
                db_nic.delete()
            else:
                log.warning("Ignoring recent building NIC: %s", db_nic)
        elif db_nic is None:
            msg = ("NIC/%s of VM %s does not exist in DB! Cannot automatically"
                   " fix this issue!" % (nic_name, vm))
            log.error(msg)
            continue
        elif not nics_are_equal(db_nic, ganeti_nic):
            for f in SIMPLE_NIC_FIELDS:
                # Update the NIC in the DB with the values from the Ganeti NIC
                setattr(db_nic, f, ganeti_nic[f])
                db_nic.save()
            # Special case where the IPv4 address has changed: the old IPv4
            # address must be released and the new one reserved.
            ipv4_address = ganeti_nic["ipv4_address"]
            if db_nic.ipv4_address != ipv4_address:
                remove_nic_ips(db_nic)
                if ipv4_address:
                    network = ganeti_nic["network"]
                    ipaddress = allocate_ip(network, vm.userid,
                                            address=ipv4_address)
                    ipaddress.nic = db_nic
                    ipaddress.save()

    vm.backendtime = etime
    vm.save()


def nics_are_equal(db_nic, gnt_nic):
    for field in NIC_FIELDS:
        if getattr(db_nic, field) != gnt_nic[field]:
            return False
    return True


def process_ganeti_nics(ganeti_nics):
    """Process the NIC list reported by Ganeti."""
    new_nics = []
    for index, gnic in enumerate(ganeti_nics):
        nic_name = gnic.get("name", None)
        if nic_name is not None:
            nic_id = utils.id_from_nic_name(nic_name)
        else:
            # Use the index as a default value. If the NIC is unknown to
            # Synnefo, it will be created automatically.
            nic_id = UNKNOWN_NIC_PREFIX + str(index)
        network_name = gnic.get('network', '')
        network_id = utils.id_from_network_name(network_name)
        network = Network.objects.get(id=network_id)

        # Get the new NIC info
        mac = gnic.get('mac')
        ipv4 = gnic.get('ip')
        subnet6 = network.subnet6
        ipv6 = mac2eui64(mac, subnet6) if subnet6 else None

        firewall = gnic.get('firewall')
        firewall_profile = _reverse_tags.get(firewall)
        if not firewall_profile and network.public:
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE

        nic_info = {
            'index': index,
            'network': network,
            'mac': mac,
            'ipv4_address': ipv4,
            'ipv6_address': ipv6,
            'firewall_profile': firewall_profile,
            'state': 'ACTIVE'}

        new_nics.append((nic_id, nic_info))
    return dict(new_nics)
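
# Illustrative example (comment only): given a Ganeti NIC entry such as
#
#   [{"name": "nic-1-42", "network": "snf-net-7", "mac": "aa:00:00:11:22:33",
#     "ip": "192.0.2.10", "firewall": None}]
#
# process_ganeti_nics() returns a dict keyed by the NIC id parsed from the
# name, whose value holds the Network object, the MAC, the IPv4 address, a
# mac2eui64-derived IPv6 address (when the network has an IPv6 subnet) and
# state 'ACTIVE'. The names and addresses above are hypothetical.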


def remove_nic_ips(nic):
    """Remove IP addresses associated with a NetworkInterface.

    Remove all IP addresses that are associated with the NetworkInterface
    object, by returning them to the pool and deleting the IPAddress object.
    If the IP is a floating IP, then it is just disassociated from the NIC.

    """

    for ip in nic.ips.all():
        if ip.ipversion == 4:
            if ip.floating_ip:
                ip.nic = None
                ip.save()
            else:
                ip.release_address()
        if not ip.floating_ip:
            ip.delete()


@transaction.commit_on_success
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode
    back_network.backendlogmsg = logmsg

    network = back_network.network

    # Notifications of success change the operating state
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == 'success' and state_for_success is not None:
        back_network.operstate = state_for_success

    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_ADD':
        back_network.operstate = 'ERROR'
        back_network.backendtime = etime

    if opcode == 'OP_NETWORK_REMOVE':
        network_is_deleted = (status == "success")
        if network_is_deleted or (status == "error" and not
                                  network_exists_in_backend(back_network)):
            back_network.operstate = state_for_success
            back_network.deleted = True
            back_network.backendtime = etime

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()
    # Also update the state of the parent Network
    update_network_state(network)


def update_network_state(network):
    """Update the state of a Network based on its BackendNetwork states.

    Update the state of a Network based on the operstate of the corresponding
    networks in the backends where the network exists.

    The state of the network is:
    * ACTIVE: if it is 'ACTIVE' in at least one backend.
    * DELETED: if it is 'DELETED' in all backends in which it has been
      created.

    This function also releases the resources (MAC prefix or bridge) and the
    quotas for the network.

    """
    if network.deleted:
        # Network has already been deleted. Just assert that its state is
        # also DELETED
        if network.state != "DELETED":
            network.state = "DELETED"
            network.save()
        return

    backend_states = [s.operstate for s in network.backend_networks.all()]
    if not backend_states and network.action != "DESTROY":
        if network.state != "ACTIVE":
            network.state = "ACTIVE"
            network.save()
        return

    # Network is deleted when all BackendNetworks go to "DELETED" operstate
    deleted = reduce(lambda x, y: x == y and "DELETED", backend_states,
                     "DELETED")

    # Release the resources on the deletion of the Network
    if deleted:
        log.info("Network %r deleted. Releasing mac_prefix %r link %r",
                 network.id, network.mac_prefix, network.link)
        network.deleted = True
        network.state = "DELETED"
        if network.mac_prefix:
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
                release_resource(res_type="mac_prefix",
                                 value=network.mac_prefix)
        if network.link:
            if network.FLAVORS[network.flavor]["link"] == "pool":
                release_resource(res_type="bridge", value=network.link)

        # Issue commission
        if network.userid:
            quotas.issue_and_accept_commission(network, delete=True)
            # The above has already saved the object and committed;
            # a second save would override others' changes, since the
            # object is now unlocked
            return
        elif not network.public:
            log.warning("Network %s does not have an owner!", network.id)

        # TODO!!!!!
        # Mark all subnets as deleted
        network.subnets.update(deleted=True)
        # And delete the IP pools of each subnet
        for subnet in network.subnets.all():
            subnet.ip_pools.all().delete()
    network.save()
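
# Illustrative note (comment only): the reduce() above evaluates to "DELETED"
# only when every BackendNetwork operstate equals "DELETED":
#
#   reduce(lambda x, y: x == y and "DELETED", ["DELETED", "DELETED"],
#          "DELETED")                  -> "DELETED"  (network is deleted)
#   reduce(lambda x, y: x == y and "DELETED", ["ACTIVE", "DELETED"],
#          "DELETED")                  -> False      (a backend still has it)
#
# With no backend networks at all the initial value "DELETED" is returned,
# which is why the empty-list case is handled separately above.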


@transaction.commit_on_success
def process_network_modify(back_network, etime, jobid, opcode, status,
                           job_fields):
    assert (opcode == "OP_NETWORK_SET_PARAMS")
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode

    add_reserved_ips = job_fields.get("add_reserved_ips")
    if add_reserved_ips:
        network = back_network.network
        for ip in add_reserved_ips:
            network.reserve_address(ip, external=True)

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()


@transaction.commit_on_success
def process_create_progress(vm, etime, progress):

    percentage = int(progress)

    # The percentage may exceed 100%, due to the way
    # snf-image:copy-progress tracks bytes read by image handling processes
    percentage = 100 if percentage > 100 else percentage
    if percentage < 0:
        raise ValueError("Percentage cannot be negative")

    # FIXME: log a warning here, see #1033
#   if last_update > percentage:
#       raise ValueError("Build percentage should increase monotonically " \
#                        "(old = %d, new = %d)" % (last_update, percentage))

    # This assumes that no message of type 'ganeti-create-progress' is going to
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
    # the instance is STARTED.  What if the two messages are processed by two
    # separate dispatcher threads, and the 'ganeti-op-status' message for
    # successful creation gets processed before the 'ganeti-create-progress'
    # message? [vkoukis]
    #
    #if not vm.operstate == 'BUILD':
    #    raise VirtualMachine.IllegalState("VM is not in building state")

    vm.buildpercentage = percentage
    vm.backendtime = etime
    vm.save()


@transaction.commit_on_success
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
                               details=None):
    """
    Create a virtual machine instance diagnostic entry.

    :param vm: VirtualMachine instance to create diagnostic for.
    :param message: Diagnostic message.
    :param source: Diagnostic source identifier (e.g. image-helper).
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
    :param etime: The time the message occurred (if available).
    :param details: Additional details or debug information.
    """
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level, source=source,
                                                   source_date=etime,
                                                   message=message,
                                                   details=details)


def create_instance(vm, nics, flavor, image):
    """Create a new instance in the given Ganeti backend.

    `image` is a dictionary which should contain the keys 'backend_id',
    'format' and 'metadata'; the 'metadata' value should itself be a
    dictionary.

    """

    # Handle arguments to CreateInstance() as a dictionary,
    # initialize it based on a deployment-specific value.
    # This enables the administrator to override deployment-specific
    # arguments, such as the disk template to use, name of os provider
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
    #
    kw = vm.backend.get_create_params()
    kw['mode'] = 'create'
    kw['name'] = vm.backend_vm_id
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS

    kw['disk_template'] = flavor.disk_template
    kw['disks'] = [{"size": flavor.disk * 1024}]
    provider = flavor.disk_provider
    if provider:
        kw['disks'][0]['provider'] = provider
        kw['disks'][0]['origin'] = flavor.disk_origin

    kw['nics'] = [{"name": nic.backend_uuid,
                   "network": nic.network.backend_id,
                   "ip": nic.ipv4_address}
                  for nic in nics]
    backend = vm.backend
    depend_jobs = []
    for nic in nics:
        network = Network.objects.select_for_update().get(id=nic.network_id)
        bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
                                                             network=network)
        if bnet.operstate != "ACTIVE":
            if network.public:
                msg = "Cannot connect instance to network %s. Network is not"\
                      " ACTIVE in backend %s." % (network, backend)
                raise Exception(msg)
            else:
                jobs = create_network(network, backend, connect=True)
                if isinstance(jobs, list):
                    depend_jobs.extend(jobs)
                else:
                    depend_jobs.append(jobs)
    kw["depends"] = [[job, ["success", "error", "canceled"]]
                     for job in depend_jobs]

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['os'] = settings.GANETI_OS_PROVIDER
    kw['ip_check'] = False
    kw['name_check'] = False

    # Do not specify a node explicitly; have
    # Ganeti use an iallocator instead
    #kw['pnode'] = rapi.GetNodes()[0]

    kw['dry_run'] = settings.TEST

    kw['beparams'] = {
        'auto_balance': True,
        'vcpus': flavor.cpu,
        'memory': flavor.ram}

    kw['osparams'] = {
        'config_url': vm.config_url,
        # Store image id and format to Ganeti
        'img_id': image['backend_id'],
        'img_format': image['format']}

    # Use opportunistic locking
    kw['opportunistic_locking'] = True

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['hvparams'] = dict(serial_console=False)

    log.debug("Creating instance %s", utils.hide_pass(kw))
    with pooled_rapi_client(vm) as client:
        return client.CreateInstance(**kw)
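
# Illustrative sketch (comment only): for a flavor with 2 CPUs, 2048 MB RAM
# and a 20 GB drbd disk (hypothetical values), the call above roughly amounts
# to
#
#   client.CreateInstance(mode='create', name=vm.backend_vm_id,
#                         disk_template='drbd', disks=[{"size": 20480}],
#                         nics=[...], ip_check=False, name_check=False,
#                         beparams={'auto_balance': True, 'vcpus': 2,
#                                   'memory': 2048},
#                         osparams={...}, opportunistic_locking=True,
#                         dry_run=settings.TEST, depends=[...])
#
# plus whatever deployment-specific keys GANETI_CREATEINSTANCE_KWARGS adds.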


def delete_instance(vm):
    with pooled_rapi_client(vm) as client:
        return client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)


def reboot_instance(vm, reboot_type):
    assert reboot_type in ('soft', 'hard')
    kwargs = {"instance": vm.backend_vm_id,
              "reboot_type": "hard"}
    # XXX: Currently the shutdown_timeout parameter is not supported by the
    # Ganeti RAPI. Until it is, we fall back for both reboot types to the
    # default shutdown timeout of Ganeti (120s). Note that the reboot type of
    # the Ganeti job must always be 'hard'. The 'soft' and 'hard' types of the
    # OS API are different from the ones in Ganeti, and map to
    # 'shutdown_timeout'.
    #if reboot_type == "hard":
    #    kwargs["shutdown_timeout"] = 0
    if settings.TEST:
        kwargs["dry_run"] = True
    with pooled_rapi_client(vm) as client:
        return client.RebootInstance(**kwargs)


def startup_instance(vm):
    with pooled_rapi_client(vm) as client:
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)


def shutdown_instance(vm):
    with pooled_rapi_client(vm) as client:
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)


def resize_instance(vm, vcpus, memory):
    beparams = {"vcpus": int(vcpus),
                "minmem": int(memory),
                "maxmem": int(memory)}
    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id, beparams=beparams)


def get_instance_console(vm):
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
    # useless (see #783).
    #
    # Until this is fixed on the Ganeti side, construct a console info reply
    # directly.
    #
    # WARNING: This assumes that VNC runs on port network_port on
    #          the instance's primary node, and is probably
    #          hypervisor-specific.
    #
    log.debug("Getting console for vm %s", vm)

    console = {}
    console['kind'] = 'vnc'

    with pooled_rapi_client(vm) as client:
        i = client.GetInstance(vm.backend_vm_id)

    if vm.backend.hypervisor == "kvm" and i['hvparams']['serial_console']:
        raise Exception("hv parameter serial_console cannot be true")
    console['host'] = i['pnode']
    console['port'] = i['network_port']

    return console
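
# Illustrative example (comment only): for an instance whose primary node is
# "gnt-node1.example.org" and whose network_port is 11000 (hypothetical
# values), get_instance_console() returns
#
#   {'kind': 'vnc', 'host': 'gnt-node1.example.org', 'port': 11000}
#
# i.e. the VNC endpoint is assumed to live on the primary node, as the warning
# above notes.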


def get_instance_info(vm):
    with pooled_rapi_client(vm) as client:
        return client.GetInstance(vm.backend_vm_id)


def vm_exists_in_backend(vm):
    try:
        get_instance_info(vm)
        return True
    except GanetiApiError as e:
        if e.code == 404:
            return False
        raise e


def get_network_info(backend_network):
    with pooled_rapi_client(backend_network) as client:
        return client.GetNetwork(backend_network.network.backend_id)


def network_exists_in_backend(backend_network):
    try:
        get_network_info(backend_network)
        return True
    except GanetiApiError as e:
        if e.code == 404:
            return False
        raise e


def create_network(network, backend, connect=True):
    """Create a network in a Ganeti backend"""
    log.debug("Creating network %s in backend %s", network, backend)

    job_id = _create_network(network, backend)

    if connect:
        job_ids = connect_network(network, backend, depends=[job_id])
        return job_ids
    else:
        return [job_id]


def _create_network(network, backend):
    """Create a network."""

    tags = network.backend_tag
    subnet = None
    subnet6 = None
    gateway = None
    gateway6 = None
    for _subnet in network.subnets.all():
        if _subnet.ipversion == 4:
            if _subnet.dhcp:
                tags.append('nfdhcpd')
                subnet = _subnet.cidr
                gateway = _subnet.gateway
        elif _subnet.ipversion == 6:
            subnet6 = _subnet.cidr
            gateway6 = _subnet.gateway

    if network.public:
        conflicts_check = True
        tags.append('public')
    else:
        conflicts_check = False
        tags.append('private')

    # Use a dummy network subnet for IPv6-only networks. Currently Ganeti does
    # not support IPv6-only networks. To bypass this limitation, we create the
    # network with a dummy network subnet, and make Cyclades connect instances
    # to such networks with address=None.
    if subnet is None:
        subnet = "10.0.0.0/24"

    try:
        bn = BackendNetwork.objects.get(network=network, backend=backend)
        mac_prefix = bn.mac_prefix
    except BackendNetwork.DoesNotExist:
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
                        " does not exist" % (network.id, backend.id))

    with pooled_rapi_client(backend) as client:
        return client.CreateNetwork(network_name=network.backend_id,
                                    network=subnet,
                                    network6=subnet6,
                                    gateway=gateway,
                                    gateway6=gateway6,
                                    mac_prefix=mac_prefix,
                                    conflicts_check=conflicts_check,
                                    tags=tags)


def connect_network(network, backend, depends=[], group=None):
    """Connect a network to nodegroups."""
    log.debug("Connecting network %s to backend %s", network, backend)

    if network.public:
        conflicts_check = True
    else:
        conflicts_check = False

    depends = [[job, ["success", "error", "canceled"]] for job in depends]
    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for group in groups:
            job_id = client.ConnectNetwork(network.backend_id, group,
                                           network.mode, network.link,
                                           conflicts_check,
                                           depends=depends)
            job_ids.append(job_id)
    return job_ids
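
# Illustrative note (comment only): Ganeti job dependencies are expressed as
# [job_id, [allowed_end_states]] pairs, so depends=[101, 102] becomes
#
#   [[101, ["success", "error", "canceled"]],
#    [102, ["success", "error", "canceled"]]]
#
# meaning the ConnectNetwork jobs may start only after jobs 101 and 102 have
# reached any terminal state. The job ids are hypothetical.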


def delete_network(network, backend, disconnect=True):
    log.debug("Deleting network %s from backend %s", network, backend)

    depends = []
    if disconnect:
        depends = disconnect_network(network, backend)
    _delete_network(network, backend, depends=depends)


def _delete_network(network, backend, depends=[]):
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
    with pooled_rapi_client(backend) as client:
        return client.DeleteNetwork(network.backend_id, depends)


def disconnect_network(network, backend, group=None):
    log.debug("Disconnecting network %s from backend %s", network, backend)

    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for group in groups:
            job_id = client.DisconnectNetwork(network.backend_id, group)
            job_ids.append(job_id)
    return job_ids


def connect_to_network(vm, nic):
    network = nic.network
    backend = vm.backend
    network = Network.objects.select_for_update().get(id=network.id)
    bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
                                                         network=network)
    depend_jobs = []
    if bnet.operstate != "ACTIVE":
        depend_jobs = create_network(network, backend, connect=True)

    depends = [[job, ["success", "error", "canceled"]] for job in depend_jobs]

    nic = {'name': nic.backend_uuid,
           'network': network.backend_id,
           'ip': nic.ipv4_address}

    log.debug("Adding NIC %s to VM %s", nic, vm)

    kwargs = {
        "instance": vm.backend_vm_id,
        "nics": [("add", "-1", nic)],
        "depends": depends,
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(**kwargs)


def disconnect_from_network(vm, nic):
    log.debug("Removing NIC %s of VM %s", nic, vm)

    kwargs = {
        "instance": vm.backend_vm_id,
        "nics": [("remove", nic.backend_uuid, {})],
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        jobID = client.ModifyInstance(**kwargs)
        firewall_profile = nic.firewall_profile
        if firewall_profile and firewall_profile != "DISABLED":
            tag = _firewall_tags[firewall_profile] % nic.backend_uuid
            client.DeleteInstanceTags(vm.backend_vm_id, [tag],
                                      dry_run=settings.TEST)

        return jobID


def set_firewall_profile(vm, profile, nic):
    uuid = nic.backend_uuid
    try:
        tag = _firewall_tags[profile] % uuid
    except KeyError:
        raise ValueError("Unsupported firewall profile: %s" % profile)

    log.debug("Setting tag of VM %s, NIC %s, to %s", vm, nic, profile)

    with pooled_rapi_client(vm) as client:
        # Delete previous firewall tags
        old_tags = client.GetInstanceTags(vm.backend_vm_id)
        delete_tags = [(t % uuid) for t in _firewall_tags.values()
                       if (t % uuid) in old_tags]
        if delete_tags:
            client.DeleteInstanceTags(vm.backend_vm_id, delete_tags,
                                      dry_run=settings.TEST)

        if profile != "DISABLED":
            client.AddInstanceTags(vm.backend_vm_id, [tag],
                                   dry_run=settings.TEST)

        # XXX: NOP ModifyInstance call to force process_net_status to run
        # on the dispatcher
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
        client.ModifyInstance(vm.backend_vm_id,
                              os_name=os_name)
    return None
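
# Illustrative example (comment only): assuming a tag template such as
# 'synnefo:network:%s:protected' (a hypothetical setting value) and a NIC with
# backend_uuid 'nic-1-42', setting the 'ENABLED' profile removes whichever of
# the three firewall tags are currently set for that NIC and adds
# 'synnefo:network:nic-1-42:protected'. The no-op ModifyInstance call that
# follows only exists so that Ganeti emits a notification and
# process_net_status() picks up the new profile.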


def get_instances(backend, bulk=True):
    with pooled_rapi_client(backend) as c:
        return c.GetInstances(bulk=bulk)


def get_nodes(backend, bulk=True):
    with pooled_rapi_client(backend) as c:
        return c.GetNodes(bulk=bulk)


def get_jobs(backend, bulk=True):
    with pooled_rapi_client(backend) as c:
        return c.GetJobs(bulk=bulk)


def get_physical_resources(backend):
    """Get the physical resources of a backend.

    Get the resources of a backend as reported by the backend (not the db).

    """
    nodes = get_nodes(backend, bulk=True)
    attr = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
    res = {}
    for a in attr:
        res[a] = 0
    for n in nodes:
        # Filter out drained, offline and not vm_capable nodes since they will
        # not take part in the vm allocation process
        can_host_vms = n['vm_capable'] and not (n['drained'] or n['offline'])
        if can_host_vms and n['cnodes']:
            for a in attr:
                res[a] += int(n[a] or 0)
    return res
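
# Illustrative example (comment only): with two usable nodes reporting
# mfree=2048 and mfree=4096 (and similarly for the other attributes), the
# result is a plain per-attribute sum, e.g. res['mfree'] == 6144, taken only
# over vm_capable, non-drained, online nodes. The numbers are hypothetical.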


def update_backend_resources(backend, resources=None):
    """Update the state of the backend resources in the db."""

    if not resources:
        resources = get_physical_resources(backend)

    backend.mfree = resources['mfree']
    backend.mtotal = resources['mtotal']
    backend.dfree = resources['dfree']
    backend.dtotal = resources['dtotal']
    backend.pinst_cnt = resources['pinst_cnt']
    backend.ctotal = resources['ctotal']
    backend.updated = datetime.now()
    backend.save()


def get_memory_from_instances(backend):
    """Get the memory that is used by instances.

    Get the used memory of a backend. Note: this is different from the real
    memory used, due to kvm's memory de-duplication.

    """
    with pooled_rapi_client(backend) as client:
        instances = client.GetInstances(bulk=True)
    mem = 0
    for i in instances:
        mem += i['oper_ram']
    return mem


def get_available_disk_templates(backend):
    """Get the list of available disk templates of a Ganeti backend.

    The list contains the disk templates that are enabled in the Ganeti
    backend and also included in ipolicy-disk-templates.

    """
    with pooled_rapi_client(backend) as c:
        info = c.GetInfo()
    ipolicy_disk_templates = info["ipolicy"]["disk-templates"]
    try:
        enabled_disk_templates = info["enabled_disk_templates"]
        return [dp for dp in enabled_disk_templates
                if dp in ipolicy_disk_templates]
    except KeyError:
        # Ganeti < 2.8 does not have 'enabled_disk_templates'
        return ipolicy_disk_templates
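
# Illustrative example (comment only): if the cluster reports
# enabled_disk_templates = ['plain', 'drbd', 'ext'] and the ipolicy allows
# ['drbd', 'ext', 'rbd'], the available templates are ['drbd', 'ext']. On
# Ganeti < 2.8 the ipolicy list is returned unfiltered. The template names are
# only examples.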


def update_backend_disk_templates(backend):
    disk_templates = get_available_disk_templates(backend)
    backend.disk_templates = disk_templates
    backend.save()


##
## Synchronized operations for reconciliation
##


def create_network_synced(network, backend):
    result = _create_network_synced(network, backend)
    if result[0] != 'success':
        return result
    result = connect_network_synced(network, backend)
    return result


def _create_network_synced(network, backend):
    with pooled_rapi_client(backend) as client:
        job = _create_network(network, backend)
        result = wait_for_job(client, job)
    return result


def connect_network_synced(network, backend):
    with pooled_rapi_client(backend) as client:
        for group in client.GetGroups():
            job = client.ConnectNetwork(network.backend_id, group,
                                        network.mode, network.link)
            result = wait_for_job(client, job)
            if result[0] != 'success':
                return result

    return result


def wait_for_job(client, jobid):
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
    status = result['job_info'][0]
    while status not in ['success', 'error', 'canceled']:
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
                                         [result], None)
        status = result['job_info'][0]

    if status == 'success':
        return (status, None)
    else:
        error = result['job_info'][1]
        return (status, error)
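
# Illustrative sketch (comment only): the synchronized helpers above block
# until a Ganeti job reaches a terminal state, e.g.
#
#   with pooled_rapi_client(backend) as client:
#       status, error = wait_for_job(client, client.StartupInstance(name))
#       if status != 'success':
#           log.error("Job failed: %s", error)
#
# where 'name' stands for an instance name known to the backend; the snippet
# is an assumption about how a caller might use wait_for_job, not code taken
# from this module.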