Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ 3c52a9df

History | View | Annotate | Download (36.1 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
from django.conf import settings
34
from django.db import transaction
35
from datetime import datetime, timedelta
36

    
37
from synnefo.db.models import (Backend, VirtualMachine, Network,
38
                               BackendNetwork, BACKEND_STATUSES,
39
                               pooled_rapi_client, VirtualMachineDiagnostic,
40
                               Flavor)
41
from synnefo.logic import utils
42
from synnefo import quotas
43
from synnefo.api.util import release_resource
44
from synnefo.util.mac2eui64 import mac2eui64
45
from synnefo.logic.rapi import GanetiApiError
46

    
47
from logging import getLogger
48
log = getLogger(__name__)
49

    
50

    
51
_firewall_tags = {
52
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
53
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
54
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}
55

    
56
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
57

    
58

    
59
NIC_FIELDS = ["state", "mac", "ipv4", "ipv6", "network", "firewall_profile"]
60

    
61

    
62
def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
    """Handle quotas for updated VirtualMachine.

    Update quotas for the updated VirtualMachine based on the job that run on
    the Ganeti backend. If a commission has been already issued for this job,
    then this commission is just accepted or rejected based on the job status.
    Otherwise, a new commission for the given change is issued, that is also in
    force and auto-accept mode. In this case, previous commissions are
    rejected, since they reflect a previous state of the VM.

    Returns the (possibly modified) vm; vm.serial is cleared after the
    pending commission has been accepted or rejected.

    """
    # Only terminal job states can affect quotas; intermediate states are
    # ignored by the caller's bookkeeping.
    if job_status not in ["success", "error", "canceled"]:
        return vm

    # Check successful completion of a job will trigger any quotable change in
    # the VM state.
    action = utils.get_action_from_opcode(job_opcode, job_fields)
    if action == "BUILD":
        # Quotas for new VMs are automatically accepted by the API
        return vm
    commission_info = quotas.get_commission_info(vm, action=action,
                                                 action_fields=job_fields)

    if vm.task_job_id == job_id and vm.serial is not None:
        # Commission for this change has already been issued. So just
        # accept/reject it. Special case is OP_INSTANCE_CREATE, which even
        # if fails, must be accepted, as the user must manually remove the
        # failed server
        serial = vm.serial
        if job_status == "success":
            quotas.accept_serial(serial)
        elif job_status in ["error", "canceled"]:
            log.debug("Job %s failed. Rejecting related serial %s", job_id,
                      serial)
            quotas.reject_serial(serial)
        vm.serial = None
    elif job_status == "success" and commission_info is not None:
        log.debug("Expected job was %s. Processing job %s. Commission for"
                  " this job: %s", vm.task_job_id, job_id, commission_info)
        # Commission for this change has not been issued, or the issued
        # commission was unaware of the current change. Reject all previous
        # commissions and create a new one in forced mode!
        commission_name = ("client: dispatcher, resource: %s, ganeti_job: %s"
                           % (vm, job_id))
        quotas.handle_resource_commission(vm, action,
                                          commission_info=commission_info,
                                          commission_name=commission_name,
                                          force=True,
                                          auto_accept=True)
        log.debug("Issued new commission: %s", vm.serial)

    return vm
114

    
115

    
116
@transaction.commit_on_success
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None,
                      job_fields=None):
    """Process a job progress notification from the backend

    Process an incoming message from the backend (currently Ganeti).
    Job notifications with a terminating status (success, error, or canceled),
    also update the operating state of the VM.

    """
    # See #1492, #1031, #1111 why this line has been removed
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)

    # Record the raw job information on the VM for inspection/debugging.
    vm.backendjobid = jobid
    vm.backendjobstatus = status
    vm.backendopcode = opcode
    vm.backendlogmsg = logmsg

    # Non-terminal states only update the bookkeeping fields above.
    if status in ["queued", "waiting", "running"]:
        vm.save()
        return

    if job_fields is None:
        job_fields = {}
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode)

    # Notifications of success change the operating state
    if status == "success":
        if state_for_success is not None:
            vm.operstate = state_for_success
        beparams = job_fields.get("beparams", None)
        if beparams:
            # Change the flavor of the VM
            _process_resize(vm, beparams)
        # Update backendtime only for jobs that have been successfully
        # completed, since only these jobs update the state of the VM. Else a
        # "race condition" may occur when a successful job (e.g.
        # OP_INSTANCE_REMOVE) completes before an error job and messages arrive
        # in reversed order.
        vm.backendtime = etime

    if status in ["success", "error", "canceled"] and nics is not None:
        # Update the NICs of the VM
        _process_net_status(vm, etime, nics)

    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
    if opcode == 'OP_INSTANCE_CREATE' and status in ('canceled', 'error'):
        vm.operstate = 'ERROR'
        vm.backendtime = etime
    elif opcode == 'OP_INSTANCE_REMOVE':
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
        # when no instance exists at the Ganeti backend.
        # See ticket #799 for all the details.
        if status == 'success' or (status == 'error' and
                                   not vm_exists_in_backend(vm)):
            # VM has been deleted
            for nic in vm.nics.all():
                # Release the IP
                release_nic_address(nic)
                # And delete the NIC.
                nic.delete()
            vm.deleted = True
            vm.operstate = state_for_success
            vm.backendtime = etime
            # Force the quota handling below to treat the removal as
            # successful, so the VM's resources are released.
            status = "success"

    if status in ["success", "error", "canceled"]:
        # Job is finalized: Handle quotas/commissioning
        vm = handle_vm_quotas(vm, job_id=jobid, job_opcode=opcode,
                              job_status=status, job_fields=job_fields)
        # and clear task fields
        if vm.task_job_id == jobid:
            vm.task = None
            vm.task_job_id = None

    vm.save()
