Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ 562bf712

History | View | Annotate | Download (35.1 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
from django.conf import settings
34
from django.db import transaction
35
from datetime import datetime
36

    
37
from synnefo.db.models import (Backend, VirtualMachine, Network,
38
                               FloatingIP,
39
                               BackendNetwork, BACKEND_STATUSES,
40
                               pooled_rapi_client, VirtualMachineDiagnostic,
41
                               Flavor)
42
from synnefo.logic import utils
43
from synnefo import quotas
44
from synnefo.api.util import release_resource
45
from synnefo.util.mac2eui64 import mac2eui64
46
from synnefo.logic.rapi import GanetiApiError
47

    
48
from logging import getLogger
49
log = getLogger(__name__)
50

    
51

    
52
_firewall_tags = {
53
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
54
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
55
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}
56

    
57
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
58

    
59

    
60
def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
    """Handle quotas for updated VirtualMachine.

    Update quotas for the updated VirtualMachine based on the job that run on
    the Ganeti backend. If a commission has been already issued for this job,
    then this commission is just accepted or rejected based on the job status.
    Otherwise, a new commission for the given change is issued, that is also in
    force and auto-accept mode. In this case, previous commissions are
    rejected, since they reflect a previous state of the VM.

    :param vm: the VirtualMachine the job refers to.
    :param job_id: Ganeti job id of the finished job.
    :param job_opcode: Ganeti opcode of the job (e.g. OP_INSTANCE_CREATE).
    :param job_status: terminal Ganeti job status ("success", "error",
                       "canceled"); any other status is ignored.
    :param job_fields: dict of job parameters (nics/beparams) used to compute
                       the quotable change.
    :returns: the (possibly modified) vm, or None when job_status is not
              terminal.
    """
    # Only terminal job states can affect quotas.
    if job_status not in ["success", "error", "canceled"]:
        return

    # Check successful completion of a job will trigger any quotable change in
    # the VM state.
    action = utils.get_action_from_opcode(job_opcode, job_fields)
    commission_info = quotas.get_commission_info(vm, action=action,
                                                 action_fields=job_fields)

    if vm.task_job_id == job_id and vm.serial is not None:
        # Commission for this change has already been issued. So just
        # accept/reject it. Special case is OP_INSTANCE_CREATE, which even
        # if fails, must be accepted, as the user must manually remove the
        # failed server
        serial = vm.serial
        if job_status == "success" or job_opcode == "OP_INSTANCE_CREATE":
            quotas.accept_serial(serial)
        elif job_status in ["error", "canceled"]:
            log.debug("Job %s failed. Rejecting related serial %s", job_id,
                      serial)
            quotas.reject_serial(serial)
        vm.serial = None
    elif job_status == "success" and commission_info is not None:
        log.debug("Expected job was %s. Processing job %s. Commission for"
                  " this job: %s", vm.task_job_id, job_id, commission_info)
        # Commission for this change has not been issued, or the issued
        # commission was unaware of the current change. Reject all previous
        # commissions and create a new one in forced mode!
        previous_serial = vm.serial
        if previous_serial and not previous_serial.resolved:
            quotas.resolve_vm_commission(previous_serial)
        commission_name = ("client: dispatcher, resource: %s, ganeti_job: %s"
                           % (vm, job_id))
        # force=True + auto_accept=True: the backend already reflects this
        # change, so the commission must go through and be accepted directly.
        serial = quotas.issue_commission(user=vm.userid,
                                         source=quotas.DEFAULT_SOURCE,
                                         provisions=commission_info,
                                         name=commission_name,
                                         force=True,
                                         auto_accept=True)
        # Clear VM's serial. Expected job may arrive later. However correlated
        # serial must not be accepted, since it reflects a previous VM state
        vm.serial = None

    # NOTE: the caller is responsible for saving the returned vm.
    return vm
115

    
116

    
117
@transaction.commit_on_success
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None,
                      beparams=None):
    """Process a job progress notification from the backend

    Process an incoming message from the backend (currently Ganeti).
    Job notifications with a terminating status (success, error, or canceled),
    also update the operating state of the VM.

    :param vm: the VirtualMachine the notification refers to.
    :param etime: event time reported by the backend.
    :param jobid: Ganeti job id.
    :param opcode: Ganeti opcode of the job.
    :param status: Ganeti job status; must be one of BACKEND_STATUSES.
    :param logmsg: raw backend log message, stored on the VM.
    :param nics: optional NIC list from the hooks; triggers NIC sync.
    :param beparams: optional beparams dict; triggers flavor resize.
    :raises VirtualMachine.InvalidBackendMsgError: on unknown status.
    """
    # See #1492, #1031, #1111 why this line has been removed
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)

    # Record the raw job bookkeeping fields on the VM.
    vm.backendjobid = jobid
    vm.backendjobstatus = status
    vm.backendopcode = opcode
    vm.backendlogmsg = logmsg

    # Non-terminal statuses only update bookkeeping; no state transition.
    if status in ["queued", "waiting", "running"]:
        vm.save()
        return

    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode)

    # Notifications of success change the operating state
    if status == "success":
        if state_for_success is not None:
            vm.operstate = state_for_success
        if beparams:
            # Change the flavor of the VM
            _process_resize(vm, beparams)
        # Update backendtime only for jobs that have been successfully
        # completed, since only these jobs update the state of the VM. Else a
        # "race condition" may occur when a successful job (e.g.
        # OP_INSTANCE_REMOVE) completes before an error job and messages arrive
        # in reversed order.
        vm.backendtime = etime

    if status in ["success", "error", "canceled"] and nics is not None:
        # Update the NICs of the VM
        _process_net_status(vm, etime, nics)

    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
    if opcode == 'OP_INSTANCE_CREATE' and status in ('canceled', 'error'):
        vm.operstate = 'ERROR'
        vm.backendtime = etime
    elif opcode == 'OP_INSTANCE_REMOVE':
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
        # when no instance exists at the Ganeti backend.
        # See ticket #799 for all the details.
        if status == 'success' or (status == 'error' and
                                   not vm_exists_in_backend(vm)):
            # VM has been deleted. Release the instance IPs
            release_instance_ips(vm, [])
            # And delete the releated NICs (must be performed after release!)
            vm.nics.all().delete()
            vm.deleted = True
            # NOTE(review): assumes OPER_STATE_FROM_OPCODE maps
            # OP_INSTANCE_REMOVE to a non-None state — confirm.
            vm.operstate = state_for_success
            vm.backendtime = etime
            # Treat the "already gone" error as a successful removal for
            # the quota handling below.
            status = "success"

    if status in ["success", "error", "canceled"]:
        # Job is finalized: Handle quotas/commissioning
        job_fields = {"nics": nics, "beparams": beparams}
        vm = handle_vm_quotas(vm, job_id=jobid, job_opcode=opcode,
                              job_status=status, job_fields=job_fields)
        # and clear task fields
        if vm.task_job_id == jobid:
            vm.task = None
            vm.task_job_id = None

    vm.save()
