root / snf-cyclades-app / synnefo / logic / backend.py @ cb7b1c23

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
from django.conf import settings
34
from django.db import transaction
35
from datetime import datetime, timedelta
36

    
37
from synnefo.db.models import (Backend, VirtualMachine, Network,
38
                               BackendNetwork, BACKEND_STATUSES,
39
                               pooled_rapi_client, VirtualMachineDiagnostic,
40
                               Flavor, FloatingIP)
41
from synnefo.logic import utils
42
from synnefo import quotas
43
from synnefo.api.util import release_resource
44
from synnefo.util.mac2eui64 import mac2eui64
45
from synnefo.logic.rapi import GanetiApiError
46

    
47
from logging import getLogger
48
log = getLogger(__name__)
49

    
50

    
51
_firewall_tags = {
52
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
53
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
54
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}
55

    
56
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
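
# Illustrative note (the tag format is an assumption, not defined in this
# file): with a setting such as
# GANETI_FIREWALL_ENABLED_TAG = "synnefo:network:%s:protected" (hypothetical),
# v.split(':')[3] extracts the last field ("protected"), so _reverse_tags maps
# that field back to the corresponding profile key ('ENABLED', 'DISABLED' or
# 'PROTECTED').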
57

    
58
# Timeout in seconds for building NICs. After this period the NICs are
59
# considered stale and are removed from the DB.
60
BUILDING_NIC_TIMEOUT = 180
61

    
62
NIC_FIELDS = ["state", "mac", "ipv4", "ipv6", "network", "firewall_profile",
63
              "index"]
64

    
65

    
66
def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
67
    """Handle quotas for updated VirtualMachine.
68

69
    Update quotas for the updated VirtualMachine based on the job that ran on
70
    the Ganeti backend. If a commission has already been issued for this job,
71
    then this commission is just accepted or rejected based on the job status.
72
    Otherwise, a new commission is issued for the given change, in force and
73
    auto-accept mode. In this case, previous commissions are rejected, since
74
    they reflect a previous state of the VM.
75

76
    """
77
    if job_status not in ["success", "error", "canceled"]:
78
        return vm
79

    
80
    # Check whether successful completion of the job triggers any quotable
81
    # change in the VM state.
82
    action = utils.get_action_from_opcode(job_opcode, job_fields)
83
    if action == "BUILD":
84
        # Quotas for new VMs are automatically accepted by the API
85
        return vm
86
    commission_info = quotas.get_commission_info(vm, action=action,
87
                                                 action_fields=job_fields)
88

    
89
    if vm.task_job_id == job_id and vm.serial is not None:
90
        # Commission for this change has already been issued. So just
91
        # accept/reject it. A special case is OP_INSTANCE_CREATE, which, even
92
        # if it fails, must be accepted, since the user must manually remove
93
        # the failed server.
94
        serial = vm.serial
95
        if job_status == "success":
96
            quotas.accept_serial(serial)
97
        elif job_status in ["error", "canceled"]:
98
            log.debug("Job %s failed. Rejecting related serial %s", job_id,
99
                      serial)
100
            quotas.reject_serial(serial)
101
        vm.serial = None
102
    elif job_status == "success" and commission_info is not None:
103
        log.debug("Expected job was %s. Processing job %s. Commission for"
104
                  " this job: %s", vm.task_job_id, job_id, commission_info)
105
        # Commission for this change has not been issued, or the issued
106
        # commission was unaware of the current change. Reject all previous
107
        # commissions and create a new one in forced mode!
108
        commission_name = ("client: dispatcher, resource: %s, ganeti_job: %s"
109
                           % (vm, job_id))
110
        quotas.handle_resource_commission(vm, action,
111
                                          commission_info=commission_info,
112
                                          commission_name=commission_name,
113
                                          force=True,
114
                                          auto_accept=True)
115
        log.debug("Issued new commission: %s", vm.serial)
116

    
117
    return vm
118

    
119

    
120
@transaction.commit_on_success
121
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None,
122
                      job_fields=None):
123
    """Process a job progress notification from the backend
124

125
    Process an incoming message from the backend (currently Ganeti).
126
    Job notifications with a terminating status (success, error, or canceled)
127
    also update the operating state of the VM.
128

129
    """
130
    # See #1492, #1031, #1111 why this line has been removed
131
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
132
    if status not in [x[0] for x in BACKEND_STATUSES]:
133
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)
134

    
135
    vm.backendjobid = jobid
136
    vm.backendjobstatus = status
137
    vm.backendopcode = opcode
138
    vm.backendlogmsg = logmsg
139

    
140
    if status in ["queued", "waiting", "running"]:
141
        vm.save()
142
        return
143

    
144
    if job_fields is None:
145
        job_fields = {}
146
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode)
147

    
148
    # Notifications of success change the operating state
149
    if status == "success":
150
        if state_for_success is not None:
151
            vm.operstate = state_for_success
152
        beparams = job_fields.get("beparams", None)
153
        if beparams:
154
            # Change the flavor of the VM
155
            _process_resize(vm, beparams)
156
        # Update backendtime only for jobs that have been successfully
157
        # completed, since only these jobs update the state of the VM. Else a
158
        # "race condition" may occur when a successful job (e.g.
159
        # OP_INSTANCE_REMOVE) completes before an error job and messages arrive
160
        # in reversed order.
161
        vm.backendtime = etime
162

    
163
    if status in ["success", "error", "canceled"] and nics is not None:
164
        # Update the NICs of the VM
165
        _process_net_status(vm, etime, nics)
166

    
167
    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
168
    if opcode == 'OP_INSTANCE_CREATE' and status in ('canceled', 'error'):
169
        vm.operstate = 'ERROR'
170
        vm.backendtime = etime
171
    elif opcode == 'OP_INSTANCE_REMOVE':
172
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
173
        # when no instance exists at the Ganeti backend.
174
        # See ticket #799 for all the details.
175
        if status == 'success' or (status == 'error' and
176
                                   not vm_exists_in_backend(vm)):
177
            # VM has been deleted
178
            for nic in vm.nics.all():
179
                # Release the IP
180
                release_nic_address(nic)
181
                # And delete the NIC.
182
                nic.delete()
183
            vm.deleted = True
184
            vm.operstate = state_for_success
185
            vm.backendtime = etime
186
            status = "success"
187

    
188
    if status in ["success", "error", "canceled"]:
189
        # Job is finalized: Handle quotas/commissioning
190
        vm = handle_vm_quotas(vm, job_id=jobid, job_opcode=opcode,
191
                              job_status=status, job_fields=job_fields)
192
        # and clear task fields
193
        if vm.task_job_id == jobid:
194
            vm.task = None
195
            vm.task_job_id = None
196

    
197
    vm.save()
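
# Illustrative usage sketch (argument values hypothetical; the caller is not
# part of this file): the dispatcher is expected to call process_op_status()
# with the fields of a Ganeti job notification, e.g.:
#
#   process_op_status(vm, etime, jobid=1234, opcode="OP_INSTANCE_STARTUP",
#                     status="success", logmsg="", nics=None, job_fields={})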
198

    
199

    
200
def _process_resize(vm, beparams):
201
    """Change flavor of a VirtualMachine based on new beparams."""
202
    old_flavor = vm.flavor
203
    vcpus = beparams.get("vcpus", old_flavor.cpu)
204
    ram = beparams.get("maxmem", old_flavor.ram)
205
    if vcpus == old_flavor.cpu and ram == old_flavor.ram:
206
        return
207
    try:
208
        new_flavor = Flavor.objects.get(cpu=vcpus, ram=ram,
209
                                        disk=old_flavor.disk,
210
                                        disk_template=old_flavor.disk_template)
211
    except Flavor.DoesNotExist:
212
        raise Exception("Cannot find flavor for VM")
213
    vm.flavor = new_flavor
214
    vm.save()
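
# Illustrative example (values hypothetical): a "beparams" dict as consumed by
# _process_resize() above, e.g. {"vcpus": 2, "maxmem": 2048, "minmem": 2048};
# only "vcpus" and "maxmem" are inspected when looking up the new Flavor.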
215

    
216

    
217
@transaction.commit_on_success
218
def process_net_status(vm, etime, nics):
219
    """Wrap _process_net_status inside transaction."""
220
    _process_net_status(vm, etime, nics)
221

    
222

    
223
def _process_net_status(vm, etime, nics):
224
    """Process a net status notification from the backend
225

226
    Process an incoming message from the Ganeti backend,
227
    detailing the NIC configuration of a VM instance.
228

229
    Update the state of the VM in the DB accordingly.
230

231
    """
232
    ganeti_nics = process_ganeti_nics(nics)
233
    db_nics = dict([(nic.id, nic) for nic in vm.nics.all()])
234

    
235
    # Get X-Lock on backend before getting X-Lock on network IP pools, to
236
    # guarantee that no deadlock will occur with the Backend allocator.
237
    Backend.objects.select_for_update().get(id=vm.backend_id)
238

    
239
    for nic_name in set(db_nics.keys()) | set(ganeti_nics.keys()):
240
        db_nic = db_nics.get(nic_name)
241
        ganeti_nic = ganeti_nics.get(nic_name)
242
        if ganeti_nic is None:
243
            # NIC exists in DB but not in Ganeti. If the NIC is in 'building'
244
            # state for more than BUILDING_NIC_TIMEOUT seconds, then we remove
245
            # the NIC. TODO: This is dangerous, as the job may be stuck in the
246
            # queue, and releasing the IP may lead to duplicate IP use.
247
            if db_nic.state != "BUILDING" or (db_nic.state == "BUILDING" and
248
               etime > db_nic.created + timedelta(seconds=BUILDING_NIC_TIMEOUT)):
249
                release_nic_address(db_nic)
250
                db_nic.delete()
251
            else:
252
                log.warning("Ignoring recent building NIC: %s", db_nic)
253
        elif db_nic is None:
254
            # NIC exists in Ganeti but not in DB
255
            if ganeti_nic["ipv4"]:
256
                network = ganeti_nic["network"]
257
                network.reserve_address(ganeti_nic["ipv4"])
258
            vm.nics.create(**ganeti_nic)
259
        elif not nics_are_equal(db_nic, ganeti_nic):
260
            # Special case where the IPv4 address has changed, because you
261
            # need to release the old IPv4 address and reserve the new one
262
            if db_nic.ipv4 != ganeti_nic["ipv4"]:
263
                release_nic_address(db_nic)
264
                if ganeti_nic["ipv4"]:
265
                    ganeti_nic["network"].reserve_address(ganeti_nic["ipv4"])
266

    
267
            # Update the NIC in DB with the values from Ganeti NIC
268
            for f in NIC_FIELDS:
                setattr(db_nic, f, ganeti_nic[f])
269
            db_nic.save()
270

    
271
            # Dummy-update the network, so that 'changed-since' queries pick it up
272
            db_nic.network.save()
273

    
274
    vm.backendtime = etime
275
    vm.save()
276

    
277

    
278
def nics_are_equal(db_nic, gnt_nic):
279
    for field in NIC_FIELDS:
280
        if getattr(db_nic, field) != gnt_nic[field]:
281
            return False
282
    return True
283

    
284

    
285
def process_ganeti_nics(ganeti_nics):
286
    """Process NIC dict from ganeti"""
287
    new_nics = []
288
    for index, gnic in enumerate(ganeti_nics):
289
        nic_name = gnic.get("name", None)
290
        if nic_name is not None:
291
            nic_id = utils.id_from_nic_name(nic_name)
292
        else:
293
            # Use the index as a default value. If the NIC is unknown to
294
            # Synnefo, it will be created automatically.
295
            nic_id = "unknown-" + str(index)
296
        network_name = gnic.get('network', '')
297
        network_id = utils.id_from_network_name(network_name)
298
        network = Network.objects.get(id=network_id)
299

    
300
        # Get the new nic info
301
        mac = gnic.get('mac')
302
        ipv4 = gnic.get('ip')
303
        ipv6 = mac2eui64(mac, network.subnet6)\
304
            if network.subnet6 is not None else None
305

    
306
        firewall = gnic.get('firewall')
307
        firewall_profile = _reverse_tags.get(firewall)
308
        if not firewall_profile and network.public:
309
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE
310

    
311
        nic_info = {
312
            'index': index,
313
            'network': network,
314
            'mac': mac,
315
            'ipv4': ipv4,
316
            'ipv6': ipv6,
317
            'firewall_profile': firewall_profile,
318
            'state': 'ACTIVE'}
319

    
320
        new_nics.append((nic_id, nic_info))
321
    return dict(new_nics)
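
# Illustrative example (field values hypothetical): for a Ganeti NIC entry like
#   {"name": "<nic-name>", "network": "<network-backend-name>",
#    "mac": "aa:00:00:11:22:33", "ip": "192.0.2.10", "firewall": "<tag-field>"}
# process_ganeti_nics() returns a dict keyed by the NIC id, whose values carry
# the NIC_FIELDS used by _process_net_status() (index, network, mac, ipv4,
# ipv6, firewall_profile and state).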
322

    
323

    
324
def release_nic_address(nic):
325
    """Release the IPv4 address of a NIC.
326

327
    Check if an instance's NIC has an IPv4 address and release it if it is not
328
    a Floating IP. If it is a Floating IP, then disassociate the FloatingIP
329
    from the machine.
330

331
    """
332

    
333
    if nic.ipv4:
334
        if nic.ip_type == "FLOATING":
335
            FloatingIP.objects.filter(machine=nic.machine_id,
336
                                      network=nic.network_id,
337
                                      ipv4=nic.ipv4).update(machine=None)
338
        else:
339
            nic.network.release_address(nic.ipv4)
340

    
341

    
342
@transaction.commit_on_success
343
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
344
    if status not in [x[0] for x in BACKEND_STATUSES]:
345
        raise Network.InvalidBackendMsgError(opcode, status)
346

    
347
    back_network.backendjobid = jobid
348
    back_network.backendjobstatus = status
349
    back_network.backendopcode = opcode
350
    back_network.backendlogmsg = logmsg
351

    
352
    network = back_network.network
353

    
354
    # Notifications of success change the operating state
355
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
356
    if status == 'success' and state_for_success is not None:
357
        back_network.operstate = state_for_success
358

    
359
    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_ADD':
360
        back_network.operstate = 'ERROR'
361
        back_network.backendtime = etime
362

    
363
    if opcode == 'OP_NETWORK_REMOVE':
364
        network_is_deleted = (status == "success")
365
        if network_is_deleted or (status == "error" and not
366
                                  network_exists_in_backend(back_network)):
367
            back_network.operstate = state_for_success
368
            back_network.deleted = True
369
            back_network.backendtime = etime
370

    
371
    if status == 'success':
372
        back_network.backendtime = etime
373
    back_network.save()
374
    # The state of the parent Network must also be updated.
375
    update_network_state(network)
376

    
377

    
378
def update_network_state(network):
379
    """Update the state of a Network based on BackendNetwork states.
380

381
    Update the state of a Network based on the operstate of its BackendNetworks
382
    in the backends in which the network exists.
383

384
    The state of the network is:
385
    * ACTIVE: If it is 'ACTIVE' in at least one backend.
386
    * DELETED: If it is 'DELETED' in all backends in which it has been created.
387

388
    This function also releases the resources (MAC prefix or Bridge) and the
389
    quotas for the network.
390

391
    """
392
    if network.deleted:
393
        # Network has already been deleted. Just assert that state is also
394
        # DELETED
395
        if not network.state == "DELETED":
396
            network.state = "DELETED"
397
            network.save()
398
        return
399

    
400
    backend_states = [s.operstate for s in network.backend_networks.all()]
401
    if not backend_states and network.action != "DESTROY":
402
        if network.state != "ACTIVE":
403
            network.state = "ACTIVE"
404
            network.save()
405
        return
406

    
407
    # Network is deleted when all BackendNetworks go to "DELETED" operstate
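    # (reduce() over e.g. ["DELETED", "DELETED"] with initial value "DELETED"
    # yields "DELETED", which is truthy, while any other operstate in the list
    # makes the result False.)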
408
    deleted = reduce(lambda x, y: x == y and "DELETED", backend_states,
409
                     "DELETED")
410

    
411
    # Release the resources on the deletion of the Network
412
    if deleted:
413
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
414
                 network.id, network.link, network.mac_prefix)
415
        network.deleted = True
416
        network.state = "DELETED"
417
        if network.mac_prefix:
418
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
419
                release_resource(res_type="mac_prefix",
420
                                 value=network.mac_prefix)
421
        if network.link:
422
            if network.FLAVORS[network.flavor]["link"] == "pool":
423
                release_resource(res_type="bridge", value=network.link)
424

    
425
        # Issue commission
426
        if network.userid:
427
            quotas.issue_and_accept_commission(network, delete=True)
428
            # the above has already saved the object and committed;
429
            # a second save would override others' changes, since the
430
            # object is now unlocked
431
            return
432
        elif not network.public:
433
            log.warning("Network %s does not have an owner!", network.id)
434
    network.save()
435

    
436

    
437
@transaction.commit_on_success
438
def process_network_modify(back_network, etime, jobid, opcode, status,
439
                           job_fields):
440
    assert (opcode == "OP_NETWORK_SET_PARAMS")
441
    if status not in [x[0] for x in BACKEND_STATUSES]:
442
        raise Network.InvalidBackendMsgError(opcode, status)
443

    
444
    back_network.backendjobid = jobid
445
    back_network.backendjobstatus = status
446
    back_network.backendopcode = opcode
447

    
448
    add_reserved_ips = job_fields.get("add_reserved_ips")
449
    if add_reserved_ips:
450
        net = back_network.network
451
        pool = net.get_pool()
452
        for ip in add_reserved_ips:
453
            pool.reserve(ip, external=True)
455
        pool.save()
456

    
457
    if status == 'success':
458
        back_network.backendtime = etime
459
    back_network.save()
460

    
461

    
462
@transaction.commit_on_success
463
def process_create_progress(vm, etime, progress):
464

    
465
    percentage = int(progress)
466

    
467
    # The percentage may exceed 100%, due to the way
468
    # snf-image:copy-progress tracks bytes read by image handling processes
469
    percentage = 100 if percentage > 100 else percentage
470
    if percentage < 0:
471
        raise ValueError("Percentage cannot be negative")
472

    
473
    # FIXME: log a warning here, see #1033
474
#   if last_update > percentage:
475
#       raise ValueError("Build percentage should increase monotonically " \
476
#                        "(old = %d, new = %d)" % (last_update, percentage))
477

    
478
    # This assumes that no message of type 'ganeti-create-progress' is going to
479
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
480
    # the instance is STARTED.  What if the two messages are processed by two
481
    # separate dispatcher threads, and the 'ganeti-op-status' message for
482
    # successful creation gets processed before the 'ganeti-create-progress'
483
    # message? [vkoukis]
484
    #
485
    #if not vm.operstate == 'BUILD':
486
    #    raise VirtualMachine.IllegalState("VM is not in building state")
487

    
488
    vm.buildpercentage = percentage
489
    vm.backendtime = etime
490
    vm.save()
491

    
492

    
493
@transaction.commit_on_success
494
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
495
                               details=None):
496
    """
497
    Create virtual machine instance diagnostic entry.
498

499
    :param vm: VirtualMachine instance to create diagnostic for.
500
    :param message: Diagnostic message.
501
    :param source: Diagnostic source identifier (e.g. image-helper).
502
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
503
    :param etime: The time the message occurred (if available).
504
    :param details: Additional details or debug information.
505
    """
506
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level, source=source,
507
                                                   source_date=etime,
508
                                                   message=message,
509
                                                   details=details)
510

    
511

    
512
def create_instance(vm, nics, flavor, image):
513
    """`image` is a dictionary which should contain the keys:
514
            'backend_id', 'format' and 'metadata'
515

516
        The 'metadata' value should be a dictionary.
517
    """
518

    
519
    # Handle arguments to CreateInstance() as a dictionary,
520
    # initialize it based on a deployment-specific value.
521
    # This enables the administrator to override deployment-specific
522
    # arguments, such as the disk template to use, name of os provider
523
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
524
    #
525
    kw = vm.backend.get_create_params()
526
    kw['mode'] = 'create'
527
    kw['name'] = vm.backend_vm_id
528
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
529

    
530
    kw['disk_template'] = flavor.disk_template
531
    kw['disks'] = [{"size": flavor.disk * 1024}]
532
    provider = flavor.disk_provider
533
    if provider:
534
        kw['disks'][0]['provider'] = provider
535
        kw['disks'][0]['origin'] = flavor.disk_origin
536

    
537
    kw['nics'] = [{"name": nic.backend_uuid,
538
                   "network": nic.network.backend_id,
539
                   "ip": nic.ipv4}
540
                  for nic in nics]
541
    backend = vm.backend
542
    depend_jobs = []
543
    for nic in nics:
544
        network = Network.objects.select_for_update().get(id=nic.network.id)
545
        bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
546
                                                             network=network)
547
        if bnet.operstate != "ACTIVE":
548
            if network.public:
549
                msg = "Cannot connect instance to network %s. Network is not"\
550
                      " ACTIVE in backend %s." % (network, backend)
551
                raise Exception(msg)
552
            else:
553
                jobs = create_network(network, backend, connect=True)
554
                if isinstance(jobs, list):
555
                    depend_jobs.extend(jobs)
556
                else:
557
                    depend_jobs.append(jobs)
558
    kw["depends"] = [[job, ["success", "error", "canceled"]]
559
                     for job in depend_jobs]
560

    
561
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
562
    # kw['os'] = settings.GANETI_OS_PROVIDER
563
    kw['ip_check'] = False
564
    kw['name_check'] = False
565

    
566
    # Do not specify a node explicitly; have
567
    # Ganeti use an iallocator instead
568
    #kw['pnode'] = rapi.GetNodes()[0]
569

    
570
    kw['dry_run'] = settings.TEST
571

    
572
    kw['beparams'] = {
573
        'auto_balance': True,
574
        'vcpus': flavor.cpu,
575
        'memory': flavor.ram}
576

    
577
    kw['osparams'] = {
578
        'config_url': vm.config_url,
579
        # Store image id and format to Ganeti
580
        'img_id': image['backend_id'],
581
        'img_format': image['format']}
582

    
583
    # Use opportunistic locking
584
    kw['opportunistic_locking'] = True
585

    
586
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
587
    # kw['hvparams'] = dict(serial_console=False)
588

    
589
    log.debug("Creating instance %s", utils.hide_pass(kw))
590
    with pooled_rapi_client(vm) as client:
591
        return client.CreateInstance(**kw)
592

    
593

    
594
def delete_instance(vm):
595
    with pooled_rapi_client(vm) as client:
596
        return client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)
597

    
598

    
599
def reboot_instance(vm, reboot_type):
600
    assert reboot_type in ('soft', 'hard')
601
    kwargs = {"instance": vm.backend_vm_id,
602
              "reboot_type": "hard"}
603
    # XXX: The shutdown_timeout parameter is currently not supported by the
604
    # Ganeti RAPI. Until it is, we fall back for both reboot types to
605
    # Ganeti's default shutdown timeout (120s). Note that the reboot type of
606
    # the Ganeti job must always be 'hard'. The 'soft' and 'hard' types of
607
    # the OS API are different from the Ganeti ones, and map to
608
    # 'shutdown_timeout'.
609
    #if reboot_type == "hard":
610
    #    kwargs["shutdown_timeout"] = 0
611
    if settings.TEST:
612
        kwargs["dry_run"] = True
613
    with pooled_rapi_client(vm) as client:
614
        return client.RebootInstance(**kwargs)
615

    
616

    
617
def startup_instance(vm):
618
    with pooled_rapi_client(vm) as client:
619
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
620

    
621

    
622
def shutdown_instance(vm):
623
    with pooled_rapi_client(vm) as client:
624
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)
625

    
626

    
627
def resize_instance(vm, vcpus, memory):
628
    beparams = {"vcpus": int(vcpus),
629
                "minmem": int(memory),
630
                "maxmem": int(memory)}
631
    with pooled_rapi_client(vm) as client:
632
        return client.ModifyInstance(vm.backend_vm_id, beparams=beparams)
633

    
634

    
635
def get_instance_console(vm):
636
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
637
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
638
    # useless (see #783).
639
    #
640
    # Until this is fixed on the Ganeti side, construct a console info reply
641
    # directly.
642
    #
643
    # WARNING: This assumes that VNC runs on port network_port on
644
    #          the instance's primary node, and is probably
645
    #          hypervisor-specific.
646
    #
647
    log.debug("Getting console for vm %s", vm)
648

    
649
    console = {}
650
    console['kind'] = 'vnc'
651

    
652
    with pooled_rapi_client(vm) as client:
653
        i = client.GetInstance(vm.backend_vm_id)
654

    
655
    if vm.backend.hypervisor == "kvm" and i['hvparams']['serial_console']:
656
        raise Exception("hv parameter serial_console cannot be true")
657
    console['host'] = i['pnode']
658
    console['port'] = i['network_port']
659

    
660
    return console
661

    
662

    
663
def get_instance_info(vm):
664
    with pooled_rapi_client(vm) as client:
665
        return client.GetInstance(vm.backend_vm_id)
666

    
667

    
668
def vm_exists_in_backend(vm):
669
    try:
670
        get_instance_info(vm)
671
        return True
672
    except GanetiApiError as e:
673
        if e.code == 404:
674
            return False
675
        raise e
676

    
677

    
678
def get_network_info(backend_network):
679
    with pooled_rapi_client(backend_network) as client:
680
        return client.GetNetwork(backend_network.network.backend_id)
681

    
682

    
683
def network_exists_in_backend(backend_network):
684
    try:
685
        get_network_info(backend_network)
686
        return True
687
    except GanetiApiError as e:
688
        if e.code == 404:
689
            return False
        raise e
690

    
691

    
692
def create_network(network, backend, connect=True):
693
    """Create a network in a Ganeti backend"""
694
    log.debug("Creating network %s in backend %s", network, backend)
695

    
696
    job_id = _create_network(network, backend)
697

    
698
    if connect:
699
        job_ids = connect_network(network, backend, depends=[job_id])
700
        return job_ids
701
    else:
702
        return [job_id]
703

    
704

    
705
def _create_network(network, backend):
706
    """Create a network."""
707

    
708
    tags = network.backend_tag
709
    if network.dhcp:
710
        tags.append('nfdhcpd')
711

    
712
    if network.public:
713
        conflicts_check = True
714
        tags.append('public')
715
    else:
716
        conflicts_check = False
717
        tags.append('private')
718

    
719
    # Use a dummy network subnet for IPv6 only networks. Currently Ganeti does
720
    # not support IPv6 only networks. To bypass this limitation, we create the
721
    # network with a dummy network subnet, and make Cyclades connect instances
722
    # to such networks, with address=None.
723
    subnet = network.subnet
724
    if subnet is None:
725
        subnet = "10.0.0.0/24"
726

    
727
    try:
728
        bn = BackendNetwork.objects.get(network=network, backend=backend)
729
        mac_prefix = bn.mac_prefix
730
    except BackendNetwork.DoesNotExist:
731
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
732
                        " does not exist" % (network.id, backend.id))
733

    
734
    with pooled_rapi_client(backend) as client:
735
        return client.CreateNetwork(network_name=network.backend_id,
736
                                    network=subnet,
737
                                    network6=network.subnet6,
738
                                    gateway=network.gateway,
739
                                    gateway6=network.gateway6,
740
                                    mac_prefix=mac_prefix,
741
                                    conflicts_check=conflicts_check,
742
                                    tags=tags)
743

    
744

    
745
def connect_network(network, backend, depends=[], group=None):
746
    """Connect a network to nodegroups."""
747
    log.debug("Connecting network %s to backend %s", network, backend)
748

    
749
    if network.public:
750
        conflicts_check = True
751
    else:
752
        conflicts_check = False
753

    
754
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
755
    with pooled_rapi_client(backend) as client:
756
        groups = [group] if group is not None else client.GetGroups()
757
        job_ids = []
758
        for group in groups:
759
            job_id = client.ConnectNetwork(network.backend_id, group,
760
                                           network.mode, network.link,
761
                                           conflicts_check,
762
                                           depends=depends)
763
            job_ids.append(job_id)
764
    return job_ids
765

    
766

    
767
def delete_network(network, backend, disconnect=True):
768
    log.debug("Deleting network %s from backend %s", network, backend)
769

    
770
    depends = []
771
    if disconnect:
772
        depends = disconnect_network(network, backend)
773
    _delete_network(network, backend, depends=depends)
774

    
775

    
776
def _delete_network(network, backend, depends=[]):
777
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
778
    with pooled_rapi_client(backend) as client:
779
        return client.DeleteNetwork(network.backend_id, depends)
780

    
781

    
782
def disconnect_network(network, backend, group=None):
783
    log.debug("Disconnecting network %s from backend %s", network, backend)
784

    
785
    with pooled_rapi_client(backend) as client:
786
        groups = [group] if group is not None else client.GetGroups()
787
        job_ids = []
788
        for group in groups:
789
            job_id = client.DisconnectNetwork(network.backend_id, group)
790
            job_ids.append(job_id)
791
    return job_ids
792

    
793

    
794
def connect_to_network(vm, nic):
795
    network = nic.network
796
    backend = vm.backend
797
    network = Network.objects.select_for_update().get(id=network.id)
798
    bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
799
                                                         network=network)
800
    depend_jobs = []
801
    if bnet.operstate != "ACTIVE":
802
        depend_jobs = create_network(network, backend, connect=True)
803

    
804
    depends = [[job, ["success", "error", "canceled"]] for job in depend_jobs]
805

    
806
    nic = {'name': nic.backend_uuid,
807
           'network': network.backend_id,
808
           'ip': nic.ipv4}
809

    
810
    log.debug("Adding NIC %s to VM %s", nic, vm)
811

    
812
    kwargs = {
813
        "instance": vm.backend_vm_id,
814
        "nics": [("add", "-1", nic)],
815
        "depends": depends,
816
    }
817
    if vm.backend.use_hotplug():
818
        kwargs["hotplug"] = True
819
    if settings.TEST:
820
        kwargs["dry_run"] = True
821

    
822
    with pooled_rapi_client(vm) as client:
823
        return client.ModifyInstance(**kwargs)
824

    
825

    
826
def disconnect_from_network(vm, nic):
827
    log.debug("Removing NIC %s of VM %s", nic, vm)
828

    
829
    kwargs = {
830
        "instance": vm.backend_vm_id,
831
        "nics": [("remove", nic.backend_uuid, {})],
832
    }
833
    if vm.backend.use_hotplug():
834
        kwargs["hotplug"] = True
835
    if settings.TEST:
836
        kwargs["dry_run"] = True
837

    
838
    with pooled_rapi_client(vm) as client:
839
        jobID = client.ModifyInstance(**kwargs)
840
        # If the NIC has a tag for a firewall profile it must be deleted,
841
        # otherwise it may affect another NIC. XXX: Deleting the tag should
842
        # depend on the removal of the NIC, but currently the RAPI client does
843
        # not support this, so this may result in clearing the firewall profile
844
        # without successfully removing the NIC. This issue will be fixed with
845
        # use of NIC UUIDs.
846
        firewall_profile = nic.firewall_profile
847
        if firewall_profile and firewall_profile != "DISABLED":
848
            tag = _firewall_tags[firewall_profile] % nic.index
849
            client.DeleteInstanceTags(vm.backend_vm_id, [tag],
850
                                      dry_run=settings.TEST)
851

    
852
        return jobID
853

    
854

    
855
def set_firewall_profile(vm, profile, index=0):
856
    try:
857
        tag = _firewall_tags[profile] % index
858
    except KeyError:
859
        raise ValueError("Unsupported Firewall Profile: %s" % profile)
860

    
861
    log.debug("Setting tag of VM %s, NIC index %d, to %s", vm, index, profile)
862

    
863
    with pooled_rapi_client(vm) as client:
864
        # Delete previous firewall tags
865
        old_tags = client.GetInstanceTags(vm.backend_vm_id)
866
        delete_tags = [(t % index) for t in _firewall_tags.values()
867
                       if (t % index) in old_tags]
868
        if delete_tags:
869
            client.DeleteInstanceTags(vm.backend_vm_id, delete_tags,
870
                                      dry_run=settings.TEST)
871

    
872
        if profile != "DISABLED":
873
            client.AddInstanceTags(vm.backend_vm_id, [tag],
874
                                   dry_run=settings.TEST)
875

    
876
        # XXX NOP ModifyInstance call to force process_net_status to run
877
        # on the dispatcher
878
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
879
        client.ModifyInstance(vm.backend_vm_id,
880
                              os_name=os_name)
881
    return None
882

    
883

    
884
def get_instances(backend, bulk=True):
885
    with pooled_rapi_client(backend) as c:
886
        return c.GetInstances(bulk=bulk)
887

    
888

    
889
def get_nodes(backend, bulk=True):
890
    with pooled_rapi_client(backend) as c:
891
        return c.GetNodes(bulk=bulk)
892

    
893

    
894
def get_jobs(backend, bulk=True):
895
    with pooled_rapi_client(backend) as c:
896
        return c.GetJobs(bulk=bulk)
897

    
898

    
899
def get_physical_resources(backend):
900
    """ Get the physical resources of a backend.
901

902
    Get the resources of a backend as reported by the backend (not the db).
903

904
    """
905
    nodes = get_nodes(backend, bulk=True)
906
    attr = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
907
    res = {}
908
    for a in attr:
909
        res[a] = 0
910
    for n in nodes:
911
        # Filter out drained, offline and not vm_capable nodes since they will
912
        # not take part in the vm allocation process
913
        can_host_vms = n['vm_capable'] and not (n['drained'] or n['offline'])
914
        if can_host_vms and n['cnodes']:
915
            for a in attr:
916
                res[a] += int(n[a])
917
    return res
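
# Illustrative example (numbers hypothetical): the dict returned above
# aggregates the listed node attributes over all vm_capable, non-drained,
# online nodes, e.g.:
#   {'mfree': 8192, 'mtotal': 16384, 'dfree': 100000, 'dtotal': 200000,
#    'pinst_cnt': 7, 'ctotal': 16}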
918

    
919

    
920
def update_backend_resources(backend, resources=None):
921
    """ Update the state of the backend resources in db.
922

923
    """
924

    
925
    if not resources:
926
        resources = get_physical_resources(backend)
927

    
928
    backend.mfree = resources['mfree']
929
    backend.mtotal = resources['mtotal']
930
    backend.dfree = resources['dfree']
931
    backend.dtotal = resources['dtotal']
932
    backend.pinst_cnt = resources['pinst_cnt']
933
    backend.ctotal = resources['ctotal']
934
    backend.updated = datetime.now()
935
    backend.save()
936

    
937

    
938
def get_memory_from_instances(backend):
939
    """ Get the memory that is used by instances.
940

941
    Get the used memory of a backend. Note: This is different from
942
    the real memory used, due to KVM's memory de-duplication.
943

944
    """
945
    with pooled_rapi_client(backend) as client:
946
        instances = client.GetInstances(bulk=True)
947
    mem = 0
948
    for i in instances:
949
        mem += i['oper_ram']
950
    return mem
951

    
952

    
953
def get_available_disk_templates(backend):
954
    """Get the list of available disk templates of a Ganeti backend.
955

956
    The list contains the disk templates that are enabled in the Ganeti backend
957
    and also included in ipolicy-disk-templates.
958

959
    """
960
    with pooled_rapi_client(backend) as c:
961
        info = c.GetInfo()
962
    ipolicy_disk_templates = info["ipolicy"]["disk-templates"]
963
    try:
964
        enabled_disk_templates = info["enabled_disk_templates"]
965
        return [dp for dp in enabled_disk_templates
966
                if dp in ipolicy_disk_templates]
967
    except KeyError:
968
        # Ganeti < 2.8 does not have 'enabled_disk_templates'
969
        return ipolicy_disk_templates
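
# Illustrative example (values hypothetical; structure assumed from the lookups
# above): client.GetInfo() is expected to contain something like
#   {"ipolicy": {"disk-templates": ["drbd", "plain"]},
#    "enabled_disk_templates": ["drbd", "plain", "ext"]}
# in which case get_available_disk_templates() would return ["drbd", "plain"].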
970

    
971

    
972
def update_backend_disk_templates(backend):
973
    disk_templates = get_available_disk_templates(backend)
974
    backend.disk_templates = disk_templates
975
    backend.save()
976

    
977

    
978
##
979
## Synchronized operations for reconciliation
980
##
981

    
982

    
983
def create_network_synced(network, backend):
984
    result = _create_network_synced(network, backend)
985
    if result[0] != 'success':
986
        return result
987
    result = connect_network_synced(network, backend)
988
    return result
989

    
990

    
991
def _create_network_synced(network, backend):
992
    with pooled_rapi_client(backend) as client:
993
        job = _create_network(network, backend)
994
        result = wait_for_job(client, job)
995
    return result
996

    
997

    
998
def connect_network_synced(network, backend):
999
    with pooled_rapi_client(backend) as client:
1000
        for group in client.GetGroups():
1001
            job = client.ConnectNetwork(network.backend_id, group,
1002
                                        network.mode, network.link)
1003
            result = wait_for_job(client, job)
1004
            if result[0] != 'success':
1005
                return result
1006

    
1007
    return result
1008

    
1009

    
1010
def wait_for_job(client, jobid):
1011
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
1012
    status = result['job_info'][0]
1013
    while status not in ['success', 'error', 'canceled']:
1014
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
1015
                                         [result], None)
1016
        status = result['job_info'][0]
1017

    
1018
    if status == 'success':
1019
        return (status, None)
1020
    else:
1021
        error = result['job_info'][1]
1022
        return (status, error)
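
# Illustrative usage sketch (job id hypothetical): wait_for_job() blocks until
# the job reaches a terminal state and returns a (status, error) tuple, e.g.:
#
#   with pooled_rapi_client(backend) as client:
#       status, error = wait_for_job(client, job_id)
#       if status != 'success':
#           log.error("Job %s failed: %s", job_id, error)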