194

    
195

    
196
def _process_resize(vm, beparams):
    """Switch the VM to the flavor described by the new backend parameters.

    Parameters missing from `beparams` are taken from the current flavor.
    Raises if no flavor matches the new (cpu, ram) with the same disk
    configuration.
    """
    current = vm.flavor
    new_cpu = beparams.get("vcpus", current.cpu)
    new_ram = beparams.get("maxmem", current.ram)
    if (new_cpu, new_ram) == (current.cpu, current.ram):
        # Nothing quotable changed; keep the current flavor.
        return
    try:
        vm.flavor = Flavor.objects.get(cpu=new_cpu, ram=new_ram,
                                       disk=current.disk,
                                       disk_template=current.disk_template)
    except Flavor.DoesNotExist:
        raise Exception("Can not find flavor for VM")
    vm.save()
211

    
212

    
213
@transaction.commit_on_success
def process_net_status(vm, etime, nics):
    """Wrap _process_net_status inside transaction.

    Public entry point: same arguments as _process_net_status, but runs
    inside its own database transaction.
    """
    _process_net_status(vm, etime, nics)
217

    
218

    
219
def _process_net_status(vm, etime, nics):
    """Process a net status notification from the backend

    Process an incoming message from the Ganeti backend,
    detailing the NIC configuration of a VM instance.

    Update the state of the VM in the DB accordingly: create NICs that exist
    only in Ganeti, delete stale NICs that exist only in the DB, and sync the
    fields (and IP reservations) of NICs that differ.

    """
    ganeti_nics = process_ganeti_nics(nics)
    db_nics = dict([(nic.id, nic) for nic in vm.nics.all()])

    # Get X-Lock on backend before getting X-Lock on network IP pools, to
    # guarantee that no deadlock will occur with Backend allocator.
    Backend.objects.select_for_update().get(id=vm.backend_id)

    for nic_name in set(db_nics.keys()) | set(ganeti_nics.keys()):
        db_nic = db_nics.get(nic_name)
        ganeti_nic = ganeti_nics.get(nic_name)
        if ganeti_nic is None:
            # NIC exists in DB but not in Ganeti. If the NIC is in 'building'
            # state for more than 5 minutes, then we remove the NIC.
            # TODO: This is dangerous as the job may be stuck in the queue,
            # and releasing the IP may lead to duplicate IP use.
            # BUGFIX: this branch referenced 'nic', a name only bound by
            # Python 2 comprehension leakage above, which always pointed at
            # the *last* DB NIC. Operate on 'db_nic' instead.
            if db_nic.state != "BUILDING" or (db_nic.state == "BUILDING" and
               etime > db_nic.created + timedelta(minutes=5)):
                release_nic_address(db_nic)
                db_nic.delete()
            else:
                log.warning("Ignoring recent building NIC: %s", db_nic)
        elif db_nic is None:
            # NIC exists in Ganeti but not in DB
            if ganeti_nic["ipv4"]:
                network = ganeti_nic["network"]
                network.reserve_address(ganeti_nic["ipv4"])
            vm.nics.create(**ganeti_nic)
        elif not nics_are_equal(db_nic, ganeti_nic):
            # Special case where the IPv4 address has changed, because you
            # need to release the old IPv4 address and reserve the new one
            if db_nic.ipv4 != ganeti_nic["ipv4"]:
                release_nic_address(db_nic)
                if ganeti_nic["ipv4"]:
                    ganeti_nic["network"].reserve_address(ganeti_nic["ipv4"])

            # Update the NIC in DB with the values from Ganeti NIC
            for field in NIC_FIELDS:
                setattr(db_nic, field, ganeti_nic[field])
            db_nic.save()

            # Dummy update the network, to work with 'changed-since'
            db_nic.network.save()

    vm.backendtime = etime
    vm.save()