191

    
192

    
193
def _process_resize(vm, beparams):
    """Update the VM's flavor to match new Ganeti beparams.

    Looks up the Flavor matching the requested cpu/ram (keeping the old
    disk and disk_template) and assigns it to the VM. No-op when neither
    cpu nor ram changed. Raises Exception when no matching Flavor exists.
    """
    current = vm.flavor
    new_cpu = beparams.get("vcpus", current.cpu)
    new_ram = beparams.get("maxmem", current.ram)
    if new_cpu == current.cpu and new_ram == current.ram:
        # Nothing quotable changed; keep the existing flavor.
        return
    try:
        vm.flavor = Flavor.objects.get(cpu=new_cpu, ram=new_ram,
                                       disk=current.disk,
                                       disk_template=current.disk_template)
    except Flavor.DoesNotExist:
        raise Exception("Can not find flavor for VM")
    vm.save()
208

    
209

    
210
@transaction.commit_on_success
def process_net_status(vm, etime, nics):
    """Wrap _process_net_status inside transaction.

    :param vm: the VirtualMachine whose NICs are updated.
    :param etime: event time reported by the backend.
    :param nics: NIC list from the Ganeti hooks.
    """
    _process_net_status(vm, etime, nics)
214

    
215

    
216
def _process_net_status(vm, etime, nics):
    """Process a net status notification from the backend

    Process an incoming message from the Ganeti backend,
    detailing the NIC configuration of a VM instance.

    Update the state of the VM in the DB accordingly.

    Must run inside a transaction (see process_net_status): it takes
    row locks and rewrites the VM's NIC set.
    """

    ganeti_nics = process_ganeti_nics(nics)
    # Fast path: nothing to do if the DB already matches Ganeti.
    if not nics_changed(vm.nics.order_by('index'), ganeti_nics):
        log.debug("NICs for VM %s have not changed", vm)
        return

    # Get X-Lock on backend before getting X-Lock on network IP pools, to
    # guarantee that no deadlock will occur with Backend allocator.
    Backend.objects.select_for_update().get(id=vm.backend_id)

    # NICs have changed. Release the instance IPs
    release_instance_ips(vm, ganeti_nics)
    # And delete the releated NICs (must be performed after release!)
    vm.nics.all().delete()

    # Re-create the NIC rows from the Ganeti view, reserving each IPv4
    # address in its network's pool.
    for nic in ganeti_nics:
        ipv4 = nic["ipv4"]
        net = nic['network']
        if ipv4:
            net.reserve_address(ipv4)

        nic['dirty'] = False
        vm.nics.create(**nic)
        # Dummy save the network, because UI uses changed-since for VMs
        # and Networks in order to show the VM NICs
        net.save()

    vm.backendtime = etime
    vm.save()
