# snf-cyclades-app/synnefo/logic/backend.py @ e6fbada1

# Copyright 2011-2013 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
from django.conf import settings
from django.db import transaction
from datetime import datetime, timedelta

from synnefo.db.models import (Backend, VirtualMachine, Network,
                               BackendNetwork, BACKEND_STATUSES,
                               pooled_rapi_client, VirtualMachineDiagnostic,
                               Flavor)
from synnefo.logic import utils
from synnefo import quotas
from synnefo.api.util import release_resource
from synnefo.util.mac2eui64 import mac2eui64
from synnefo.logic.rapi import GanetiApiError

from logging import getLogger
log = getLogger(__name__)


_firewall_tags = {
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}

_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
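
# Illustrative note (the concrete values come from settings and are not shown
# here): the firewall tags are expected to be colon-separated Ganeti instance
# tags that embed the NIC index as a format placeholder, e.g. something like
# 'synnefo:network:%s:protected'. _reverse_tags then maps the fourth
# colon-separated field (here 'protected') back to the profile name, and
# _firewall_tags[profile] % index yields the concrete tag for a NIC.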


NIC_FIELDS = ["state", "mac", "ipv4", "ipv6", "network", "firewall_profile"]


def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
    """Handle quotas for an updated VirtualMachine.

    Update quotas for the updated VirtualMachine based on the job that ran on
    the Ganeti backend. If a commission has already been issued for this job,
    then this commission is simply accepted or rejected based on the job
    status. Otherwise, a new commission for the given change is issued in
    force and auto-accept mode. In this case, previous commissions are
    rejected, since they reflect a previous state of the VM.

    """
    if job_status not in ["success", "error", "canceled"]:
        return vm

    # Check whether successful completion of the job triggers any quotable
    # change in the VM state.
    action = utils.get_action_from_opcode(job_opcode, job_fields)
    if action == "BUILD":
        # Quotas for new VMs are automatically accepted by the API
        return vm
    commission_info = quotas.get_commission_info(vm, action=action,
                                                 action_fields=job_fields)

    if vm.task_job_id == job_id and vm.serial is not None:
        # A commission for this change has already been issued, so just
        # accept/reject it. A special case is OP_INSTANCE_CREATE, which must
        # be accepted even if it fails, since the user must manually remove
        # the failed server.
        serial = vm.serial
        if job_status == "success":
            quotas.accept_serial(serial)
        elif job_status in ["error", "canceled"]:
            log.debug("Job %s failed. Rejecting related serial %s", job_id,
                      serial)
            quotas.reject_serial(serial)
        vm.serial = None
    elif job_status == "success" and commission_info is not None:
        log.debug("Expected job was %s. Processing job %s. Commission for"
                  " this job: %s", vm.task_job_id, job_id, commission_info)
        # A commission for this change has not been issued, or the issued
        # commission was unaware of the current change. Reject all previous
        # commissions and create a new one in forced mode!
        commission_name = ("client: dispatcher, resource: %s, ganeti_job: %s"
                           % (vm, job_id))
        quotas.handle_resource_commission(vm, action,
                                          commission_info=commission_info,
                                          commission_name=commission_name,
                                          force=True,
                                          auto_accept=True)
        log.debug("Issued new commission: %s", vm.serial)

    return vm


@transaction.commit_on_success
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None,
                      job_fields=None):
    """Process a job progress notification from the backend.

    Process an incoming message from the backend (currently Ganeti).
    Job notifications with a terminating status (success, error, or canceled)
    also update the operating state of the VM.

    """
    # See #1492, #1031, #1111 why this line has been removed
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)

    vm.backendjobid = jobid
    vm.backendjobstatus = status
    vm.backendopcode = opcode
    vm.backendlogmsg = logmsg

    if status in ["queued", "waiting", "running"]:
        vm.save()
        return

    if job_fields is None:
        job_fields = {}
    beparams = job_fields.get("beparams", None)
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode)

    # Notifications of success change the operating state
    if status == "success":
        if state_for_success is not None:
            vm.operstate = state_for_success
        if beparams:
            # Change the flavor of the VM
            _process_resize(vm, beparams)
        # Update backendtime only for jobs that have been successfully
        # completed, since only these jobs update the state of the VM. Else a
        # "race condition" may occur when a successful job (e.g.
        # OP_INSTANCE_REMOVE) completes before an error job and messages
        # arrive in reversed order.
        vm.backendtime = etime

    if status in ["success", "error", "canceled"] and nics is not None:
        # Update the NICs of the VM
        _process_net_status(vm, etime, nics)

    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
    if opcode == 'OP_INSTANCE_CREATE' and status in ('canceled', 'error'):
        vm.operstate = 'ERROR'
        vm.backendtime = etime
    elif opcode == 'OP_INSTANCE_REMOVE':
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
        # when no instance exists at the Ganeti backend.
        # See ticket #799 for all the details.
        if status == 'success' or (status == 'error' and
                                   not vm_exists_in_backend(vm)):
            # VM has been deleted
            for nic in vm.nics.all():
                # Release the IP
                release_nic_address(nic)
                # And delete the NIC.
                nic.delete()
            vm.deleted = True
            vm.operstate = state_for_success
            vm.backendtime = etime
            status = "success"

    if status in ["success", "error", "canceled"]:
        # Job is finalized: Handle quotas/commissioning
        fields = {"nics": nics, "beparams": beparams}
        vm = handle_vm_quotas(vm, job_id=jobid, job_opcode=opcode,
                              job_status=status, job_fields=fields)
        # and clear task fields
        if vm.task_job_id == jobid:
            vm.task = None
            vm.task_job_id = None

    vm.save()


def _process_resize(vm, beparams):
    """Change flavor of a VirtualMachine based on new beparams."""
    old_flavor = vm.flavor
    vcpus = beparams.get("vcpus", old_flavor.cpu)
    ram = beparams.get("maxmem", old_flavor.ram)
    if vcpus == old_flavor.cpu and ram == old_flavor.ram:
        return
    try:
        new_flavor = Flavor.objects.get(cpu=vcpus, ram=ram,
                                        disk=old_flavor.disk,
                                        disk_template=old_flavor.disk_template)
    except Flavor.DoesNotExist:
        raise Exception("Can not find flavor for VM")
    vm.flavor = new_flavor
    vm.save()
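
# Illustrative example (values are hypothetical): a resize job reporting
# beparams {"vcpus": 2, "maxmem": 2048} makes _process_resize look up a
# Flavor with cpu=2 and ram=2048 that keeps the old flavor's disk size and
# disk_template, and assigns it to the VM.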


@transaction.commit_on_success
def process_net_status(vm, etime, nics):
    """Wrap _process_net_status inside a transaction."""
    _process_net_status(vm, etime, nics)


def _process_net_status(vm, etime, nics):
    """Process a net status notification from the backend.

    Process an incoming message from the Ganeti backend,
    detailing the NIC configuration of a VM instance.

    Update the state of the VM in the DB accordingly.

    """
    ganeti_nics = process_ganeti_nics(nics)
    db_nics = dict([(nic.id, nic) for nic in vm.nics.all()])

    # Get X-Lock on backend before getting X-Lock on network IP pools, to
    # guarantee that no deadlock will occur with the Backend allocator.
    Backend.objects.select_for_update().get(id=vm.backend_id)

    for nic_name in set(db_nics.keys()) | set(ganeti_nics.keys()):
        db_nic = db_nics.get(nic_name)
        ganeti_nic = ganeti_nics.get(nic_name)
        if ganeti_nic is None:
            # NIC exists in DB but not in Ganeti. If the NIC has been in
            # 'building' state for more than 5 minutes, then remove it.
            # TODO: This is dangerous as the job may be stuck in the queue,
            # and releasing the IP may lead to duplicate IP use.
            if db_nic.state != "BUILDING" or (db_nic.state == "BUILDING" and
               etime > db_nic.created + timedelta(minutes=5)):
                release_nic_address(db_nic)
                db_nic.delete()
            else:
                log.warning("Ignoring recent building NIC: %s", db_nic)
        elif db_nic is None:
            # NIC exists in Ganeti but not in DB
            if ganeti_nic["ipv4"]:
                network = ganeti_nic["network"]
                network.reserve_address(ganeti_nic["ipv4"])
            vm.nics.create(**ganeti_nic)
        elif not nics_are_equal(db_nic, ganeti_nic):
            # Special case: the IPv4 address has changed; release the old
            # address and reserve the new one.
            if db_nic.ipv4 != ganeti_nic["ipv4"]:
                release_nic_address(db_nic)
                if ganeti_nic["ipv4"]:
                    ganeti_nic["network"].reserve_address(ganeti_nic["ipv4"])

            # Update the NIC in the DB with the values from the Ganeti NIC
            [setattr(db_nic, f, ganeti_nic[f]) for f in NIC_FIELDS]
            db_nic.save()

            # Dummy update of the network, so that 'changed-since' queries
            # pick it up
            db_nic.network.save()

    vm.backendtime = etime
    vm.save()


def nics_are_equal(db_nic, gnt_nic):
    for field in NIC_FIELDS:
        if getattr(db_nic, field) != gnt_nic[field]:
            return False
    return True


def process_ganeti_nics(ganeti_nics):
    """Process a NIC list received from Ganeti."""
    new_nics = []
    for index, gnic in enumerate(ganeti_nics):
        nic_name = gnic.get("name", None)
        if nic_name is not None:
            nic_id = utils.id_from_nic_name(nic_name)
        else:
            # Fall back to the index. If the NIC is unknown to Synnefo it
            # will be created automatically.
            nic_id = "unknown-" + str(index)
        network_name = gnic.get('network', '')
        network_id = utils.id_from_network_name(network_name)
        network = Network.objects.get(id=network_id)

        # Get the new NIC info
        mac = gnic.get('mac')
        ipv4 = gnic.get('ip')
        ipv6 = mac2eui64(mac, network.subnet6)\
            if network.subnet6 is not None else None

        firewall = gnic.get('firewall')
        firewall_profile = _reverse_tags.get(firewall)
        if not firewall_profile and network.public:
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE

        nic_info = {
            'index': index,
            'network': network,
            'mac': mac,
            'ipv4': ipv4,
            'ipv6': ipv6,
            'firewall_profile': firewall_profile,
            'state': 'ACTIVE'}

        new_nics.append((nic_id, nic_info))
    return dict(new_nics)
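
# Illustrative example (field values are hypothetical): a Ganeti NIC entry
# such as
#     {"name": "<nic backend name>", "network": "<network backend name>",
#      "mac": "aa:00:bb:11:cc:22", "ip": "10.0.0.2",
#      "firewall": "<firewall tag suffix>"}
# becomes an entry keyed by the Synnefo NIC id, whose value holds the Network
# object, the MAC, the IPv4 address, an EUI-64 IPv6 address derived from the
# MAC and the network's subnet6 (if any), the mapped firewall profile, and
# state 'ACTIVE'.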


def release_nic_address(nic):
    """Release the IPv4 address of a NIC.

    Check if an instance's NIC has an IPv4 address and release it if it is
    not a Floating IP.

    """

    if nic.ipv4 and not nic.ip_type == "FLOATING":
        nic.network.release_address(nic.ipv4)


@transaction.commit_on_success
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode
    back_network.backendlogmsg = logmsg

    network = back_network.network

    # Notifications of success change the operating state
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == 'success' and state_for_success is not None:
        back_network.operstate = state_for_success

    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_ADD':
        back_network.operstate = 'ERROR'
        back_network.backendtime = etime

    if opcode == 'OP_NETWORK_REMOVE':
        network_is_deleted = (status == "success")
        if network_is_deleted or (status == "error" and not
                                  network_exists_in_backend(back_network)):
            back_network.operstate = state_for_success
            back_network.deleted = True
            back_network.backendtime = etime

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()
    # Also update the state of the parent Network
    update_network_state(network)


def update_network_state(network):
    """Update the state of a Network based on BackendNetwork states.

    Update the state of a Network based on the operstate of the networks in
    the backends in which the network exists.

    The state of the network is:
    * ACTIVE: If it is 'ACTIVE' in at least one backend.
    * DELETED: If it is 'DELETED' in all backends in which it has been
      created.

    This function also releases the resources (MAC prefix or bridge) and the
    quotas for the network.

    """
    if network.deleted:
        # Network has already been deleted. Just make sure the state is also
        # DELETED
        if not network.state == "DELETED":
            network.state = "DELETED"
            network.save()
        return

    backend_states = [s.operstate for s in network.backend_networks.all()]
    if not backend_states and network.action != "DESTROY":
        if network.state != "ACTIVE":
            network.state = "ACTIVE"
            network.save()
        return

    # The network is deleted when all of its BackendNetworks reach the
    # "DELETED" operstate
    deleted = reduce(lambda x, y: x == y and "DELETED", backend_states,
                     "DELETED")

    # Release the resources on the deletion of the Network
    if deleted:
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
                 network.id, network.link, network.mac_prefix)
        network.deleted = True
        network.state = "DELETED"
        if network.mac_prefix:
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
                release_resource(res_type="mac_prefix",
                                 value=network.mac_prefix)
        if network.link:
            if network.FLAVORS[network.flavor]["link"] == "pool":
                release_resource(res_type="bridge", value=network.link)

        # Issue commission
        if network.userid:
            quotas.issue_and_accept_commission(network, delete=True)
            # the above has already saved the object and committed;
            # a second save would override others' changes, since the
            # object is now unlocked
            return
        elif not network.public:
            log.warning("Network %s does not have an owner!", network.id)
    network.save()


@transaction.commit_on_success
def process_network_modify(back_network, etime, jobid, opcode, status,
                           job_fields):
    assert (opcode == "OP_NETWORK_SET_PARAMS")
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode

    add_reserved_ips = job_fields.get("add_reserved_ips")
    if add_reserved_ips:
        net = back_network.network
        pool = net.get_pool()
        for ip in add_reserved_ips:
            pool.reserve(ip, external=True)
        pool.save()

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()


@transaction.commit_on_success
def process_create_progress(vm, etime, progress):

    percentage = int(progress)

    # The percentage may exceed 100%, due to the way
    # snf-image:copy-progress tracks bytes read by image handling processes
    percentage = 100 if percentage > 100 else percentage
    if percentage < 0:
        raise ValueError("Percentage cannot be negative")

    # FIXME: log a warning here, see #1033
#   if last_update > percentage:
#       raise ValueError("Build percentage should increase monotonically " \
#                        "(old = %d, new = %d)" % (last_update, percentage))

    # This assumes that no message of type 'ganeti-create-progress' is going
    # to arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance
    # and the instance is STARTED.  What if the two messages are processed by
    # two separate dispatcher threads, and the 'ganeti-op-status' message for
    # successful creation gets processed before the 'ganeti-create-progress'
    # message? [vkoukis]
    #
    #if not vm.operstate == 'BUILD':
    #    raise VirtualMachine.IllegalState("VM is not in building state")

    vm.buildpercentage = percentage
    vm.backendtime = etime
    vm.save()


@transaction.commit_on_success
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
                               details=None):
    """
    Create a virtual machine instance diagnostic entry.

    :param vm: VirtualMachine instance to create diagnostic for.
    :param message: Diagnostic message.
    :param source: Diagnostic source identifier (e.g. image-helper).
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
    :param etime: The time the message occurred (if available).
    :param details: Additional details or debug information.
    """
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level, source=source,
                                                   source_date=etime,
                                                   message=message,
                                                   details=details)
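
# Hypothetical usage sketch (message and source values are illustrative):
#     create_instance_diagnostic(vm, "Image deployment failed",
#                                source="image-helper", level="ERROR")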


def create_instance(vm, nics, flavor, image):
    """`image` is a dictionary which should contain the keys
            'backend_id', 'format' and 'metadata'.

        The metadata value should be a dictionary.
    """

    # Handle arguments to CreateInstance() as a dictionary,
    # initialize it based on a deployment-specific value.
    # This enables the administrator to override deployment-specific
    # arguments, such as the disk template to use, name of OS provider
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
    #
    kw = vm.backend.get_create_params()
    kw['mode'] = 'create'
    kw['name'] = vm.backend_vm_id
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS

    kw['disk_template'] = flavor.disk_template
    kw['disks'] = [{"size": flavor.disk * 1024}]
    provider = flavor.disk_provider
    if provider:
        kw['disks'][0]['provider'] = provider
        kw['disks'][0]['origin'] = flavor.disk_origin

    kw['nics'] = [{"name": nic.backend_uuid,
                   "network": nic.network.backend_id,
                   "ip": nic.ipv4}
                  for nic in nics]
    backend = vm.backend
    depend_jobs = []
    for nic in nics:
        network = Network.objects.select_for_update().get(id=nic.network.id)
        bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
                                                             network=network)
        if bnet.operstate != "ACTIVE":
            if network.public:
                msg = "Can not connect instance to network %s. Network is not"\
                      " ACTIVE in backend %s." % (network, backend)
                raise Exception(msg)
            else:
                jobs = create_network(network, backend, connect=True)
                if isinstance(jobs, list):
                    depend_jobs.extend(jobs)
                else:
                    depend_jobs.append(jobs)
    kw["depends"] = [[job, ["success", "error", "canceled"]]
                     for job in depend_jobs]

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['os'] = settings.GANETI_OS_PROVIDER
    kw['ip_check'] = False
    kw['name_check'] = False

    # Do not specify a node explicitly; have
    # Ganeti use an iallocator instead
    #kw['pnode'] = rapi.GetNodes()[0]

    kw['dry_run'] = settings.TEST

    kw['beparams'] = {
        'auto_balance': True,
        'vcpus': flavor.cpu,
        'memory': flavor.ram}

    kw['osparams'] = {
        'config_url': vm.config_url,
        # Store image id and format to Ganeti
        'img_id': image['backend_id'],
        'img_format': image['format']}

    # Use opportunistic locking
    kw['opportunistic_locking'] = True

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['hvparams'] = dict(serial_console=False)

    log.debug("Creating instance %s", utils.hide_pass(kw))
    with pooled_rapi_client(vm) as client:
        return client.CreateInstance(**kw)
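
# For illustration only (values are hypothetical, and extra keys may come
# from the backend's deployment-specific create parameters), the assembled
# kw dict resembles:
#     {'mode': 'create', 'name': '<backend_vm_id>', 'disk_template': 'drbd',
#      'disks': [{'size': 20480}], 'nics': [...], 'depends': [...],
#      'ip_check': False, 'name_check': False, 'dry_run': False,
#      'beparams': {'auto_balance': True, 'vcpus': 2, 'memory': 2048},
#      'osparams': {'config_url': '...', 'img_id': '...',
#                   'img_format': '...'},
#      'opportunistic_locking': True}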


def delete_instance(vm):
    with pooled_rapi_client(vm) as client:
        return client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)


def reboot_instance(vm, reboot_type):
    assert reboot_type in ('soft', 'hard')
    kwargs = {"instance": vm.backend_vm_id,
              "reboot_type": "hard"}
    # XXX: Currently the shutdown_timeout parameter is not supported by the
    # Ganeti RAPI. Until it is, we fall back for both reboot types to the
    # default shutdown timeout of Ganeti (120s). Note that the reboot type of
    # the Ganeti job must always be hard. The 'soft' and 'hard' types of the
    # OS API are different from the Ganeti ones, and map to
    # 'shutdown_timeout'.
    #if reboot_type == "hard":
    #    kwargs["shutdown_timeout"] = 0
    if settings.TEST:
        kwargs["dry_run"] = True
    with pooled_rapi_client(vm) as client:
        return client.RebootInstance(**kwargs)


def startup_instance(vm):
    with pooled_rapi_client(vm) as client:
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)


def shutdown_instance(vm):
    with pooled_rapi_client(vm) as client:
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)


def resize_instance(vm, vcpus, memory):
    beparams = {"vcpus": int(vcpus),
                "minmem": int(memory),
                "maxmem": int(memory)}
    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id, beparams=beparams)


def get_instance_console(vm):
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
    # useless (see #783).
    #
    # Until this is fixed on the Ganeti side, construct a console info reply
    # directly.
    #
    # WARNING: This assumes that VNC runs on port network_port on
    #          the instance's primary node, and is probably
    #          hypervisor-specific.
    #
    log.debug("Getting console for vm %s", vm)

    console = {}
    console['kind'] = 'vnc'

    with pooled_rapi_client(vm) as client:
        i = client.GetInstance(vm.backend_vm_id)

    if vm.backend.hypervisor == "kvm" and i['hvparams']['serial_console']:
        raise Exception("hv parameter serial_console cannot be true")
    console['host'] = i['pnode']
    console['port'] = i['network_port']

    return console


def get_instance_info(vm):
    with pooled_rapi_client(vm) as client:
        return client.GetInstance(vm.backend_vm_id)


def vm_exists_in_backend(vm):
    try:
        get_instance_info(vm)
        return True
    except GanetiApiError as e:
        if e.code == 404:
            return False
        raise e


def get_network_info(backend_network):
    with pooled_rapi_client(backend_network) as client:
        return client.GetNetwork(backend_network.network.backend_id)


def network_exists_in_backend(backend_network):
    try:
        get_network_info(backend_network)
        return True
    except GanetiApiError as e:
        if e.code == 404:
            return False
        raise e


def create_network(network, backend, connect=True):
    """Create a network in a Ganeti backend"""
    log.debug("Creating network %s in backend %s", network, backend)

    job_id = _create_network(network, backend)

    if connect:
        job_ids = connect_network(network, backend, depends=[job_id])
        return job_ids
    else:
        return [job_id]


def _create_network(network, backend):
    """Create a network."""

    tags = network.backend_tag
    if network.dhcp:
        tags.append('nfdhcpd')

    if network.public:
        conflicts_check = True
        tags.append('public')
    else:
        conflicts_check = False
        tags.append('private')

    # Use a dummy network subnet for IPv6-only networks. Currently Ganeti
    # does not support IPv6-only networks. To bypass this limitation, we
    # create the network with a dummy network subnet, and make Cyclades
    # connect instances to such networks with address=None.
    subnet = network.subnet
    if subnet is None:
        subnet = "10.0.0.0/24"

    try:
        bn = BackendNetwork.objects.get(network=network, backend=backend)
        mac_prefix = bn.mac_prefix
    except BackendNetwork.DoesNotExist:
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
                        " does not exist" % (network.id, backend.id))

    with pooled_rapi_client(backend) as client:
        return client.CreateNetwork(network_name=network.backend_id,
                                    network=subnet,
                                    network6=network.subnet6,
                                    gateway=network.gateway,
                                    gateway6=network.gateway6,
                                    mac_prefix=mac_prefix,
                                    conflicts_check=conflicts_check,
                                    tags=tags)


def connect_network(network, backend, depends=[], group=None):
    """Connect a network to nodegroups."""
    log.debug("Connecting network %s to backend %s", network, backend)

    if network.public:
        conflicts_check = True
    else:
        conflicts_check = False

    depends = [[job, ["success", "error", "canceled"]] for job in depends]
    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for group in groups:
            job_id = client.ConnectNetwork(network.backend_id, group,
                                           network.mode, network.link,
                                           conflicts_check,
                                           depends=depends)
            job_ids.append(job_id)
    return job_ids


def delete_network(network, backend, disconnect=True):
    log.debug("Deleting network %s from backend %s", network, backend)

    depends = []
    if disconnect:
        depends = disconnect_network(network, backend)
    _delete_network(network, backend, depends=depends)


def _delete_network(network, backend, depends=[]):
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
    with pooled_rapi_client(backend) as client:
        return client.DeleteNetwork(network.backend_id, depends)


def disconnect_network(network, backend, group=None):
    log.debug("Disconnecting network %s from backend %s", network, backend)

    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for group in groups:
            job_id = client.DisconnectNetwork(network.backend_id, group)
            job_ids.append(job_id)
    return job_ids


def connect_to_network(vm, nic):
    network = nic.network
    backend = vm.backend
    network = Network.objects.select_for_update().get(id=network.id)
    bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
                                                         network=network)
    depend_jobs = []
    if bnet.operstate != "ACTIVE":
        depend_jobs = create_network(network, backend, connect=True)

    depends = [[job, ["success", "error", "canceled"]] for job in depend_jobs]

    nic = {'name': nic.backend_uuid,
           'network': network.backend_id,
           'ip': nic.ipv4}

    log.debug("Adding NIC %s to VM %s", nic, vm)

    kwargs = {
        "instance": vm.backend_vm_id,
        "nics": [("add", "-1", nic)],
        "depends": depends,
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(**kwargs)


def disconnect_from_network(vm, nic):
    log.debug("Removing NIC %s of VM %s", nic, vm)

    kwargs = {
        "instance": vm.backend_vm_id,
        "nics": [("remove", nic.index, {})],
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        jobID = client.ModifyInstance(**kwargs)
        # If the NIC has a tag for a firewall profile it must be deleted,
        # otherwise it may affect another NIC. XXX: Deleting the tag should
        # depend on removing the NIC, but currently the RAPI client does not
        # support this; this may result in clearing the firewall profile
        # without successfully removing the NIC. This issue will be fixed
        # with the use of NIC UUIDs.
        firewall_profile = nic.firewall_profile
        if firewall_profile and firewall_profile != "DISABLED":
            tag = _firewall_tags[firewall_profile] % nic.index
            client.DeleteInstanceTags(vm.backend_vm_id, [tag],
                                      dry_run=settings.TEST)

        return jobID


def set_firewall_profile(vm, profile, index=0):
    try:
        tag = _firewall_tags[profile] % index
    except KeyError:
        raise ValueError("Unsupported firewall profile: %s" % profile)

    log.debug("Setting tag of VM %s, NIC index %d, to %s", vm, index, profile)

    with pooled_rapi_client(vm) as client:
        # Delete previous firewall tags
        old_tags = client.GetInstanceTags(vm.backend_vm_id)
        delete_tags = [(t % index) for t in _firewall_tags.values()
                       if (t % index) in old_tags]
        if delete_tags:
            client.DeleteInstanceTags(vm.backend_vm_id, delete_tags,
                                      dry_run=settings.TEST)

        if profile != "DISABLED":
            client.AddInstanceTags(vm.backend_vm_id, [tag],
                                   dry_run=settings.TEST)

        # XXX NOP ModifyInstance call to force process_net_status to run
        # on the dispatcher
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
        client.ModifyInstance(vm.backend_vm_id,
                              os_name=os_name)
    return None


def get_instances(backend, bulk=True):
    with pooled_rapi_client(backend) as c:
        return c.GetInstances(bulk=bulk)


def get_nodes(backend, bulk=True):
    with pooled_rapi_client(backend) as c:
        return c.GetNodes(bulk=bulk)


def get_jobs(backend, bulk=True):
    with pooled_rapi_client(backend) as c:
        return c.GetJobs(bulk=bulk)


def get_physical_resources(backend):
    """Get the physical resources of a backend.

    Get the resources of a backend as reported by the backend (not the DB).

    """
    nodes = get_nodes(backend, bulk=True)
    attr = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
    res = {}
    for a in attr:
        res[a] = 0
    for n in nodes:
        # Filter out drained, offline and non-vm_capable nodes, since they
        # will not take part in the VM allocation process
        can_host_vms = n['vm_capable'] and not (n['drained'] or n['offline'])
        if can_host_vms and n['cnodes']:
            for a in attr:
                res[a] += int(n[a])
    return res
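
# Illustrative shape of the result (numbers are hypothetical):
#     {'mfree': 51200, 'mtotal': 65536, 'dfree': 800000, 'dtotal': 1000000,
#      'pinst_cnt': 12, 'ctotal': 48}
# i.e. the sums of the corresponding node fields over all nodes that can
# host VMs.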


def update_backend_resources(backend, resources=None):
    """Update the state of the backend resources in the DB."""

    if not resources:
        resources = get_physical_resources(backend)

    backend.mfree = resources['mfree']
    backend.mtotal = resources['mtotal']
    backend.dfree = resources['dfree']
    backend.dtotal = resources['dtotal']
    backend.pinst_cnt = resources['pinst_cnt']
    backend.ctotal = resources['ctotal']
    backend.updated = datetime.now()
    backend.save()


def get_memory_from_instances(backend):
    """Get the memory that is used by instances.

    Get the used memory of a backend. Note: this is different from the real
    memory used, due to KVM's memory de-duplication.

    """
    with pooled_rapi_client(backend) as client:
        instances = client.GetInstances(bulk=True)
    mem = 0
    for i in instances:
        mem += i['oper_ram']
    return mem


def get_available_disk_templates(backend):
    """Get the list of available disk templates of a Ganeti backend.

    The list contains the disk templates that are enabled in the Ganeti
    backend and also included in ipolicy-disk-templates.

    """
    with pooled_rapi_client(backend) as c:
        info = c.GetInfo()
    ipolicy_disk_templates = info["ipolicy"]["disk-templates"]
    try:
        enabled_disk_templates = info["enabled_disk_templates"]
        return [dp for dp in enabled_disk_templates
                if dp in ipolicy_disk_templates]
    except KeyError:
        # Ganeti < 2.8 does not have 'enabled_disk_templates'
        return ipolicy_disk_templates


def update_backend_disk_templates(backend):
    disk_templates = get_available_disk_templates(backend)
    backend.disk_templates = disk_templates
    backend.save()


##
## Synchronized operations for reconciliation
##


def create_network_synced(network, backend):
    result = _create_network_synced(network, backend)
    if result[0] != 'success':
        return result
    result = connect_network_synced(network, backend)
    return result


def _create_network_synced(network, backend):
    with pooled_rapi_client(backend) as client:
        job = _create_network(network, backend)
        result = wait_for_job(client, job)
    return result


def connect_network_synced(network, backend):
    with pooled_rapi_client(backend) as client:
        for group in client.GetGroups():
            job = client.ConnectNetwork(network.backend_id, group,
                                        network.mode, network.link)
            result = wait_for_job(client, job)
            if result[0] != 'success':
                return result

    return result


def wait_for_job(client, jobid):
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
    status = result['job_info'][0]
    while status not in ['success', 'error', 'cancel']:
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
                                         [result], None)
        status = result['job_info'][0]

    if status == 'success':
        return (status, None)
    else:
        error = result['job_info'][1]
        return (status, error)
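
# Hypothetical usage sketch: submit a job and block until it reaches a final
# state, mirroring _create_network_synced above, e.g.
#     with pooled_rapi_client(backend) as client:
#         job = _create_network(network, backend)
#         status, error = wait_for_job(client, job)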