272

    
273

    
274
def nics_are_equal(db_nic, gnt_nic):
    """Return True iff the DB NIC and the Ganeti NIC agree on all tracked
    fields (NIC_FIELDS)."""
    return all(getattr(db_nic, field) == gnt_nic[field]
               for field in NIC_FIELDS)
279

    
280

    
281
def process_ganeti_nics(ganeti_nics):
    """Process NIC dict from ganeti.

    Returns a mapping from a NIC identifier to a dict of NIC attributes
    suitable for creating/updating DB NIC objects.
    """
    nics_by_id = {}
    for index, gnic in enumerate(ganeti_nics):
        nic_name = gnic.get("name", None)
        if nic_name is None:
            # Unknown NIC to synnefo: key it by its index so that it will
            # be created automatically later on.
            nic_id = "unknown-" + str(index)
        else:
            nic_id = utils.id_from_nic_name(nic_name)

        network_id = utils.id_from_network_name(gnic.get('network', ''))
        network = Network.objects.get(id=network_id)

        # Collect the new NIC's attributes.
        mac = gnic.get('mac')
        if network.subnet6 is not None:
            # Derive the IPv6 address from the MAC (EUI-64).
            ipv6 = mac2eui64(mac, network.subnet6)
        else:
            ipv6 = None

        firewall_profile = _reverse_tags.get(gnic.get('firewall'))
        if not firewall_profile and network.public:
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE

        nics_by_id[nic_id] = {
            'index': index,
            'network': network,
            'mac': mac,
            'ipv4': gnic.get('ip'),
            'ipv6': ipv6,
            'firewall_profile': firewall_profile,
            'state': 'ACTIVE'}
    return nics_by_id
318

    
319

    
320
def release_nic_address(nic):
    """Release the IPv4 address of a NIC.

    Check if an instance's NIC has an IPv4 address and release it back to
    its network's pool, unless it is a Floating IP (which stays reserved).

    """
    if nic.ipv4 and nic.ip_type != "FLOATING":
        nic.network.release_address(nic.ipv4)
330

    
331

    
332
@transaction.commit_on_success
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
    """Process a Ganeti job notification for a network in one backend.

    Updates the BackendNetwork's job bookkeeping and operating state, then
    recomputes the aggregate state of the parent Network over all backends.
    """
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    # Record raw job information for inspection/debugging.
    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode
    back_network.backendlogmsg = logmsg

    network = back_network.network

    # Notifications of success change the operating state
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == 'success' and state_for_success is not None:
        back_network.operstate = state_for_success

    # A failed/canceled creation leaves the backend network in ERROR.
    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_ADD':
        back_network.operstate = 'ERROR'
        back_network.backendtime = etime

    if opcode == 'OP_NETWORK_REMOVE':
        # A failed removal still counts as removed when the network no
        # longer exists at the Ganeti backend (checked via RAPI).
        network_is_deleted = (status == "success")
        if network_is_deleted or (status == "error" and not
                                  network_exists_in_backend(back_network)):
            back_network.operstate = state_for_success
            back_network.deleted = True
            back_network.backendtime = etime

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()
    # Also you must update the state of the Network!!
    update_network_state(network)
366

    
367

    
368
def update_network_state(network):
    """Update the state of a Network based on BackendNetwork states.

    Update the state of a Network based on the operstate of the networks in
    the backends that network exists.

    The state of the network is:
    * ACTIVE: If it is 'ACTIVE' in at least one backend.
    * DELETED: If it is 'DELETED' in all backends that have been created.

    This function also releases the resources (MAC prefix or Bridge) and the
    quotas for the network.

    """
    if network.deleted:
        # Network has already been deleted. Just assert that state is also
        # DELETED
        if not network.state == "DELETED":
            network.state = "DELETED"
            network.save()
        return

    backend_states = [s.operstate for s in network.backend_networks.all()]
    if not backend_states and network.action != "DESTROY":
        # The network does not exist in any backend yet and is not being
        # destroyed: it counts as ACTIVE.
        # BUGFIX: return unconditionally. Previously the early return was
        # taken only when the state had just changed, so an already-ACTIVE
        # network with no backend networks fell through and was wrongly
        # marked DELETED below (the fold over an empty list yields
        # "DELETED").
        if network.state != "ACTIVE":
            network.state = "ACTIVE"
            network.save()
        return

    # Network is deleted when all BackendNetworks go to "DELETED" operstate
    deleted = all(state == "DELETED" for state in backend_states)

    # Release the resources on the deletion of the Network
    if deleted:
        # BUGFIX: the log arguments were swapped (mac_prefix was logged in
        # the 'link' slot and vice versa).
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
                 network.id, network.link, network.mac_prefix)
        network.deleted = True
        network.state = "DELETED"
        if network.mac_prefix:
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
                release_resource(res_type="mac_prefix",
                                 value=network.mac_prefix)
        if network.link:
            if network.FLAVORS[network.flavor]["link"] == "pool":
                release_resource(res_type="bridge", value=network.link)

        # Issue commission
        if network.userid:
            quotas.issue_and_accept_commission(network, delete=True)
            # the above has already saved the object and committed;
            # a second save would override others' changes, since the
            # object is now unlocked
            return
        elif not network.public:
            log.warning("Network %s does not have an owner!", network.id)
    network.save()