253

    
254

    
255
def process_ganeti_nics(ganeti_nics):
    """Process NIC dict from ganeti hooks.

    Translates each raw hook entry into the keyword dict used to create
    a NIC row (index, network, mac, ipv4, ipv6, firewall_profile, state).
    """
    processed = []
    for index, raw_nic in enumerate(ganeti_nics):
        # Resolve the DB Network from the Ganeti network name.
        backend_name = str(raw_nic.get('network', ''))
        network = Network.objects.get(
            pk=utils.id_from_network_name(backend_name))

        mac = raw_nic.get('mac')
        ipv4 = raw_nic.get('ip')
        # IPv6 is derived from the MAC (EUI-64) when the network has an
        # IPv6 subnet.
        if network.subnet6 is not None:
            ipv6 = mac2eui64(mac, network.subnet6)
        else:
            ipv6 = None

        profile = _reverse_tags.get(raw_nic.get('firewall'))
        if not profile and network.public:
            profile = settings.DEFAULT_FIREWALL_PROFILE

        processed.append({'index': index,
                          'network': network,
                          'mac': mac,
                          'ipv4': ipv4,
                          'ipv6': ipv6,
                          'firewall_profile': profile,
                          'state': 'ACTIVE'})
    return processed
286

    
287

    
288
def nics_changed(old_nics, new_nics):
    """Return True if NICs have changed in any way.

    Compares DB NIC objects (attribute access) against Ganeti NIC dicts
    (key access) field by field, including count.
    """
    if len(old_nics) != len(new_nics):
        return True
    fields = ["ipv4", "ipv6", "mac", "firewall_profile", "index", "network"]
    return any(getattr(db_nic, field) != ganeti_nic[field]
               for db_nic, ganeti_nic in zip(old_nics, new_nics)
               for field in fields)
298

    
299

    
300
def release_instance_ips(vm, ganeti_nics):
    """Release IPv4 addresses the VM no longer holds.

    Computes the (network, ipv4) pairs present in the DB but absent from
    the given Ganeti NICs and releases each one: floating IPs are detached
    from the machine, plain addresses are returned to the network pool.

    :param vm: the VirtualMachine whose old NICs are inspected.
    :param ganeti_nics: processed NIC dicts (see process_ganeti_nics);
                        pass [] to release everything.
    """
    old_addresses = set(vm.nics.values_list("network", "ipv4"))
    new_addresses = set(map(lambda nic: (nic["network"].id, nic["ipv4"]),
                            ganeti_nics))
    to_release = old_addresses - new_addresses
    for (network_id, ipv4) in to_release:
        if ipv4:
            # Get X-Lock before searching floating IP, to exclusively search
            # and release floating IP. Otherwise you may release a floating IP
            # that has been just reserved.
            net = Network.objects.select_for_update().get(id=network_id)
            if net.floating_ip_pool:
                try:
                    floating_ip = net.floating_ips.select_for_update()\
                                                  .get(ipv4=ipv4, machine=vm,
                                                       deleted=False)
                    # Detach the floating IP instead of releasing it; the
                    # address stays reserved for the user.
                    floating_ip.machine = None
                    floating_ip.save()
                except FloatingIP.DoesNotExist:
                    net.release_address(ipv4)
            else:
                net.release_address(ipv4)
322

    
323

    
324
@transaction.commit_on_success
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
    """Process a network job notification from the Ganeti backend.

    Updates the BackendNetwork's job bookkeeping and operstate, and then
    recomputes the aggregate Network state.

    :raises Network.InvalidBackendMsgError: on unknown status.
    """
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode
    back_network.backendlogmsg = logmsg

    network = back_network.network

    # Notifications of success change the operating state
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == 'success' and state_for_success is not None:
        back_network.operstate = state_for_success

    # A failed/canceled OP_NETWORK_ADD leaves the backend network in ERROR.
    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_ADD':
        back_network.operstate = 'ERROR'
        back_network.backendtime = etime

    if opcode == 'OP_NETWORK_REMOVE':
        network_is_deleted = (status == "success")
        # Like VM removal (#799): treat "error but the network is already
        # gone from Ganeti" as a successful removal.
        if network_is_deleted or (status == "error" and not
                                  network_exists_in_backend(back_network)):
            back_network.operstate = state_for_success
            back_network.deleted = True
            back_network.backendtime = etime

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()
    # Also you must update the state of the Network!!
    update_network_state(network)
