# snf-cyclades-app/synnefo/logic/backend.py @ bfd04b01
1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
from django.conf import settings
34
from django.db import transaction
35
from datetime import datetime, timedelta
36

    
37
from synnefo.db.models import (Backend, VirtualMachine, Network,
38
                               BackendNetwork, BACKEND_STATUSES,
39
                               pooled_rapi_client, VirtualMachineDiagnostic,
40
                               Flavor)
41
from synnefo.logic import utils
42
from synnefo import quotas
43
from synnefo.api.util import release_resource
44
from synnefo.util.mac2eui64 import mac2eui64
45
from synnefo.logic.rapi import GanetiApiError
46

    
47
from logging import getLogger
48
log = getLogger(__name__)
49

    
50

    
51
_firewall_tags = {
52
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
53
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
54
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}
55

    
56
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
57

    
58
# Timeout in seconds for building NICs. After this period the NICs are
59
# considered stale and are removed from the DB.
60
BUILDING_NIC_TIMEOUT = 180
61

    
62
NIC_FIELDS = ["state", "mac", "ipv4", "ipv6", "network", "firewall_profile",
63
              "index"]
64

    
65

    
66
def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
67
    """Handle quotas for updated VirtualMachine.
68

69
    Update quotas for the updated VirtualMachine based on the job that ran on
70
    the Ganeti backend. If a commission has already been issued for this job,
71
    then this commission is simply accepted or rejected based on the job
72
    status. Otherwise, a new commission for the given change is issued in
73
    forced and auto-accept mode. In this case, previous commissions are
74
    rejected, since they reflect a previous state of the VM.
75

76
    """
77
    if job_status not in ["success", "error", "canceled"]:
78
        return vm
79

    
80
    # Check whether successful completion of the job will trigger any
81
    # quotable change in the VM state.
82
    action = utils.get_action_from_opcode(job_opcode, job_fields)
83
    if action == "BUILD":
84
        # Quotas for new VMs are automatically accepted by the API
85
        return vm
86
    commission_info = quotas.get_commission_info(vm, action=action,
87
                                                 action_fields=job_fields)
88

    
89
    if vm.task_job_id == job_id and vm.serial is not None:
90
        # Commission for this change has already been issued. So just
91
        # accept/reject it. Special case is OP_INSTANCE_CREATE which, even
92
        # if it fails, must be accepted, as the user must manually remove
93
        # the failed server.
94
        serial = vm.serial
95
        if job_status == "success":
96
            quotas.accept_serial(serial)
97
        elif job_status in ["error", "canceled"]:
98
            log.debug("Job %s failed. Rejecting related serial %s", job_id,
99
                      serial)
100
            quotas.reject_serial(serial)
101
        vm.serial = None
102
    elif job_status == "success" and commission_info is not None:
103
        log.debug("Expected job was %s. Processing job %s. Commission for"
104
                  " this job: %s", vm.task_job_id, job_id, commission_info)
105
        # Commission for this change has not been issued, or the issued
106
        # commission was unaware of the current change. Reject all previous
107
        # commissions and create a new one in forced mode!
108
        commission_name = ("client: dispatcher, resource: %s, ganeti_job: %s"
109
                           % (vm, job_id))
110
        quotas.handle_resource_commission(vm, action,
111
                                          commission_info=commission_info,
112
                                          commission_name=commission_name,
113
                                          force=True,
114
                                          auto_accept=True)
115
        log.debug("Issued new commission: %s", vm.serial)
116

    
117
    return vm
118

    
119

    
120
@transaction.commit_on_success
121
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None,
122
                      job_fields=None):
123
    """Process a job progress notification from the backend
124

125
    Process an incoming message from the backend (currently Ganeti).
126
    Job notifications with a terminating status (success, error, or canceled)
127
    also update the operating state of the VM.
128

129
    """
130
    # See #1492, #1031, #1111 for why this line has been removed
131
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
132
    if status not in [x[0] for x in BACKEND_STATUSES]:
133
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)
134

    
135
    vm.backendjobid = jobid
136
    vm.backendjobstatus = status
137
    vm.backendopcode = opcode
138
    vm.backendlogmsg = logmsg
139

    
140
    if status in ["queued", "waiting", "running"]:
141
        vm.save()
142
        return
143

    
144
    if job_fields is None:
145
        job_fields = {}
146
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode)
147

    
148
    # Notifications of success change the operating state
149
    if status == "success":
150
        if state_for_success is not None:
151
            vm.operstate = state_for_success
152
        beparams = job_fields.get("beparams", None)
153
        if beparams:
154
            # Change the flavor of the VM
155
            _process_resize(vm, beparams)
156
        # Update backendtime only for jobs that have been successfully
157
        # completed, since only these jobs update the state of the VM.
158
        # Otherwise a "race condition" may occur when a successful job (e.g.
159
        # OP_INSTANCE_REMOVE) completes before an error job and messages
160
        # arrive in reverse order.
161
        vm.backendtime = etime
162

    
163
    if status in ["success", "error", "canceled"] and nics is not None:
164
        # Update the NICs of the VM
165
        _process_net_status(vm, etime, nics)
166

    
167
    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
168
    if opcode == 'OP_INSTANCE_CREATE' and status in ('canceled', 'error'):
169
        vm.operstate = 'ERROR'
170
        vm.backendtime = etime
171
    elif opcode == 'OP_INSTANCE_REMOVE':
172
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
173
        # when no instance exists at the Ganeti backend.
174
        # See ticket #799 for all the details.
175
        if status == 'success' or (status == 'error' and
176
                                   not vm_exists_in_backend(vm)):
177
            # VM has been deleted
178
            for nic in vm.nics.all():
179
                # Release the IP
180
                release_nic_address(nic)
181
                # And delete the NIC.
182
                nic.delete()
183
            vm.deleted = True
184
            vm.operstate = state_for_success
185
            vm.backendtime = etime
186
            status = "success"
187

    
188
    if status in ["success", "error", "canceled"]:
189
        # Job is finalized: Handle quotas/commissioning
190
        vm = handle_vm_quotas(vm, job_id=jobid, job_opcode=opcode,
191
                              job_status=status, job_fields=job_fields)
192
        # and clear task fields
193
        if vm.task_job_id == jobid:
194
            vm.task = None
195
            vm.task_job_id = None
196

    
197
    vm.save()
198

    
199

    
200
def _process_resize(vm, beparams):
201
    """Change flavor of a VirtualMachine based on new beparams."""
202
    old_flavor = vm.flavor
203
    vcpus = beparams.get("vcpus", old_flavor.cpu)
204
    ram = beparams.get("maxmem", old_flavor.ram)
205
    if vcpus == old_flavor.cpu and ram == old_flavor.ram:
206
        return
207
    try:
208
        new_flavor = Flavor.objects.get(cpu=vcpus, ram=ram,
209
                                        disk=old_flavor.disk,
210
                                        disk_template=old_flavor.disk_template)
211
    except Flavor.DoesNotExist:
212
        raise Exception("Can not find flavor for VM")
213
    vm.flavor = new_flavor
214
    vm.save()
215

    
216

    
217
@transaction.commit_on_success
218
def process_net_status(vm, etime, nics):
219
    """Wrap _process_net_status inside transaction."""
220
    _process_net_status(vm, etime, nics)
221

    
222

    
223
def _process_net_status(vm, etime, nics):
224
    """Process a net status notification from the backend
225

226
    Process an incoming message from the Ganeti backend,
227
    detailing the NIC configuration of a VM instance.
228

229
    Update the state of the VM in the DB accordingly.
230

231
    """
232
    ganeti_nics = process_ganeti_nics(nics)
233
    db_nics = dict([(nic.id, nic) for nic in vm.nics.all()])
234

    
235
    # Get X-Lock on backend before getting X-Lock on network IP pools, to
236
    # guarantee that no deadlock will occur with Backend allocator.
237
    Backend.objects.select_for_update().get(id=vm.backend_id)
238

    
239
    for nic_name in set(db_nics.keys()) | set(ganeti_nics.keys()):
240
        db_nic = db_nics.get(nic_name)
241
        ganeti_nic = ganeti_nics.get(nic_name)
242
        if ganeti_nic is None:
243
            # NIC exists in DB but not in Ganeti. If the NIC has been in
244
            # 'BUILDING' state for more than BUILDING_NIC_TIMEOUT, remove it.
245
            # TODO: This is dangerous, as the job may be stuck in the queue,
246
            # and releasing the IP may lead to duplicate IP use.
247
            if (db_nic.state != "BUILDING" or etime >
248
               db_nic.created + timedelta(seconds=BUILDING_NIC_TIMEOUT)):
249
                release_nic_address(db_nic)
250
                db_nic.delete()
251
            else:
252
                log.warning("Ignoring recent building NIC: %s", db_nic)
253
        elif db_nic is None:
254
            # NIC exists in Ganeti but not in DB
255
            if ganeti_nic["ipv4"]:
256
                network = ganeti_nic["network"]
257
                network.reserve_address(ganeti_nic["ipv4"])
258
            vm.nics.create(**ganeti_nic)
259
        elif not nics_are_equal(db_nic, ganeti_nic):
260
            # Special case where the IPv4 address has changed: we need to
261
            # release the old IPv4 address and reserve the new one
262
            if db_nic.ipv4 != ganeti_nic["ipv4"]:
263
                release_nic_address(db_nic)
264
                if ganeti_nic["ipv4"]:
265
                    ganeti_nic["network"].reserve_address(ganeti_nic["ipv4"])
266

    
267
            # Update the NIC in DB with the values from the Ganeti NIC
268
            for f in NIC_FIELDS:
                setattr(db_nic, f, ganeti_nic[f])
269
            db_nic.save()
270

    
271
            # Dummy update the network, to work with 'changed-since'
272
            db_nic.network.save()
273

    
274
    vm.backendtime = etime
275
    vm.save()
276

    
277

    
278
def nics_are_equal(db_nic, gnt_nic):
279
    for field in NIC_FIELDS:
280
        if getattr(db_nic, field) != gnt_nic[field]:
281
            return False
282
    return True
283

    
284

    
285
def process_ganeti_nics(ganeti_nics):
286
    """Process NIC dict from ganeti"""
287
    new_nics = []
288
    for index, gnic in enumerate(ganeti_nics):
289
        nic_name = gnic.get("name", None)
290
        if nic_name is not None:
291
            nic_id = utils.id_from_nic_name(nic_name)
292
        else:
293
            # Use the index as the default value. If the NIC is unknown to
294
            # Synnefo, it will be created automatically.
295
            nic_id = "unknown-" + str(index)
296
        network_name = gnic.get('network', '')
297
        network_id = utils.id_from_network_name(network_name)
298
        network = Network.objects.get(id=network_id)
299

    
300
        # Get the new nic info
301
        mac = gnic.get('mac')
302
        ipv4 = gnic.get('ip')
303
        ipv6 = mac2eui64(mac, network.subnet6)\
304
            if network.subnet6 is not None else None
305

    
306
        firewall = gnic.get('firewall')
307
        firewall_profile = _reverse_tags.get(firewall)
308
        if not firewall_profile and network.public:
309
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE
310

    
311
        nic_info = {
312
            'index': index,
313
            'network': network,
314
            'mac': mac,
315
            'ipv4': ipv4,
316
            'ipv6': ipv6,
317
            'firewall_profile': firewall_profile,
318
            'state': 'ACTIVE'}
319

    
320
        new_nics.append((nic_id, nic_info))
321
    return dict(new_nics)
322

    
323

    
324
def release_nic_address(nic):
325
    """Release the IPv4 address of a NIC.
326

327
    Check if an instance's NIC has an IPv4 address and release it if it is not
328
    a Floating IP.
329

330
    """
331

    
332
    if nic.ipv4 and nic.ip_type != "FLOATING":
333
        nic.network.release_address(nic.ipv4)
334

    
335

    
336
@transaction.commit_on_success
337
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
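    """Process a job notification for a network of a Ganeti backend."""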
338
    if status not in [x[0] for x in BACKEND_STATUSES]:
339
        raise Network.InvalidBackendMsgError(opcode, status)
340

    
341
    back_network.backendjobid = jobid
342
    back_network.backendjobstatus = status
343
    back_network.backendopcode = opcode
344
    back_network.backendlogmsg = logmsg
345

    
346
    network = back_network.network
347

    
348
    # Notifications of success change the operating state
349
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
350
    if status == 'success' and state_for_success is not None:
351
        back_network.operstate = state_for_success
352

    
353
    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_ADD':
354
        back_network.operstate = 'ERROR'
355
        back_network.backendtime = etime
356

    
357
    if opcode == 'OP_NETWORK_REMOVE':
358
        network_is_deleted = (status == "success")
359
        if network_is_deleted or (status == "error" and not
360
                                  network_exists_in_backend(back_network)):
361
            back_network.operstate = state_for_success
362
            back_network.deleted = True
363
            back_network.backendtime = etime
364

    
365
    if status == 'success':
366
        back_network.backendtime = etime
367
    back_network.save()
368
    # The state of the parent Network must also be updated.
369
    update_network_state(network)
370

    
371

    
372
def update_network_state(network):
373
    """Update the state of a Network based on BackendNetwork states.
374

375
    Update the state of a Network based on the operstate of its
376
    BackendNetworks in the backends where the network exists.
377

378
    The state of the network is:
379
    * ACTIVE: If it is 'ACTIVE' in at least one backend.
380
    * DELETED: If it is 'DELETED' in all backends where it has been created.
381

382
    This function also releases the resources (MAC prefix or Bridge) and the
383
    quotas for the network.
384

385
    """
386
    if network.deleted:
387
        # Network has already been deleted. Just assert that state is also
388
        # DELETED
389
        if not network.state == "DELETED":
390
            network.state = "DELETED"
391
            network.save()
392
        return
393

    
394
    backend_states = [s.operstate for s in network.backend_networks.all()]
395
    if not backend_states and network.action != "DESTROY":
396
        if network.state != "ACTIVE":
397
            network.state = "ACTIVE"
398
            network.save()
399
        return
400

    
401
    # The Network is deleted when all its BackendNetworks are in the
402
    # "DELETED" operstate
403
    deleted = all(state == "DELETED" for state in backend_states)
404

    
405
    # Release the resources on the deletion of the Network
406
    if deleted:
407
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
408
                 network.id, network.mac_prefix, network.link)
409
        network.deleted = True
410
        network.state = "DELETED"
411
        if network.mac_prefix:
412
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
413
                release_resource(res_type="mac_prefix",
414
                                 value=network.mac_prefix)
415
        if network.link:
416
            if network.FLAVORS[network.flavor]["link"] == "pool":
417
                release_resource(res_type="bridge", value=network.link)
418

    
419
        # Issue commission
420
        if network.userid:
421
            quotas.issue_and_accept_commission(network, delete=True)
422
            # the above has already saved the object and committed;
423
            # a second save would override others' changes, since the
424
            # object is now unlocked
425
            return
426
        elif not network.public:
427
            log.warning("Network %s does not have an owner!", network.id)
428
    network.save()
429

    
430

    
431
@transaction.commit_on_success
432
def process_network_modify(back_network, etime, jobid, opcode, status,
433
                           job_fields):
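    """Process a job notification for an OP_NETWORK_SET_PARAMS job."""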
434
    assert (opcode == "OP_NETWORK_SET_PARAMS")
435
    if status not in [x[0] for x in BACKEND_STATUSES]:
436
        raise Network.InvalidBackendMsgError(opcode, status)
437

    
438
    back_network.backendjobid = jobid
439
    back_network.backendjobstatus = status
440
    back_network.opcode = opcode
441

    
442
    add_reserved_ips = job_fields.get("add_reserved_ips")
443
    if add_reserved_ips:
444
        net = back_network.network
445
        pool = net.get_pool()
446
        for ip in add_reserved_ips:
447
            pool.reserve(ip, external=True)
448
        pool.save()
450

    
451
    if status == 'success':
452
        back_network.backendtime = etime
453
    back_network.save()
454

    
455

    
456
@transaction.commit_on_success
457
def process_create_progress(vm, etime, progress):
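    """Update the build progress of a VM from a progress notification."""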
458

    
459
    percentage = int(progress)
460

    
461
    # The percentage may exceed 100%, due to the way
462
    # snf-image:copy-progress tracks bytes read by image handling processes
463
    percentage = 100 if percentage > 100 else percentage
464
    if percentage < 0:
465
        raise ValueError("Percentage cannot be negative")
466

    
467
    # FIXME: log a warning here, see #1033
468
#   if last_update > percentage:
469
#       raise ValueError("Build percentage should increase monotonically " \
470
#                        "(old = %d, new = %d)" % (last_update, percentage))
471

    
472
    # This assumes that no message of type 'ganeti-create-progress' is going to
473
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
474
    # the instance is STARTED.  What if the two messages are processed by two
475
    # separate dispatcher threads, and the 'ganeti-op-status' message for
476
    # successful creation gets processed before the 'ganeti-create-progress'
477
    # message? [vkoukis]
478
    #
479
    #if not vm.operstate == 'BUILD':
480
    #    raise VirtualMachine.IllegalState("VM is not in building state")
481

    
482
    vm.buildpercentage = percentage
483
    vm.backendtime = etime
484
    vm.save()
485

    
486

    
487
@transaction.commit_on_success
488
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
489
                               details=None):
490
    """
491
    Create virtual machine instance diagnostic entry.
492

493
    :param vm: VirtualMachine instance to create diagnostic for.
494
    :param message: Diagnostic message.
495
    :param source: Diagnostic source identifier (e.g. image-helper).
496
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
497
    :param etime: The time the message occurred (if available).
498
    :param details: Additional details or debug information.
499
    """
500
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level, source=source,
501
                                                   source_date=etime,
502
                                                   message=message,
503
                                                   details=details)
504

    
505

    
506
def create_instance(vm, nics, flavor, image):
507
    """`image` is a dictionary which should contain the keys:
508
            'backend_id', 'format' and 'metadata'
509

510
        metadata value should be a dictionary.
511
    """
512

    
513
    # Handle arguments to CreateInstance() as a dictionary,
514
    # initialize it based on a deployment-specific value.
515
    # This enables the administrator to override deployment-specific
516
    # arguments, such as the disk template to use, name of os provider
517
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
518
    #
519
    kw = vm.backend.get_create_params()
520
    kw['mode'] = 'create'
521
    kw['name'] = vm.backend_vm_id
522
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
523

    
524
    kw['disk_template'] = flavor.disk_template
525
    kw['disks'] = [{"size": flavor.disk * 1024}]
526
    provider = flavor.disk_provider
527
    if provider:
528
        kw['disks'][0]['provider'] = provider
529
        kw['disks'][0]['origin'] = flavor.disk_origin
530

    
531
    kw['nics'] = [{"name": nic.backend_uuid,
532
                   "network": nic.network.backend_id,
533
                   "ip": nic.ipv4}
534
                  for nic in nics]
535
    backend = vm.backend
536
    depend_jobs = []
537
    for nic in nics:
538
        network = Network.objects.select_for_update().get(id=nic.network.id)
539
        bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
540
                                                             network=network)
541
        if bnet.operstate != "ACTIVE":
542
            if network.public:
543
                msg = "Can not connect instance to network %s. Network is not"\
544
                      " ACTIVE in backend %s." % (network, backend)
545
                raise Exception(msg)
546
            else:
547
                jobs = create_network(network, backend, connect=True)
548
                if isinstance(jobs, list):
549
                    depend_jobs.extend(jobs)
550
                else:
551
                    depend_jobs.append(jobs)
552
    kw["depends"] = [[job, ["success", "error", "canceled"]]
553
                     for job in depend_jobs]
554

    
555
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
556
    # kw['os'] = settings.GANETI_OS_PROVIDER
557
    kw['ip_check'] = False
558
    kw['name_check'] = False
559

    
560
    # Do not specify a node explicitly; have
561
    # Ganeti use an iallocator instead
562
    #kw['pnode'] = rapi.GetNodes()[0]
563

    
564
    kw['dry_run'] = settings.TEST
565

    
566
    kw['beparams'] = {
567
        'auto_balance': True,
568
        'vcpus': flavor.cpu,
569
        'memory': flavor.ram}
570

    
571
    kw['osparams'] = {
572
        'config_url': vm.config_url,
573
        # Store image id and format to Ganeti
574
        'img_id': image['backend_id'],
575
        'img_format': image['format']}
576

    
577
    # Use opportunistic locking
578
    kw['opportunistic_locking'] = True
579

    
580
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
581
    # kw['hvparams'] = dict(serial_console=False)
582

    
583
    log.debug("Creating instance %s", utils.hide_pass(kw))
584
    with pooled_rapi_client(vm) as client:
585
        return client.CreateInstance(**kw)
586

    
587

    
588
def delete_instance(vm):
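    """Delete a VM's instance from its Ganeti backend."""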
589
    with pooled_rapi_client(vm) as client:
590
        return client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)
591

    
592

    
593
def reboot_instance(vm, reboot_type):
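    """Reboot a VM's instance (always a hard reboot at the Ganeti level)."""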
594
    assert reboot_type in ('soft', 'hard')
595
    kwargs = {"instance": vm.backend_vm_id,
596
              "reboot_type": "hard"}
597
    # XXX: Currently the shutdown_timeout parameter is not supported by the
598
    # Ganeti RAPI. Until it is supported, we fall back, for both reboot
599
    # types, to the default Ganeti shutdown timeout (120s). Note that the
600
    # reboot type of the Ganeti job must always be hard. The 'soft' and
601
    # 'hard' types of the OS API differ from those of Ganeti and map to
602
    # 'shutdown_timeout'.
603
    #if reboot_type == "hard":
604
    #    kwargs["shutdown_timeout"] = 0
605
    if settings.TEST:
606
        kwargs["dry_run"] = True
607
    with pooled_rapi_client(vm) as client:
608
        return client.RebootInstance(**kwargs)
609

    
610

    
611
def startup_instance(vm):
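    """Start up a VM's instance in its Ganeti backend."""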
612
    with pooled_rapi_client(vm) as client:
613
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
614

    
615

    
616
def shutdown_instance(vm):
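    """Shut down a VM's instance in its Ganeti backend."""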
617
    with pooled_rapi_client(vm) as client:
618
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)
619

    
620

    
621
def resize_instance(vm, vcpus, memory):
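    """Modify the CPU and RAM backend parameters of a VM's instance."""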
622
    beparams = {"vcpus": int(vcpus),
623
                "minmem": int(memory),
624
                "maxmem": int(memory)}
625
    with pooled_rapi_client(vm) as client:
626
        return client.ModifyInstance(vm.backend_vm_id, beparams=beparams)
627

    
628

    
629
def get_instance_console(vm):
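    """Return VNC console connection info for a VM."""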
630
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
631
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
632
    # useless (see #783).
633
    #
634
    # Until this is fixed on the Ganeti side, construct a console info reply
635
    # directly.
636
    #
637
    # WARNING: This assumes that VNC runs on port network_port on
638
    #          the instance's primary node, and is probably
639
    #          hypervisor-specific.
640
    #
641
    log.debug("Getting console for vm %s", vm)
642

    
643
    console = {}
644
    console['kind'] = 'vnc'
645

    
646
    with pooled_rapi_client(vm) as client:
647
        i = client.GetInstance(vm.backend_vm_id)
648

    
649
    if vm.backend.hypervisor == "kvm" and i['hvparams']['serial_console']:
650
        raise Exception("hv parameter serial_console cannot be true")
651
    console['host'] = i['pnode']
652
    console['port'] = i['network_port']
653

    
654
    return console
655

    
656

    
657
def get_instance_info(vm):
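    """Return the instance info of a VM, as reported by Ganeti."""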
658
    with pooled_rapi_client(vm) as client:
659
        return client.GetInstance(vm.backend_vm_id)
660

    
661

    
662
def vm_exists_in_backend(vm):
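    """Check whether a VM's instance exists in its Ganeti backend."""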
663
    try:
664
        get_instance_info(vm)
665
        return True
666
    except GanetiApiError as e:
667
        if e.code == 404:
668
            return False
669
        raise e
670

    
671

    
672
def get_network_info(backend_network):
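    """Return the Ganeti info for the network of a BackendNetwork."""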
673
    with pooled_rapi_client(backend_network) as client:
674
        return client.GetNetwork(backend_network.network.backend_id)
675

    
676

    
677
def network_exists_in_backend(backend_network):
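    """Check whether a network exists in a Ganeti backend."""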
678
    try:
679
        get_network_info(backend_network)
680
        return True
681
    except GanetiApiError as e:
682
        if e.code == 404:
683
            return False
        raise e
684

    
685

    
686
def create_network(network, backend, connect=True):
687
    """Create a network in a Ganeti backend"""
688
    log.debug("Creating network %s in backend %s", network, backend)
689

    
690
    job_id = _create_network(network, backend)
691

    
692
    if connect:
693
        job_ids = connect_network(network, backend, depends=[job_id])
694
        return job_ids
695
    else:
696
        return [job_id]
697

    
698

    
699
def _create_network(network, backend):
700
    """Create a network."""
701

    
702
    tags = network.backend_tag
703
    if network.dhcp:
704
        tags.append('nfdhcpd')
705

    
706
    if network.public:
707
        conflicts_check = True
708
        tags.append('public')
709
    else:
710
        conflicts_check = False
711
        tags.append('private')
712

    
713
    # Use a dummy network subnet for IPv6 only networks. Currently Ganeti does
714
    # not support IPv6 only networks. To bypass this limitation, we create the
715
    # network with a dummy network subnet, and make Cyclades connect instances
716
    # to such networks, with address=None.
717
    subnet = network.subnet
718
    if subnet is None:
719
        subnet = "10.0.0.0/24"
720

    
721
    try:
722
        bn = BackendNetwork.objects.get(network=network, backend=backend)
723
        mac_prefix = bn.mac_prefix
724
    except BackendNetwork.DoesNotExist:
725
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
726
                        " does not exist" % (network.id, backend.id))
727

    
728
    with pooled_rapi_client(backend) as client:
729
        return client.CreateNetwork(network_name=network.backend_id,
730
                                    network=subnet,
731
                                    network6=network.subnet6,
732
                                    gateway=network.gateway,
733
                                    gateway6=network.gateway6,
734
                                    mac_prefix=mac_prefix,
735
                                    conflicts_check=conflicts_check,
736
                                    tags=tags)
737

    
738

    
739
def connect_network(network, backend, depends=[], group=None):
740
    """Connect a network to nodegroups."""
741
    log.debug("Connecting network %s to backend %s", network, backend)
742

    
743
    if network.public:
744
        conflicts_check = True
745
    else:
746
        conflicts_check = False
747

    
748
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
749
    with pooled_rapi_client(backend) as client:
750
        groups = [group] if group is not None else client.GetGroups()
751
        job_ids = []
752
        for group in groups:
753
            job_id = client.ConnectNetwork(network.backend_id, group,
754
                                           network.mode, network.link,
755
                                           conflicts_check,
756
                                           depends=depends)
757
            job_ids.append(job_id)
758
    return job_ids
759

    
760

    
761
def delete_network(network, backend, disconnect=True):
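    """Delete a network from a Ganeti backend, optionally disconnecting it."""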
762
    log.debug("Deleting network %s from backend %s", network, backend)
763

    
764
    depends = []
765
    if disconnect:
766
        depends = disconnect_network(network, backend)
767
    _delete_network(network, backend, depends=depends)
768

    
769

    
770
def _delete_network(network, backend, depends=[]):
771
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
772
    with pooled_rapi_client(backend) as client:
773
        return client.DeleteNetwork(network.backend_id, depends)
774

    
775

    
776
def disconnect_network(network, backend, group=None):
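    """Disconnect a network from the nodegroups of a Ganeti backend."""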
777
    log.debug("Disconnecting network %s to backend %s", network, backend)
778

    
779
    with pooled_rapi_client(backend) as client:
780
        groups = [group] if group is not None else client.GetGroups()
781
        job_ids = []
782
        for group in groups:
783
            job_id = client.DisconnectNetwork(network.backend_id, group)
784
            job_ids.append(job_id)
785
    return job_ids
786

    
787

    
788
def connect_to_network(vm, nic):
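    """Connect a VM to a network by adding a NIC to the Ganeti instance."""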
789
    network = nic.network
790
    backend = vm.backend
791
    network = Network.objects.select_for_update().get(id=network.id)
792
    bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
793
                                                         network=network)
794
    depend_jobs = []
795
    if bnet.operstate != "ACTIVE":
796
        depend_jobs = create_network(network, backend, connect=True)
797

    
798
    depends = [[job, ["success", "error", "canceled"]] for job in depend_jobs]
799

    
800
    nic = {'name': nic.backend_uuid,
801
           'network': network.backend_id,
802
           'ip': nic.ipv4}
803

    
804
    log.debug("Adding NIC %s to VM %s", nic, vm)
805

    
806
    kwargs = {
807
        "instance": vm.backend_vm_id,
808
        "nics": [("add", "-1", nic)],
809
        "depends": depends,
810
    }
811
    if vm.backend.use_hotplug():
812
        kwargs["hotplug"] = True
813
    if settings.TEST:
814
        kwargs["dry_run"] = True
815

    
816
    with pooled_rapi_client(vm) as client:
817
        return client.ModifyInstance(**kwargs)
818

    
819

    
820
def disconnect_from_network(vm, nic):
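    """Disconnect a VM from a network by removing the corresponding NIC."""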
821
    log.debug("Removing NIC %s of VM %s", nic, vm)
822

    
823
    kwargs = {
824
        "instance": vm.backend_vm_id,
825
        "nics": [("remove", nic.backend_uuid, {})],
826
    }
827
    if vm.backend.use_hotplug():
828
        kwargs["hotplug"] = True
829
    if settings.TEST:
830
        kwargs["dry_run"] = True
831

    
832
    with pooled_rapi_client(vm) as client:
833
        jobID = client.ModifyInstance(**kwargs)
834
        # If the NIC has a tag for a firewall profile, the tag must be
835
        # deleted, otherwise it may affect another NIC. XXX: Deleting the
836
        # tag should depend on the removal of the NIC, but the RAPI client
837
        # does not currently support this; as a result, the firewall profile
838
        # may be cleared without the NIC being successfully removed. This
839
        # issue will be fixed with the use of NIC UUIDs.
840
        firewall_profile = nic.firewall_profile
841
        if firewall_profile and firewall_profile != "DISABLED":
842
            tag = _firewall_tags[firewall_profile] % nic.index
843
            client.DeleteInstanceTags(vm.backend_vm_id, [tag],
844
                                      dry_run=settings.TEST)
845

    
846
        return jobID
847

    
848

    
849
def set_firewall_profile(vm, profile, index=0):
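    """Set the firewall profile of a VM's NIC by updating instance tags."""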
850
    try:
851
        tag = _firewall_tags[profile] % index
852
    except KeyError:
853
        raise ValueError("Unsopported Firewall Profile: %s" % profile)
854

    
855
    log.debug("Setting tag of VM %s, NIC index %d, to %s", vm, index, profile)
856

    
857
    with pooled_rapi_client(vm) as client:
858
        # Delete previous firewall tags
859
        old_tags = client.GetInstanceTags(vm.backend_vm_id)
860
        delete_tags = [(t % index) for t in _firewall_tags.values()
861
                       if (t % index) in old_tags]
862
        if delete_tags:
863
            client.DeleteInstanceTags(vm.backend_vm_id, delete_tags,
864
                                      dry_run=settings.TEST)
865

    
866
        if profile != "DISABLED":
867
            client.AddInstanceTags(vm.backend_vm_id, [tag],
868
                                   dry_run=settings.TEST)
869

    
870
        # XXX NOP ModifyInstance call to force process_net_status to run
871
        # on the dispatcher
872
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
873
        client.ModifyInstance(vm.backend_vm_id,
874
                              os_name=os_name)
875
    return None
876

    
877

    
878
def get_instances(backend, bulk=True):
879
    with pooled_rapi_client(backend) as c:
880
        return c.GetInstances(bulk=bulk)
881

    
882

    
883
def get_nodes(backend, bulk=True):
884
    with pooled_rapi_client(backend) as c:
885
        return c.GetNodes(bulk=bulk)
886

    
887

    
888
def get_jobs(backend, bulk=True):
889
    with pooled_rapi_client(backend) as c:
890
        return c.GetJobs(bulk=bulk)
891

    
892

    
893
def get_physical_resources(backend):
894
    """ Get the physical resources of a backend.
895

896
    Get the resources of a backend as reported by the backend (not the db).
897

898
    """
899
    nodes = get_nodes(backend, bulk=True)
900
    attr = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
901
    res = {}
902
    for a in attr:
903
        res[a] = 0
904
    for n in nodes:
905
        # Filter out drained, offline and not vm_capable nodes since they will
906
        # not take part in the vm allocation process
907
        can_host_vms = n['vm_capable'] and not (n['drained'] or n['offline'])
908
        if can_host_vms and n['cnodes']:
909
            for a in attr:
910
                res[a] += int(n[a])
911
    return res
912

    
913

    
914
def update_backend_resources(backend, resources=None):
915
    """ Update the state of the backend resources in db.
916

917
    """
918

    
919
    if not resources:
920
        resources = get_physical_resources(backend)
921

    
922
    backend.mfree = resources['mfree']
923
    backend.mtotal = resources['mtotal']
924
    backend.dfree = resources['dfree']
925
    backend.dtotal = resources['dtotal']
926
    backend.pinst_cnt = resources['pinst_cnt']
927
    backend.ctotal = resources['ctotal']
928
    backend.updated = datetime.now()
929
    backend.save()
930

    
931

    
932
def get_memory_from_instances(backend):
933
    """ Get the memory that is used from instances.
934

935
    Get the used memory of a backend. Note: This differs from the
936
    real memory used, due to KVM's memory de-duplication.
937

938
    """
939
    with pooled_rapi_client(backend) as client:
940
        instances = client.GetInstances(bulk=True)
941
    mem = 0
942
    for i in instances:
943
        mem += i['oper_ram']
944
    return mem
945

    
946

    
947
def get_available_disk_templates(backend):
948
    """Get the list of available disk templates of a Ganeti backend.
949

950
    The list contains the disk templates that are enabled in the Ganeti backend
951
    and also included in ipolicy-disk-templates.
952

953
    """
954
    with pooled_rapi_client(backend) as c:
955
        info = c.GetInfo()
956
    ipolicy_disk_templates = info["ipolicy"]["disk-templates"]
957
    try:
958
        enabled_disk_templates = info["enabled_disk_templates"]
959
        return [dp for dp in enabled_disk_templates
960
                if dp in ipolicy_disk_templates]
961
    except KeyError:
962
        # Ganeti < 2.8 does not have 'enabled_disk_templates'
963
        return ipolicy_disk_templates
964

    
965

    
966
def update_backend_disk_templates(backend):
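    """Store the available disk templates of a backend in the DB."""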
967
    disk_templates = get_available_disk_templates(backend)
968
    backend.disk_templates = disk_templates
969
    backend.save()
970

    
971

    
972
##
973
## Synchronized operations for reconciliation
974
##
975

    
976

    
977
def create_network_synced(network, backend):
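    """Create and connect a network in a backend, waiting for each job."""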
978
    result = _create_network_synced(network, backend)
979
    if result[0] != 'success':
980
        return result
981
    result = connect_network_synced(network, backend)
982
    return result
983

    
984

    
985
def _create_network_synced(network, backend):
986
    with pooled_rapi_client(backend) as client:
987
        job = _create_network(network, backend)
988
        result = wait_for_job(client, job)
989
    return result
990

    
991

    
992
def connect_network_synced(network, backend):
993
    with pooled_rapi_client(backend) as client:
994
        for group in client.GetGroups():
995
            job = client.ConnectNetwork(network.backend_id, group,
996
                                        network.mode, network.link)
997
            result = wait_for_job(client, job)
998
            if result[0] != 'success':
999
                return result
1000

    
1001
    return result
1002

    
1003

    
1004
def wait_for_job(client, jobid):
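    """Wait for a Ganeti job to finish and return a (status, error) tuple."""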
1005
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
1006
    status = result['job_info'][0]
1007
    while status not in ['success', 'error', 'canceled']:
1008
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
1009
                                         [result], None)
1010
        status = result['job_info'][0]
1011

    
1012
    if status == 'success':
1013
        return (status, None)
1014
    else:
1015
        error = result['job_info'][1]
1016
        return (status, error)