425

    
426

    
427
@transaction.commit_on_success
def process_network_modify(back_network, etime, jobid, opcode, status,
                           job_fields):
    """Process an OP_NETWORK_SET_PARAMS job notification.

    Records the job on the BackendNetwork and reserves any externally
    reserved IPs that were added to the network's pool.
    """
    assert (opcode == "OP_NETWORK_SET_PARAMS")
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    # BUGFIX: record the opcode in 'backendopcode' (as process_network_status
    # and process_op_status do); assigning to 'opcode' set a plain Python
    # attribute that is never persisted to the DB.
    back_network.backendopcode = opcode

    add_reserved_ips = job_fields.get("add_reserved_ips")
    if add_reserved_ips:
        # Mark the added IPs as externally reserved in the network's pool.
        # (The redundant inner re-check of add_reserved_ips was removed.)
        pool = back_network.network.get_pool()
        for ip in add_reserved_ips:
            pool.reserve(ip, external=True)
        pool.save()

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()
450

    
451

    
452
@transaction.commit_on_success
def process_create_progress(vm, etime, progress):
    """Persist an image-copy progress notification for a building VM.

    Raises ValueError for negative percentages; values above 100 are
    clamped (snf-image:copy-progress counts bytes read by image handling
    processes, so it can overshoot).
    """
    percentage = int(progress)
    if percentage < 0:
        raise ValueError("Percentage cannot be negative")
    percentage = min(percentage, 100)

    # FIXME: log a warning here, see #1033 — the build percentage is not
    # checked for monotonic increase:
    #   if last_update > percentage:
    #       raise ValueError("Build percentage should increase monotonically "
    #                        "(old = %d, new = %d)" % (last_update, percentage))

    # Note: a 'ganeti-create-progress' message can race with the
    # 'ganeti-op-status' message of a successful OP_INSTANCE_CREATE when
    # they are handled by separate dispatcher threads, so the VM is
    # deliberately NOT required to be in the BUILD state here:
    #   if not vm.operstate == 'BUILD':
    #       raise VirtualMachine.IllegalState("VM is not in building state")

    vm.buildpercentage = percentage
    vm.backendtime = etime
    vm.save()
481

    
482

    
483
@transaction.commit_on_success
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
                               details=None):
    """
    Create virtual machine instance diagnostic entry.

    :param vm: VirtualMachine instance to create diagnostic for.
    :param message: Diagnostic message.
    :param source: Diagnostic source identifier (e.g. image-helper).
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
    :param etime: The time the message occurred (if available).
    :param details: Additional details or debug information.
    """
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level, source=source,
                                                   source_date=etime,
                                                   message=message,
                                                   details=details)