358

    
359

    
360
def update_network_state(network):
    """Update the state of a Network based on BackendNetwork states.

    Update the state of a Network based on the operstate of the networks in the
    backends that network exists.

    The state of the network is:
    * ACTIVE: If it is 'ACTIVE' in at least one backend.
    * DELETED: If it is is 'DELETED' in all backends that have been created.

    This function also releases the resources (MAC prefix or Bridge) and the
    quotas for the network.

    """
    if network.deleted:
        # Network has already been deleted. Just assert that state is also
        # DELETED
        if not network.state == "DELETED":
            network.state = "DELETED"
            network.save()
        return

    backend_states = [s.operstate for s in network.backend_networks.all()]
    if not backend_states and network.action != "DESTROY":
        # NOTE(review): the `return` below only fires when the state actually
        # changed. If backend_states is empty and state is already "ACTIVE",
        # control falls through to the reduce() below, which yields "DELETED"
        # for an empty list (its initializer) and would mark the network
        # deleted — confirm this is intended.
        if network.state != "ACTIVE":
            network.state = "ACTIVE"
            network.save()
            return

    # Network is deleted when all BackendNetworks go to "DELETED" operstate
    deleted = reduce(lambda x, y: x == y and "DELETED", backend_states,
                     "DELETED")

    # Release the resources on the deletion of the Network
    if deleted:
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
                 network.id, network.mac_prefix, network.link)
        network.deleted = True
        network.state = "DELETED"
        # Pool-allocated MAC prefixes and bridges go back to their pools.
        if network.mac_prefix:
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
                release_resource(res_type="mac_prefix",
                                 value=network.mac_prefix)
        if network.link:
            if network.FLAVORS[network.flavor]["link"] == "pool":
                release_resource(res_type="bridge", value=network.link)

        # Issue commission
        if network.userid:
            quotas.issue_and_accept_commission(network, delete=True)
            # the above has already saved the object and committed;
            # a second save would override others' changes, since the
            # object is now unlocked
            return
        elif not network.public:
            log.warning("Network %s does not have an owner!", network.id)
    network.save()
417

    
418

    
419
@transaction.commit_on_success
def process_network_modify(back_network, etime, jobid, opcode, status,
                           add_reserved_ips, remove_reserved_ips):
    """Process an OP_NETWORK_SET_PARAMS notification from the backend.

    Updates the BackendNetwork job bookkeeping and applies external
    reservations/releases to the network's IP pool.

    :param add_reserved_ips: iterable of IPs to externally reserve.
    :param remove_reserved_ips: iterable of IPs to externally release.
    :raises Network.InvalidBackendMsgError: on unknown status.
    """
    assert (opcode == "OP_NETWORK_SET_PARAMS")
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    # Fix: every other handler stores the opcode in 'backendopcode';
    # this one wrote to 'opcode', so the value was never persisted in
    # the proper field.
    back_network.backendopcode = opcode

    if add_reserved_ips or remove_reserved_ips:
        net = back_network.network
        pool = net.get_pool()
        if add_reserved_ips:
            for ip in add_reserved_ips:
                pool.reserve(ip, external=True)
        if remove_reserved_ips:
            for ip in remove_reserved_ips:
                pool.put(ip, external=True)
        pool.save()

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()
444

    
445

    
446
@transaction.commit_on_success
def process_create_progress(vm, etime, progress):
    """Record the image-copy build progress of a VM under creation.

    :param vm: the VirtualMachine being built.
    :param etime: event time reported by the backend.
    :param progress: reported percentage (may exceed 100).
    :raises ValueError: if the percentage is negative.
    """
    percentage = int(progress)

    # snf-image:copy-progress tracks bytes read by the image handling
    # processes, so the reported percentage may exceed 100%; clamp it.
    if percentage > 100:
        percentage = 100
    if percentage < 0:
        raise ValueError("Percentage cannot be negative")

    # FIXME: log a warning here, see #1033
    #   if last_update > percentage:
    #       raise ValueError("Build percentage should increase monotonically "
    #                        "(old = %d, new = %d)" % (last_update, percentage))

    # This assumes that no message of type 'ganeti-create-progress' is going to
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
    # the instance is STARTED.  What if the two messages are processed by two
    # separate dispatcher threads, and the 'ganeti-op-status' message for
    # successful creation gets processed before the 'ganeti-create-progress'
    # message? [vkoukis]
    #
    #if not vm.operstate == 'BUILD':
    #    raise VirtualMachine.IllegalState("VM is not in building state")

    vm.buildpercentage = percentage
    vm.backendtime = etime
    vm.save()
475

    
476

    
477
@transaction.commit_on_success
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
                               details=None):
    """
    Create virtual machine instance diagnostic entry.

    :param vm: VirtualMachine instance to create diagnostic for.
    :param message: Diagnostic message.
    :param source: Diagnostic source identifier (e.g. image-helper).
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
    :param etime: The time the message occured (if available).
    :param details: Additional details or debug information.
    """
    # Thin wrapper around the manager helper; runs in its own transaction.
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level, source=source,
                                                   source_date=etime,
                                                   message=message,
                                                   details=details)