500

    
501

    
502
def create_instance(vm, nics, flavor, image):
    """Submit an OP_INSTANCE_CREATE job for `vm` to its Ganeti backend.

    `image` is a dictionary which should contain the keys:
            'backend_id', 'format' and 'metadata'

        metadata value should be a dictionary.

    Returns the Ganeti job id of the creation job.
    """

    # Handle arguments to CreateInstance() as a dictionary,
    # initialize it based on a deployment-specific value.
    # This enables the administrator to override deployment-specific
    # arguments, such as the disk template to use, name of os provider
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
    #
    kw = vm.backend.get_create_params()
    kw['mode'] = 'create'
    kw['name'] = vm.backend_vm_id
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS

    # Disk configuration comes from the flavor (sizes in MiB for Ganeti).
    kw['disk_template'] = flavor.disk_template
    kw['disks'] = [{"size": flavor.disk * 1024}]
    provider = flavor.disk_provider
    if provider:
        kw['disks'][0]['provider'] = provider
        kw['disks'][0]['origin'] = flavor.disk_origin

    kw['nics'] = [{"name": nic.backend_uuid,
                   "network": nic.network.backend_id,
                   "ip": nic.ipv4}
                  for nic in nics]
    backend = vm.backend
    depend_jobs = []
    # Make sure every network the instance connects to exists (is ACTIVE) in
    # this backend; lock each network row to serialize with other creators.
    for nic in nics:
        network = Network.objects.select_for_update().get(id=nic.network.id)
        bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
                                                             network=network)
        if bnet.operstate != "ACTIVE":
            if network.public:
                # Public networks are managed out-of-band; refuse rather
                # than create them implicitly.
                msg = "Can not connect instance to network %s. Network is not"\
                      " ACTIVE in backend %s." % (network, backend)
                raise Exception(msg)
            else:
                # Create (and connect) the private network in this backend;
                # the instance creation will depend on those jobs.
                jobs = create_network(network, backend, connect=True)
                if isinstance(jobs, list):
                    depend_jobs.extend(jobs)
                else:
                    depend_jobs.append(jobs)
    kw["depends"] = [[job, ["success", "error", "canceled"]]
                     for job in depend_jobs]

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['os'] = settings.GANETI_OS_PROVIDER
    kw['ip_check'] = False
    kw['name_check'] = False

    # Do not specify a node explicitly, have
    # Ganeti use an iallocator instead
    #kw['pnode'] = rapi.GetNodes()[0]

    kw['dry_run'] = settings.TEST

    kw['beparams'] = {
        'auto_balance': True,
        'vcpus': flavor.cpu,
        'memory': flavor.ram}

    kw['osparams'] = {
        'config_url': vm.config_url,
        # Store image id and format to Ganeti
        'img_id': image['backend_id'],
        'img_format': image['format']}

    # Use opportunistic locking
    kw['opportunistic_locking'] = True

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['hvparams'] = dict(serial_console=False)

    # hide_pass strips sensitive values before logging.
    log.debug("Creating instance %s", utils.hide_pass(kw))
    with pooled_rapi_client(vm) as client:
        return client.CreateInstance(**kw)
582

    
583

    
584
def delete_instance(vm):
    """Submit an instance-removal job to the VM's Ganeti backend."""
    with pooled_rapi_client(vm) as rapi:
        return rapi.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)
587

    
588

    
589
def reboot_instance(vm, reboot_type):
    """Submit a reboot job for the VM's Ganeti instance.

    The Ganeti reboot type is always 'hard'; the OS API 'soft'/'hard'
    distinction maps to 'shutdown_timeout', which RAPI does not yet
    support (see comment below).
    """
    assert reboot_type in ('soft', 'hard')
    # XXX: Currently shutdown_timeout parameter is not supported from the
    # Ganeti RAPI. Until supported, we will fallback for both reboot types
    # to the default shutdown timeout of Ganeti (120s). Note that reboot
    # type of Ganeti job must be always hard. The 'soft' and 'hard' type
    # of OS API is different from the one in Ganeti, and maps to
    # 'shutdown_timeout'.
    #if reboot_type == "hard":
    #    kwargs["shutdown_timeout"] = 0
    kwargs = {"instance": vm.backend_vm_id,
              "reboot_type": "hard"}
    if settings.TEST:
        kwargs["dry_run"] = True
    with pooled_rapi_client(vm) as rapi:
        return rapi.RebootInstance(**kwargs)
605

    
606

    
607
def startup_instance(vm):
    """Submit a start-up job to the VM's Ganeti backend."""
    with pooled_rapi_client(vm) as rapi:
        return rapi.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
610

    
611

    
612
def shutdown_instance(vm):
    """Submit a shutdown job to the VM's Ganeti backend."""
    with pooled_rapi_client(vm) as rapi:
        return rapi.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)
615

    
616

    
617
def resize_instance(vm, vcpus, memory):
    """Modify CPU/RAM backend parameters of the VM's Ganeti instance.

    minmem and maxmem are both set to `memory`.
    """
    memory = int(memory)
    beparams = {"vcpus": int(vcpus),
                "minmem": memory,
                "maxmem": memory}
    with pooled_rapi_client(vm) as rapi:
        return rapi.ModifyInstance(vm.backend_vm_id, beparams=beparams)
623

    
624

    
625
def get_instance_console(vm):
    """Build a VNC console description for the VM.

    RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
    which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
    useless (see #783). Until this is fixed on the Ganeti side, construct a
    console info reply directly.

    WARNING: This assumes that VNC runs on port network_port on
             the instance's primary node, and is probably
             hypervisor-specific.
    """
    log.debug("Getting console for vm %s", vm)

    with pooled_rapi_client(vm) as rapi:
        instance = rapi.GetInstance(vm.backend_vm_id)

    serial_console = instance['hvparams']['serial_console']
    if vm.backend.hypervisor == "kvm" and serial_console:
        raise Exception("hv parameter serial_console cannot be true")

    return {'kind': 'vnc',
            'host': instance['pnode'],
            'port': instance['network_port']}