494

    
495

    
496
def create_instance(vm, nics, flavor, image):
    """Submit a Ganeti CreateInstance job for the given VM.

    `image` is a dictionary which should contain the keys:
            'backend_id', 'format' and 'metadata'

        metadata value should be a dictionary.

    :param vm: the VirtualMachine to create in its backend.
    :param nics: NIC objects describing the networks to connect to.
    :param flavor: Flavor providing disk/cpu/ram parameters.
    :returns: the Ganeti job id from CreateInstance.
    """

    # Handle arguments to CreateInstance() as a dictionary,
    # initialize it based on a deployment-specific value.
    # This enables the administrator to override deployment-specific
    # arguments, such as the disk template to use, name of os provider
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
    #
    kw = vm.backend.get_create_params()
    kw['mode'] = 'create'
    kw['name'] = vm.backend_vm_id
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS

    kw['disk_template'] = flavor.disk_template
    # Ganeti expects disk sizes in MiB; flavor.disk is in GiB.
    kw['disks'] = [{"size": flavor.disk * 1024}]
    provider = flavor.disk_provider
    if provider:
        kw['disks'][0]['provider'] = provider
        kw['disks'][0]['origin'] = flavor.disk_origin

    kw['nics'] = [{"network": nic.network.backend_id, "ip": nic.ipv4}
                  for nic in nics]
    backend = vm.backend
    depend_jobs = []
    # Ensure every target network exists in this backend; private networks
    # are created on demand and the create job depends on those jobs.
    for nic in nics:
        network = Network.objects.select_for_update().get(id=nic.network.id)
        bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
                                                             network=network)
        if bnet.operstate != "ACTIVE":
            if network.public:
                msg = "Can not connect instance to network %s. Network is not"\
                      " ACTIVE in backend %s." % (network, backend)
                raise Exception(msg)
            else:
                jobs = create_network(network, backend, connect=True)
                if isinstance(jobs, list):
                    depend_jobs.extend(jobs)
                else:
                    depend_jobs.append(jobs)
    kw["depends"] = [[job, ["success", "error", "canceled"]]
                     for job in depend_jobs]

    if vm.backend.use_hotplug():
        kw['hotplug'] = True
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['os'] = settings.GANETI_OS_PROVIDER
    kw['ip_check'] = False
    kw['name_check'] = False

    # Do not specific a node explicitly, have
    # Ganeti use an iallocator instead
    #kw['pnode'] = rapi.GetNodes()[0]

    kw['dry_run'] = settings.TEST

    kw['beparams'] = {
        'auto_balance': True,
        'vcpus': flavor.cpu,
        'memory': flavor.ram}

    kw['osparams'] = {
        'config_url': vm.config_url,
        # Store image id and format to Ganeti
        'img_id': image['backend_id'],
        'img_format': image['format']}

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['hvparams'] = dict(serial_console=False)

    # hide_pass strips sensitive osparams before logging.
    log.debug("Creating instance %s", utils.hide_pass(kw))
    with pooled_rapi_client(vm) as client:
        return client.CreateInstance(**kw)
573

    
574

    
575
def delete_instance(vm):
    """Submit a Ganeti job to remove the VM's instance; return the job id."""
    with pooled_rapi_client(vm) as client:
        return client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)
578

    
579

    
580
def reboot_instance(vm, reboot_type):
    """Submit a Ganeti reboot job for the VM; return the job id.

    :param reboot_type: OS API reboot type, 'soft' or 'hard'. Note that
        the Ganeti job always uses a 'hard' reboot type: the OS API
        'soft'/'hard' distinction maps to Ganeti's 'shutdown_timeout',
        not to its reboot type.
    """
    assert reboot_type in ('soft', 'hard')
    # XXX: Currently the shutdown_timeout parameter is not supported by the
    # Ganeti RAPI. Until it is, both reboot types fall back to Ganeti's
    # default shutdown timeout (120s).
    #if reboot_type == "hard":
    #    kwargs["shutdown_timeout"] = 0
    job_args = {"instance": vm.backend_vm_id,
                "reboot_type": "hard"}
    if settings.TEST:
        job_args["dry_run"] = True
    with pooled_rapi_client(vm) as client:
        return client.RebootInstance(**job_args)
596

    
597

    
598
def startup_instance(vm):
    """Submit a Ganeti job to start the VM's instance; return the job id."""
    with pooled_rapi_client(vm) as client:
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
601

    
602

    
603
def shutdown_instance(vm):
    """Submit a Ganeti job to shut down the VM's instance; return the job id."""
    with pooled_rapi_client(vm) as client:
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)
606

    
607

    
608
def resize_instance(vm, vcpus, memory):
    """Submit a Ganeti ModifyInstance job changing vcpus/memory.

    Both minmem and maxmem are set to the same value; returns the job id.
    """
    mem = int(memory)
    new_beparams = {"vcpus": int(vcpus),
                    "minmem": mem,
                    "maxmem": mem}
    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id, beparams=new_beparams)