651

    
652

    
653
def get_instance_info(vm):
    """Fetch the instance's description from its Ganeti backend."""
    with pooled_rapi_client(vm) as rapi:
        return rapi.GetInstance(vm.backend_vm_id)
656

    
657

    
658
def vm_exists_in_backend(vm):
    """Return whether the Ganeti backend still knows about this instance.

    A 404 from RAPI means the instance does not exist; any other RAPI
    error is propagated to the caller.
    """
    try:
        get_instance_info(vm)
    except GanetiApiError as e:
        if e.code == 404:
            return False
        raise e
    return True
666

    
667

    
668
def get_network_info(backend_network):
    """Fetch the network's description from its Ganeti backend."""
    with pooled_rapi_client(backend_network) as rapi:
        return rapi.GetNetwork(backend_network.network.backend_id)
671

    
672

    
673
def network_exists_in_backend(backend_network):
    """Return whether the network exists in the Ganeti backend.

    A 404 from RAPI means the network does not exist; any other RAPI error
    is re-raised.
    """
    try:
        get_network_info(backend_network)
        return True
    except GanetiApiError as e:
        if e.code == 404:
            return False
        # BUGFIX: re-raise unexpected errors instead of silently falling
        # through and returning None, which callers would interpret as
        # "network does not exist" (mirrors vm_exists_in_backend).
        raise e
680

    
681

    
682
def create_network(network, backend, connect=True):
    """Create a network in a Ganeti backend.

    Returns the list of Ganeti job ids (the creation job, plus the connect
    jobs when `connect` is True).
    """
    log.debug("Creating network %s in backend %s", network, backend)

    job_id = _create_network(network, backend)

    if not connect:
        return [job_id]
    # Connect the network to all nodegroups, after creation succeeds.
    return connect_network(network, backend, depends=[job_id])
693

    
694

    
695
def _create_network(network, backend):
    """Create a network.

    Submits a CreateNetwork RAPI job and returns its job id.
    """

    # Work on a copy: network.backend_tag may be a shared list (e.g. coming
    # from settings), and appending to it directly would mutate it in place
    # across calls/networks.
    tags = list(network.backend_tag)
    if network.dhcp:
        tags.append('nfdhcpd')

    if network.public:
        conflicts_check = True
        tags.append('public')
    else:
        conflicts_check = False
        tags.append('private')

    # Use a dummy network subnet for IPv6 only networks. Currently Ganeti does
    # not support IPv6 only networks. To bypass this limitation, we create the
    # network with a dummy network subnet, and make Cyclades connect instances
    # to such networks, with address=None.
    subnet = network.subnet
    if subnet is None:
        subnet = "10.0.0.0/24"

    try:
        bn = BackendNetwork.objects.get(network=network, backend=backend)
        mac_prefix = bn.mac_prefix
    except BackendNetwork.DoesNotExist:
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
                        " does not exist" % (network.id, backend.id))

    with pooled_rapi_client(backend) as client:
        return client.CreateNetwork(network_name=network.backend_id,
                                    network=subnet,
                                    network6=network.subnet6,
                                    gateway=network.gateway,
                                    gateway6=network.gateway6,
                                    mac_prefix=mac_prefix,
                                    conflicts_check=conflicts_check,
                                    tags=tags)
733

    
734

    
735
def connect_network(network, backend, depends=None, group=None):
    """Connect a network to nodegroups.

    :param depends: Ganeti job ids the connect jobs should wait on.
    :param group: connect only this nodegroup; all groups when None.
    :returns: list of Ganeti job ids, one per nodegroup.
    """
    log.debug("Connecting network %s to backend %s", network, backend)

    # Only public networks must be checked for conflicting addresses.
    conflicts_check = bool(network.public)

    # BUGFIX: 'depends=[]' was a mutable default argument; use None.
    if depends is None:
        depends = []
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
    job_ids = []
    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        # Use a distinct loop variable; the original shadowed the 'group'
        # parameter.
        for grp in groups:
            job_id = client.ConnectNetwork(network.backend_id, grp,
                                           network.mode, network.link,
                                           conflicts_check,
                                           depends=depends)
            job_ids.append(job_id)
    return job_ids
755

    
756

    
757
def delete_network(network, backend, disconnect=True):
    """Delete `network` from `backend`, optionally disconnecting it first.

    When `disconnect` is True the removal job depends on the disconnect
    jobs, so it only runs after the network left all nodegroups.
    """
    log.debug("Deleting network %s from backend %s", network, backend)

    if disconnect:
        depend_jobs = disconnect_network(network, backend)
    else:
        depend_jobs = []
    _delete_network(network, backend, depends=depend_jobs)