614

    
615

    
616
def get_instance_console(vm):
    """Return VNC console info for a VM as {'kind', 'host', 'port'}.

    RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
    which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and
    pretty useless (see #783). Until this is fixed on the Ganeti side,
    construct the console info reply directly.

    WARNING: This assumes that VNC runs on port network_port on the
    instance's primary node, and is probably hypervisor-specific.

    :raises Exception: if KVM's serial_console hvparam is enabled, which
        conflicts with VNC access.
    """
    log.debug("Getting console for vm %s", vm)

    with pooled_rapi_client(vm) as client:
        info = client.GetInstance(vm.backend_vm_id)

    if vm.backend.hypervisor == "kvm" and info['hvparams']['serial_console']:
        raise Exception("hv parameter serial_console cannot be true")

    return {'kind': 'vnc',
            'host': info['pnode'],
            'port': info['network_port']}
642

    
643

    
644
def get_instance_info(vm):
    """Return the Ganeti instance info dict for the VM (RAPI GetInstance)."""
    with pooled_rapi_client(vm) as client:
        return client.GetInstance(vm.backend_vm_id)
647

    
648

    
649
def vm_exists_in_backend(vm):
    """Return whether the VM's instance exists in its Ganeti backend.

    A 404 from the RAPI means the instance is gone; any other
    GanetiApiError is re-raised.
    """
    try:
        get_instance_info(vm)
    except GanetiApiError as e:
        if e.code == 404:
            return False
        raise e
    return True
657

    
658

    
659
def get_network_info(backend_network):
    """Return the Ganeti network info dict (RAPI GetNetwork)."""
    with pooled_rapi_client(backend_network) as client:
        return client.GetNetwork(backend_network.network.backend_id)
662

    
663

    
664
def network_exists_in_backend(backend_network):
    """Return whether the network exists in the Ganeti backend.

    A 404 from the RAPI means the network is gone; any other
    GanetiApiError is re-raised (previously it was silently swallowed
    and the function implicitly returned None, masking backend errors —
    sibling vm_exists_in_backend re-raises, so do the same here).
    """
    try:
        get_network_info(backend_network)
        return True
    except GanetiApiError as e:
        if e.code == 404:
            return False
        raise e
671

    
672

    
673
def create_network(network, backend, connect=True):
    """Create a network in a Ganeti backend.

    Submits the CreateNetwork job and, when `connect` is True, the
    ConnectNetwork jobs depending on it. Returns the list of job ids.
    """
    log.debug("Creating network %s in backend %s", network, backend)

    creation_job = _create_network(network, backend)

    if not connect:
        return [creation_job]
    return connect_network(network, backend, depends=[creation_job])
684

    
685

    
686
def _create_network(network, backend):
    """Submit a Ganeti CreateNetwork job; return the job id."""

    network_type = 'public' if network.public else 'private'

    tags = network.backend_tag
    if network.dhcp:
        tags.append('nfdhcpd')

    # Only public networks get Ganeti's IP-conflict check.
    conflicts_check = True if network.public else False

    # Use a dummy network subnet for IPv6 only networks. Currently Ganeti does
    # not support IPv6 only networks. To bypass this limitation, we create the
    # network with a dummy network subnet, and make Cyclades connect instances
    # to such networks, with address=None.
    subnet = network.subnet
    if subnet is None:
        subnet = "10.0.0.0/24"

    try:
        backend_net = BackendNetwork.objects.get(network=network,
                                                 backend=backend)
        mac_prefix = backend_net.mac_prefix
    except BackendNetwork.DoesNotExist:
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
                        " does not exist" % (network.id, backend.id))

    with pooled_rapi_client(backend) as client:
        return client.CreateNetwork(network_name=network.backend_id,
                                    network=subnet,
                                    network6=network.subnet6,
                                    gateway=network.gateway,
                                    gateway6=network.gateway6,
                                    network_type=network_type,
                                    mac_prefix=mac_prefix,
                                    conflicts_check=conflicts_check,
                                    tags=tags)
725

    
726

    
727
def connect_network(network, backend, depends=None, group=None):
    """Connect a network to nodegroups.

    Submits a ConnectNetwork job for each nodegroup (or only `group` if
    given), depending on the given job ids. Returns the list of job ids.

    :param depends: job ids the connect jobs should wait for (default none).
        Fixed: was a mutable default argument (`depends=[]`), a classic
        Python pitfall; `None` sentinel is backward-compatible.
    :param group: restrict the connection to a single nodegroup.
    """
    log.debug("Connecting network %s to backend %s", network, backend)

    if depends is None:
        depends = []

    if network.public:
        conflicts_check = True
    else:
        conflicts_check = False

    depends = [[job, ["success", "error", "canceled"]] for job in depends]
    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for group in groups:
            job_id = client.ConnectNetwork(network.backend_id, group,
                                           network.mode, network.link,
                                           conflicts_check,
                                           depends=depends)
            job_ids.append(job_id)
    return job_ids