764

    
765

    
766
def _delete_network(network, backend, depends=None):
    """Submit the Ganeti job that removes `network` from `backend`.

    :param depends: optional list of job IDs the removal waits for, each in
        any terminal state. Defaults to no dependencies. (Previously a
        mutable default `[]`, a shared-state hazard.)
    :returns: the Ganeti job ID of the DeleteNetwork job.
    """
    if depends is None:
        depends = []
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
    with pooled_rapi_client(backend) as client:
        return client.DeleteNetwork(network.backend_id, depends)
770

    
771

    
772
def disconnect_network(network, backend, group=None):
    """Disconnect `network` from nodegroups of `backend`.

    Submits one DisconnectNetwork job per nodegroup (or only for `group`,
    when given) and returns the list of Ganeti job IDs.
    """
    log.debug("Disconnecting network %s to backend %s", network, backend)

    job_ids = []
    with pooled_rapi_client(backend) as client:
        target_groups = [group] if group is not None else client.GetGroups()
        for grp in target_groups:
            job_ids.append(
                client.DisconnectNetwork(network.backend_id, grp))
    return job_ids
782

    
783

    
784
def connect_to_network(vm, nic):
    """Add the NIC `nic` to `vm`, creating the network on the VM's backend
    first if it is not yet ACTIVE there.

    :returns: the Ganeti job ID of the ModifyInstance job.
    """
    backend = vm.backend
    # Lock the network row to serialize concurrent backend operations.
    network = Network.objects.select_for_update().get(id=nic.network.id)
    bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
                                                         network=network)
    if bnet.operstate != "ACTIVE":
        # Network missing/inactive on this backend: create (and connect) it
        # and make the NIC addition depend on those jobs.
        depend_jobs = create_network(network, backend, connect=True)
    else:
        depend_jobs = []

    depends = [[job, ["success", "error", "canceled"]]
               for job in depend_jobs]

    nic_spec = {'name': nic.backend_uuid,
                'network': network.backend_id,
                'ip': nic.ipv4}

    log.debug("Adding NIC %s to VM %s", nic_spec, vm)

    kwargs = {
        "instance": vm.backend_vm_id,
        "nics": [("add", "-1", nic_spec)],
        "depends": depends,
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(**kwargs)
814

    
815

    
816
def disconnect_from_network(vm, nic):
    """Remove the NIC at `nic.index` from `vm`, clearing its firewall tag.

    :returns: the Ganeti job ID of the ModifyInstance job.
    """
    log.debug("Removing NIC %s of VM %s", nic, vm)

    kwargs = {
        "instance": vm.backend_vm_id,
        "nics": [("remove", nic.index, {})],
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        jobID = client.ModifyInstance(**kwargs)
        # If the NIC has a tag for a firewall profile it must be deleted,
        # otherwise it may affect another NIC. XXX: Deleting the tag should
        # depend on the removing the NIC, but currently RAPI client does not
        # support this, this may result in clearing the firewall profile
        # without successfully removing the NIC. This issue will be fixed with
        # use of NIC UUIDs.
        profile = nic.firewall_profile
        if profile and profile != "DISABLED":
            client.DeleteInstanceTags(vm.backend_vm_id,
                                      [_firewall_tags[profile] % nic.index],
                                      dry_run=settings.TEST)

        return jobID
843

    
844

    
845
def set_firewall_profile(vm, profile, index=0):
    """Apply firewall profile `profile` to NIC `index` of `vm` via Ganeti
    instance tags.

    Deletes any previous firewall tags for that NIC, adds the tag for the
    new profile (unless it is "DISABLED"), and finally issues a NOP
    ModifyInstance so the dispatcher re-processes the network status.

    :raises ValueError: if `profile` is not a known firewall profile.
    """
    try:
        tag = _firewall_tags[profile] % index
    except KeyError:
        # Fixed typo in the error message ("Unsopported").
        raise ValueError("Unsupported Firewall Profile: %s" % profile)

    log.debug("Setting tag of VM %s, NIC index %d, to %s", vm, index, profile)

    with pooled_rapi_client(vm) as client:
        # Delete previous firewall tags
        old_tags = client.GetInstanceTags(vm.backend_vm_id)
        delete_tags = [(t % index) for t in _firewall_tags.values()
                       if (t % index) in old_tags]
        if delete_tags:
            client.DeleteInstanceTags(vm.backend_vm_id, delete_tags,
                                      dry_run=settings.TEST)

        if profile != "DISABLED":
            client.AddInstanceTags(vm.backend_vm_id, [tag],
                                   dry_run=settings.TEST)

        # XXX NOP ModifyInstance call to force process_net_status to run
        # on the dispatcher
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
        client.ModifyInstance(vm.backend_vm_id,
                              os_name=os_name)
    return None
872

    
873

    
874
def get_instances(backend, bulk=True):
    """Return the instances of `backend`, as reported by Ganeti."""
    with pooled_rapi_client(backend) as client:
        return client.GetInstances(bulk=bulk)
877

    
878

    
879
def get_nodes(backend, bulk=True):
    """Return the nodes of `backend`, as reported by Ganeti."""
    with pooled_rapi_client(backend) as client:
        return client.GetNodes(bulk=bulk)
882

    
883

    
884
def get_jobs(backend, bulk=True):
    """Return the jobs of `backend`, as reported by Ganeti."""
    with pooled_rapi_client(backend) as client:
        return client.GetJobs(bulk=bulk)
887

    
888

    
889
def get_physical_resources(backend):
    """ Get the physical resources of a backend.

    Get the resources of a backend as reported by the backend (not the db).

    """
    attrs = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
    totals = dict.fromkeys(attrs, 0)
    for node in get_nodes(backend, bulk=True):
        # Filter out drained, offline and not vm_capable nodes since they will
        # not take part in the vm allocation process
        can_host_vms = (node['vm_capable'] and
                        not (node['drained'] or node['offline']))
        if can_host_vms and node['cnodes']:
            for a in attrs:
                totals[a] += int(node[a])
    return totals
908

    
909

    
910
def update_backend_resources(backend, resources=None):
    """ Update the state of the backend resources in db.

    When `resources` is not given, they are fetched live from the backend.

    """
    if not resources:
        resources = get_physical_resources(backend)

    # Copy each reported resource counter onto the DB object.
    for field in ('mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt',
                  'ctotal'):
        setattr(backend, field, resources[field])
    backend.updated = datetime.now()
    backend.save()
926

    
927

    
928
def get_memory_from_instances(backend):
    """ Get the memory that is used from instances.

    Get the used memory of a backend. Note: This is different for
    the real memory used, due to kvm's memory de-duplication.

    """
    with pooled_rapi_client(backend) as client:
        instances = client.GetInstances(bulk=True)
    return sum(inst['oper_ram'] for inst in instances)
941

    
942

    
943
def get_available_disk_templates(backend):
    """Get the list of available disk templates of a Ganeti backend.

    The list contains the disk templates that are enabled in the Ganeti backend
    and also included in ipolicy-disk-templates.

    """
    with pooled_rapi_client(backend) as client:
        info = client.GetInfo()
    allowed = info["ipolicy"]["disk-templates"]
    if "enabled_disk_templates" not in info:
        # Ganeti < 2.8 does not have 'enabled_disk_templates'
        return allowed
    return [tmpl for tmpl in info["enabled_disk_templates"]
            if tmpl in allowed]
960

    
961

    
962
def update_backend_disk_templates(backend):
    """Refresh the backend's cached disk templates in the DB."""
    backend.disk_templates = get_available_disk_templates(backend)
    backend.save()
966

    
967

    
968
##
## Synchronized operations for reconciliation
##
971

    
972

    
973
def create_network_synced(network, backend):
    """Create and then connect a network on `backend`, synchronously.

    Returns the (status, error) result of the first failing step, or of
    the final connect step on success.
    """
    result = _create_network_synced(network, backend)
    if result[0] == 'success':
        result = connect_network_synced(network, backend)
    return result
979

    
980

    
981
def _create_network_synced(network, backend):
    """Submit a network-creation job and wait for it to finish."""
    with pooled_rapi_client(backend) as client:
        job = _create_network(network, backend)
        return wait_for_job(client, job)
986

    
987

    
988
def connect_network_synced(network, backend):
    """Connect `network` to every nodegroup of `backend`, waiting for each
    job; stop at the first failure.

    :returns: the (status, error) tuple of the failing job, the last
        successful job, or ('success', None) when there are no nodegroups.
    """
    # Initialize so a backend with zero nodegroups does not raise
    # UnboundLocalError at the final return (previous behavior).
    result = ('success', None)
    with pooled_rapi_client(backend) as client:
        for group in client.GetGroups():
            job = client.ConnectNetwork(network.backend_id, group,
                                        network.mode, network.link)
            result = wait_for_job(client, job)
            if result[0] != 'success':
                return result

    return result
998

    
999

    
1000
def wait_for_job(client, jobid):
    """Block until Ganeti job `jobid` reaches a terminal state.

    Polls the job's 'status' and 'opresult' fields via WaitForJobChange.

    :returns: ('success', None) on success, otherwise (status, opresult)
        for 'error' or 'canceled' jobs.
    """
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
    status = result['job_info'][0]
    # Ganeti's terminal state for a canceled job is "canceled" (as used in
    # this module's depends lists); checking "cancel" here would poll forever.
    while status not in ['success', 'error', 'canceled']:
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
                                         [result], None)
        status = result['job_info'][0]

    if status == 'success':
        return (status, None)
    else:
        error = result['job_info'][1]
        return (status, error)