747

    
748

    
749
def delete_network(network, backend, disconnect=True):
    """Delete a network from a backend, optionally disconnecting it first.

    When `disconnect` is True, the deletion job is made to depend on the
    disconnect jobs, so Ganeti orders them correctly.
    """
    log.debug("Deleting network %s from backend %s", network, backend)

    if disconnect:
        depend_jobs = disconnect_network(network, backend)
    else:
        depend_jobs = []
    _delete_network(network, backend, depends=depend_jobs)
756

    
757

    
758
def _delete_network(network, backend, depends=None):
    """Submit a DeleteNetwork job, depending on the given job IDs.

    Returns the Ganeti job ID of the deletion job.
    """
    # Use None as the default instead of a mutable list, which would be
    # shared across calls.
    if depends is None:
        depends = []
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
    with pooled_rapi_client(backend) as client:
        return client.DeleteNetwork(network.backend_id, depends)
762

    
763

    
764
def disconnect_network(network, backend, group=None):
    """Disconnect a network from nodegroups and return the Ganeti job IDs.

    Operates on every nodegroup of the backend unless a single `group`
    is specified.
    """
    log.debug("Disconnecting network %s to backend %s", network, backend)

    with pooled_rapi_client(backend) as client:
        if group is not None:
            target_groups = [group]
        else:
            target_groups = client.GetGroups()
        job_ids = [client.DisconnectNetwork(network.backend_id, grp)
                   for grp in target_groups]
    return job_ids
774

    
775

    
776
def connect_to_network(vm, nic):
    """Add a NIC to a Ganeti instance, connecting it to the NIC's network.

    If the network is not yet ACTIVE on the VM's backend, create it there
    first (with connect=True) and make the NIC addition depend on the
    resulting jobs. Returns the Ganeti job ID of the ModifyInstance call.
    """
    backend = vm.backend
    # Lock the network row to serialize concurrent backend modifications.
    network = Network.objects.select_for_update().get(id=nic.network.id)
    bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
                                                         network=network)
    if bnet.operstate != "ACTIVE":
        depend_jobs = create_network(network, backend, connect=True)
    else:
        depend_jobs = []

    depends = [[job, ["success", "error", "canceled"]]
               for job in depend_jobs]

    # NIC description in the format Ganeti expects.
    ganeti_nic = {'ip': nic.ipv4, 'network': network.backend_id}

    log.debug("Connecting NIC %s to VM %s", ganeti_nic, vm)

    kwargs = {
        "instance": vm.backend_vm_id,
        "nics": [("add", ganeti_nic)],
        "depends": depends,
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(**kwargs)
804

    
805

    
806
def disconnect_from_network(vm, nic):
    """Remove a NIC from a Ganeti instance and clear its firewall tag.

    Returns the Ganeti job ID of the ModifyInstance (NIC removal) call.
    """
    log.debug("Removing nic of VM %s, with index %s", vm, str(nic.index))

    kwargs = {"instance": vm.backend_vm_id,
              "nics": [("remove", nic.index, {})]}
    if vm.backend.use_hotplug():
        kwargs["hotplug"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        job_id = client.ModifyInstance(**kwargs)
        # If the NIC has a tag for a firewall profile it must be deleted,
        # otherwise it may affect another NIC. XXX: Deleting the tag should
        # depend on the removing the NIC, but currently RAPI client does not
        # support this, this may result in clearing the firewall profile
        # without successfully removing the NIC. This issue will be fixed with
        # use of NIC UUIDs.
        profile = nic.firewall_profile
        if profile and profile != "DISABLED":
            stale_tag = _firewall_tags[profile] % nic.index
            client.DeleteInstanceTags(vm.backend_vm_id, [stale_tag],
                                      dry_run=settings.TEST)

        return job_id
833

    
834

    
835
def set_firewall_profile(vm, profile, index=0):
    """Apply a firewall profile to a VM's NIC via Ganeti instance tags.

    Translate `profile` into the firewall tag for the NIC at `index`,
    delete any stale firewall tags for that index, and add the new tag
    (unless the profile is "DISABLED"). Always returns None.

    Raises:
        ValueError: if `profile` is not a known firewall profile.
    """
    try:
        tag = _firewall_tags[profile] % index
    except KeyError:
        # Fixed typo in the error message ("Unsopported" -> "Unsupported").
        raise ValueError("Unsupported Firewall Profile: %s" % profile)

    log.debug("Setting tag of VM %s, NIC index %d, to %s", vm, index, profile)

    with pooled_rapi_client(vm) as client:
        # Delete previous firewall tags for this NIC index, otherwise the
        # old profile would remain in effect alongside the new one.
        old_tags = client.GetInstanceTags(vm.backend_vm_id)
        delete_tags = [(t % index) for t in _firewall_tags.values()
                       if (t % index) in old_tags]
        if delete_tags:
            client.DeleteInstanceTags(vm.backend_vm_id, delete_tags,
                                      dry_run=settings.TEST)

        if profile != "DISABLED":
            client.AddInstanceTags(vm.backend_vm_id, [tag],
                                   dry_run=settings.TEST)

        # XXX NOP ModifyInstance call to force process_net_status to run
        # on the dispatcher
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
        client.ModifyInstance(vm.backend_vm_id,
                              os_name=os_name)
    return None
862

    
863

    
864
def get_instances(backend, bulk=True):
    """Return the backend's instances as reported by Ganeti."""
    with pooled_rapi_client(backend) as client:
        return client.GetInstances(bulk=bulk)
867

    
868

    
869
def get_nodes(backend, bulk=True):
    """Return the backend's nodes as reported by Ganeti."""
    with pooled_rapi_client(backend) as client:
        return client.GetNodes(bulk=bulk)
872

    
873

    
874
def get_jobs(backend, bulk=True):
    """Return the backend's jobs as reported by Ganeti."""
    with pooled_rapi_client(backend) as client:
        return client.GetJobs(bulk=bulk)
877

    
878

    
879
def get_physical_resources(backend):
    """Get the physical resources of a backend.

    Sum the resources of the backend's nodes as reported by Ganeti itself
    (not by the db), skipping nodes that cannot host VMs.
    """
    nodes = get_nodes(backend, bulk=True)
    attrs = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
    totals = dict.fromkeys(attrs, 0)
    for node in nodes:
        # Filter out drained, offline and not vm_capable nodes since they
        # will not take part in the vm allocation process
        can_host_vms = (node['vm_capable'] and
                        not (node['drained'] or node['offline']))
        if can_host_vms and node['cnodes']:
            for attr in attrs:
                totals[attr] += int(node[attr])
    return totals
898

    
899

    
900
def update_resources(backend, resources=None):
    """Update the state of the backend resources in db.

    When `resources` is not supplied (or is empty/falsy), fetch them from
    the backend itself via get_physical_resources().
    """
    if not resources:
        resources = get_physical_resources(backend)

    # Copy the reported physical resources onto the backend row.
    for attr in ('mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt',
                 'ctotal'):
        setattr(backend, attr, resources[attr])
    backend.updated = datetime.now()
    backend.save()
916

    
917

    
918
def get_memory_from_instances(backend):
    """Get the memory that is used from instances.

    Get the used memory of a backend. Note: This is different for
    the real memory used, due to kvm's memory de-duplication.
    """
    with pooled_rapi_client(backend) as client:
        instances = client.GetInstances(bulk=True)
    # Total operational RAM across all instances of the backend.
    return sum(inst['oper_ram'] for inst in instances)
931

    
932
##
933
## Synchronized operations for reconciliation
934
##
935

    
936

    
937
def create_network_synced(network, backend):
    """Create and then connect a network, waiting for each job to finish.

    Returns the first non-success (status, error) pair, or the result of
    the connect phase.
    """
    creation = _create_network_synced(network, backend)
    if creation[0] != 'success':
        return creation
    return connect_network_synced(network, backend)
943

    
944

    
945
def _create_network_synced(network, backend):
    """Submit a network-creation job and block until it terminates."""
    with pooled_rapi_client(backend) as client:
        job_id = _create_network(network, backend)
        return wait_for_job(client, job_id)
950

    
951

    
952
def connect_network_synced(network, backend):
    """Connect a network to every nodegroup, waiting for each job.

    Returns the first non-success (status, error) pair, otherwise the
    result of the last job. If the backend reports no nodegroups, return
    ('success', None) instead of raising NameError on an unbound `result`.
    """
    # Default result so that an empty group list does not leave `result`
    # unbound at the final return.
    result = ('success', None)
    with pooled_rapi_client(backend) as client:
        for group in client.GetGroups():
            job = client.ConnectNetwork(network.backend_id, group,
                                        network.mode, network.link)
            result = wait_for_job(client, job)
            if result[0] != 'success':
                return result

    return result
962

    
963

    
964
def wait_for_job(client, jobid):
    """Block until Ganeti job `jobid` reaches a terminal state.

    Polls via WaitForJobChange on the ['status', 'opresult'] fields.

    Returns:
        (status, None) on success, otherwise (status, error) where error
        is the job's opresult.
    """
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
    status = result['job_info'][0]
    # Ganeti reports canceled jobs with status "canceled" (the value the
    # rest of this module uses in `depends` lists); keep the legacy
    # "cancel" value too, so the loop terminates for either spelling
    # instead of polling forever.
    while status not in ['success', 'error', 'cancel', 'canceled']:
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
                                         [result], None)
        status = result['job_info'][0]

    if status == 'success':
        return (status, None)
    else:
        error = result['job_info'][1]
        return (status, error)