Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ a1baa42b

History | View | Annotate | Download (36 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
from django.conf import settings
34
from django.db import transaction
35
from datetime import datetime, timedelta
36

    
37
from synnefo.db.models import (Backend, VirtualMachine, Network,
38
                               BackendNetwork, BACKEND_STATUSES,
39
                               pooled_rapi_client, VirtualMachineDiagnostic,
40
                               Flavor)
41
from synnefo.logic import utils
42
from synnefo import quotas
43
from synnefo.api.util import release_resource
44
from synnefo.util.mac2eui64 import mac2eui64
45
from synnefo.logic.rapi import GanetiApiError
46

    
47
from logging import getLogger
log = getLogger(__name__)


# Mapping from Cyclades firewall-profile names to the Ganeti instance tags
# that represent them (the tag strings come from the Django settings).
_firewall_tags = {
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}

# Reverse mapping used when decoding tags received from Ganeti: the fourth
# ':'-separated component of each tag maps back to the profile name.
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())


# NIC attributes that are kept synchronized between Ganeti and the DB.
NIC_FIELDS = ["state", "mac", "ipv4", "ipv6", "network", "firewall_profile"]
60

    
61

    
62
def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
    """Handle quotas for updated VirtualMachine.

    Update quotas for the updated VirtualMachine based on the job that run on
    the Ganeti backend. If a commission has been already issued for this job,
    then this commission is just accepted or rejected based on the job status.
    Otherwise, a new commission for the given change is issued, that is also in
    force and auto-accept mode. In this case, previous commissions are
    rejected, since they reflect a previous state of the VM.

    Returns the (possibly modified) VirtualMachine instance.

    """
    # Only terminal job statuses can affect quotas.
    if job_status not in ["success", "error", "canceled"]:
        return vm

    # Check successful completion of a job will trigger any quotable change in
    # the VM state.
    action = utils.get_action_from_opcode(job_opcode, job_fields)
    if action == "BUILD":
        # Quotas for new VMs are automatically accepted by the API
        return vm
    commission_info = quotas.get_commission_info(vm, action=action,
                                                 action_fields=job_fields)

    if vm.task_job_id == job_id and vm.serial is not None:
        # Commission for this change has already been issued. So just
        # accept/reject it. Special case is OP_INSTANCE_CREATE, which even
        # if fails, must be accepted, as the user must manually remove the
        # failed server
        serial = vm.serial
        if job_status == "success":
            quotas.accept_serial(serial)
        elif job_status in ["error", "canceled"]:
            log.debug("Job %s failed. Rejecting related serial %s", job_id,
                      serial)
            quotas.reject_serial(serial)
        # Clear the serial: it has been finalized one way or the other.
        vm.serial = None
    elif job_status == "success" and commission_info is not None:
        log.debug("Expected job was %s. Processing job %s. Commission for"
                  " this job: %s", vm.task_job_id, job_id, commission_info)
        # Commission for this change has not been issued, or the issued
        # commission was unaware of the current change. Reject all previous
        # commissions and create a new one in forced mode!
        commission_name = ("client: dispatcher, resource: %s, ganeti_job: %s"
                           % (vm, job_id))
        quotas.handle_resource_commission(vm, action,
                                          commission_info=commission_info,
                                          commission_name=commission_name,
                                          force=True,
                                          auto_accept=True)
        log.debug("Issued new commission: %s", vm.serial)

    return vm
114

    
115

    
116
@transaction.commit_on_success
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None,
                      beparams=None):
    """Process a job progress notification from the backend

    Process an incoming message from the backend (currently Ganeti).
    Job notifications with a terminating status (success, error, or canceled),
    also update the operating state of the VM.

    """
    # See #1492, #1031, #1111 why this line has been removed
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)

    # Record the raw job information on the VM.
    vm.backendjobid = jobid
    vm.backendjobstatus = status
    vm.backendopcode = opcode
    vm.backendlogmsg = logmsg

    # Non-terminal statuses only update the job bookkeeping fields.
    if status in ["queued", "waiting", "running"]:
        vm.save()
        return

    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode)

    # Notifications of success change the operating state
    if status == "success":
        if state_for_success is not None:
            vm.operstate = state_for_success
        if beparams:
            # Change the flavor of the VM
            _process_resize(vm, beparams)
        # Update backendtime only for jobs that have been successfully
        # completed, since only these jobs update the state of the VM. Else a
        # "race condition" may occur when a successful job (e.g.
        # OP_INSTANCE_REMOVE) completes before an error job and messages arrive
        # in reversed order.
        vm.backendtime = etime

    if status in ["success", "error", "canceled"] and nics is not None:
        # Update the NICs of the VM
        _process_net_status(vm, etime, nics)

    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
    if opcode == 'OP_INSTANCE_CREATE' and status in ('canceled', 'error'):
        vm.operstate = 'ERROR'
        vm.backendtime = etime
    elif opcode == 'OP_INSTANCE_REMOVE':
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
        # when no instance exists at the Ganeti backend.
        # See ticket #799 for all the details.
        if status == 'success' or (status == 'error' and
                                   not vm_exists_in_backend(vm)):
            # VM has been deleted
            for nic in vm.nics.all():
                # Release the IP
                release_nic_address(nic)
                # And delete the NIC.
                nic.delete()
            vm.deleted = True
            vm.operstate = state_for_success
            vm.backendtime = etime
            # Treat the removal as successful so the quota handling below
            # accepts the pending commission.
            status = "success"

    if status in ["success", "error", "canceled"]:
        # Job is finalized: Handle quotas/commissioning
        job_fields = {"nics": nics, "beparams": beparams}
        vm = handle_vm_quotas(vm, job_id=jobid, job_opcode=opcode,
                              job_status=status, job_fields=job_fields)
        # and clear task fields
        if vm.task_job_id == jobid:
            vm.task = None
            vm.task_job_id = None

    vm.save()
192

    
193

    
194
def _process_resize(vm, beparams):
    """Change flavor of a VirtualMachine based on new beparams."""
    current = vm.flavor
    new_cpu = beparams.get("vcpus", current.cpu)
    new_ram = beparams.get("maxmem", current.ram)
    # Nothing to do if neither CPU nor RAM changed.
    if (new_cpu, new_ram) == (current.cpu, current.ram):
        return
    try:
        vm.flavor = Flavor.objects.get(cpu=new_cpu, ram=new_ram,
                                       disk=current.disk,
                                       disk_template=current.disk_template)
    except Flavor.DoesNotExist:
        raise Exception("Can not find flavor for VM")
    vm.save()
209

    
210

    
211
@transaction.commit_on_success
def process_net_status(vm, etime, nics):
    """Run _process_net_status() inside its own DB transaction."""
    _process_net_status(vm, etime, nics)
215

    
216

    
217
def _process_net_status(vm, etime, nics):
    """Process a net status notification from the backend

    Process an incoming message from the Ganeti backend,
    detailing the NIC configuration of a VM instance.

    Update the state of the VM in the DB accordingly.

    """
    ganeti_nics = process_ganeti_nics(nics)
    db_nics = dict([(nic.id, nic) for nic in vm.nics.all()])

    # Get X-Lock on backend before getting X-Lock on network IP pools, to
    # guarantee that no deadlock will occur with Backend allocator.
    Backend.objects.select_for_update().get(id=vm.backend_id)

    for nic_name in set(db_nics.keys()) | set(ganeti_nics.keys()):
        db_nic = db_nics.get(nic_name)
        ganeti_nic = ganeti_nics.get(nic_name)
        if ganeti_nic is None:
            # NIC exists in DB but not in Ganeti. If the NIC is in 'building'
            # state for more than 5 minutes, then we remove the NIC.
            # TODO: This is dangerous as the job may be stack in the queue, and
            # releasing the IP may lead to duplicate IP use.
            # BUGFIX: this branch used the undefined name 'nic' instead of
            # 'db_nic', raising NameError whenever a NIC disappeared from
            # Ganeti.
            if db_nic.state != "BUILDING" or (db_nic.state == "BUILDING" and
               etime > db_nic.created + timedelta(minutes=5)):
                release_nic_address(db_nic)
                db_nic.delete()
            else:
                log.warning("Ignoring recent building NIC: %s", db_nic)
        elif db_nic is None:
            # NIC exists in Ganeti but not in DB
            if ganeti_nic["ipv4"]:
                network = ganeti_nic["network"]
                network.reserve_address(ganeti_nic["ipv4"])
            vm.nics.create(**ganeti_nic)
        elif not nics_are_equal(db_nic, ganeti_nic):
            # Special case where the IPv4 address has changed, because you
            # need to release the old IPv4 address and reserve the new one
            if db_nic.ipv4 != ganeti_nic["ipv4"]:
                release_nic_address(db_nic)
                if ganeti_nic["ipv4"]:
                    ganeti_nic["network"].reserve_address(ganeti_nic["ipv4"])

            # Update the NIC in DB with the values from Ganeti NIC
            [setattr(db_nic, f, ganeti_nic[f]) for f in NIC_FIELDS]
            db_nic.save()

            # Dummy update the network, to work with 'changed-since'
            db_nic.network.save()

    vm.backendtime = etime
    vm.save()
270

    
271

    
272
def nics_are_equal(db_nic, gnt_nic):
    """Return True when the DB NIC and the Ganeti NIC agree on NIC_FIELDS."""
    return all(getattr(db_nic, field) == gnt_nic[field]
               for field in NIC_FIELDS)
277

    
278

    
279
def process_ganeti_nics(ganeti_nics):
    """Process NIC dict from ganeti.

    Returns a dict mapping a NIC identifier to the NIC attributes that are
    stored in the DB.
    """
    nics_by_id = {}
    for index, gnic in enumerate(ganeti_nics):
        nic_name = gnic.get("name", None)
        if nic_name is None:
            # Unknown to synnefo: key by position so that it will be created
            # automatically later on.
            nic_id = "unknown-" + str(index)
        else:
            nic_id = utils.id_from_nic_name(nic_name)

        network_id = utils.id_from_network_name(gnic.get('network', ''))
        network = Network.objects.get(id=network_id)

        # Collect the new NIC attributes.
        mac = gnic.get('mac')
        if network.subnet6 is not None:
            ipv6 = mac2eui64(mac, network.subnet6)
        else:
            ipv6 = None

        firewall_profile = _reverse_tags.get(gnic.get('firewall'))
        if not firewall_profile and network.public:
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE

        nics_by_id[nic_id] = {
            'index': index,
            'network': network,
            'mac': mac,
            'ipv4': gnic.get('ip'),
            'ipv6': ipv6,
            'firewall_profile': firewall_profile,
            'state': 'ACTIVE'}

    return nics_by_id
316

    
317

    
318
def release_nic_address(nic):
    """Release the IPv4 address of a NIC.

    Check if an instance's NIC has an IPv4 address and release it if it is not
    a Floating IP.

    """
    if not nic.ipv4:
        return
    if nic.ip_type == "FLOATING":
        # Floating IPs outlive the NIC; never return them to the pool here.
        return
    nic.network.release_address(nic.ipv4)
328

    
329

    
330
@transaction.commit_on_success
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
    """Process a job notification for a network in one backend.

    Records the job bookkeeping fields on the BackendNetwork, updates its
    operating state for terminal statuses and finally recomputes the state
    of the parent Network.
    """
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    # Record the raw job information on the BackendNetwork.
    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode
    back_network.backendlogmsg = logmsg

    network = back_network.network

    # Notifications of success change the operating state
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == 'success' and state_for_success is not None:
        back_network.operstate = state_for_success

    # A failed/canceled creation leaves the backend network in ERROR.
    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_ADD':
        back_network.operstate = 'ERROR'
        back_network.backendtime = etime

    if opcode == 'OP_NETWORK_REMOVE':
        # A failed removal still counts as deleted when the network no longer
        # exists in the backend (same rationale as for OP_INSTANCE_REMOVE).
        network_is_deleted = (status == "success")
        if network_is_deleted or (status == "error" and not
                                  network_exists_in_backend(back_network)):
            back_network.operstate = state_for_success
            back_network.deleted = True
            back_network.backendtime = etime

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()
    # Also you must update the state of the Network!!
    update_network_state(network)
364

    
365

    
366
def update_network_state(network):
    """Update the state of a Network based on BackendNetwork states.

    Update the state of a Network based on the operstate of the networks in
    the backends that network exists.

    The state of the network is:
    * ACTIVE: If it is 'ACTIVE' in at least one backend.
    * DELETED: If it is 'DELETED' in all backends that have been created.

    This function also releases the resources (MAC prefix or Bridge) and the
    quotas for the network.

    """
    if network.deleted:
        # Network has already been deleted. Just assert that state is also
        # DELETED
        if not network.state == "DELETED":
            network.state = "DELETED"
            network.save()
        return

    backend_states = [s.operstate for s in network.backend_networks.all()]
    if not backend_states and network.action != "DESTROY":
        # Network exists in no backend and is not being destroyed: it is
        # simply ACTIVE.
        # BUGFIX: the 'return' used to sit inside the inner 'if', so an
        # already-ACTIVE network with no backend networks fell through and
        # was wrongly marked DELETED by the all-"DELETED" check below (which
        # is vacuously true for an empty list).
        if network.state != "ACTIVE":
            network.state = "ACTIVE"
            network.save()
        return

    # Network is deleted when all BackendNetworks go to "DELETED" operstate
    # (rewritten from an opaque reduce() to an equivalent all()).
    deleted = all(state == "DELETED" for state in backend_states)

    # Release the resources on the deletion of the Network
    if deleted:
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
                 network.id, network.mac_prefix, network.link)
        network.deleted = True
        network.state = "DELETED"
        if network.mac_prefix:
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
                release_resource(res_type="mac_prefix",
                                 value=network.mac_prefix)
        if network.link:
            if network.FLAVORS[network.flavor]["link"] == "pool":
                release_resource(res_type="bridge", value=network.link)

        # Issue commission
        if network.userid:
            quotas.issue_and_accept_commission(network, delete=True)
            # the above has already saved the object and committed;
            # a second save would override others' changes, since the
            # object is now unlocked
            return
        elif not network.public:
            log.warning("Network %s does not have an owner!", network.id)
    network.save()
423

    
424

    
425
@transaction.commit_on_success
def process_network_modify(back_network, etime, jobid, opcode, status,
                           add_reserved_ips):
    """Process an OP_NETWORK_SET_PARAMS job notification.

    Records the job bookkeeping fields on the BackendNetwork and externally
    reserves any IPs that Ganeti reports as newly reserved.
    """
    assert (opcode == "OP_NETWORK_SET_PARAMS")
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    # NOTE(review): this writes 'opcode', unlike the 'backendopcode' field
    # used by the other notification handlers — confirm which is intended.
    back_network.opcode = opcode

    if add_reserved_ips:
        # BUGFIX: removed a redundant duplicated 'if add_reserved_ips:'
        # check that was nested inside this block.
        pool = back_network.network.get_pool()
        for ip in add_reserved_ips:
            pool.reserve(ip, external=True)
        pool.save()

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()
447

    
448

    
449
@transaction.commit_on_success
def process_create_progress(vm, etime, progress):
    """Update the build progress of a VM from an image-copy notification."""
    percentage = int(progress)

    # The percentage may exceed 100%, due to the way
    # snf-image:copy-progress tracks bytes read by image handling processes
    if percentage > 100:
        percentage = 100
    if percentage < 0:
        raise ValueError("Percentage cannot be negative")

    # FIXME: log a warning here, see #1033
    #   if last_update > percentage:
    #       raise ValueError("Build percentage should increase monotonically "
    #                        "(old = %d, new = %d)" % (last_update, percentage))

    # This assumes that no message of type 'ganeti-create-progress' is going
    # to arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance
    # and the instance is STARTED.  What if the two messages are processed by
    # two separate dispatcher threads, and the 'ganeti-op-status' message for
    # successful creation gets processed before the 'ganeti-create-progress'
    # message? [vkoukis]
    #
    #if not vm.operstate == 'BUILD':
    #    raise VirtualMachine.IllegalState("VM is not in building state")

    vm.buildpercentage = percentage
    vm.backendtime = etime
    vm.save()
478

    
479

    
480
@transaction.commit_on_success
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
                               details=None):
    """Create virtual machine instance diagnostic entry.

    :param vm: VirtualMachine instance to create diagnostic for.
    :param message: Diagnostic message.
    :param source: Diagnostic source identifier (e.g. image-helper).
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
    :param etime: The time the message occured (if available).
    :param details: Additional details or debug information.
    """
    VirtualMachineDiagnostic.objects.create_for_vm(
        vm, level, source=source, source_date=etime, message=message,
        details=details)
497

    
498

    
499
def create_instance(vm, nics, flavor, image):
    """Submit an OP_INSTANCE_CREATE job for `vm` to its Ganeti backend.

    `image` is a dictionary which should contain the keys:
            'backend_id', 'format' and 'metadata'

        metadata value should be a dictionary.

    Returns the Ganeti job id of the creation job.
    """

    # Handle arguments to CreateInstance() as a dictionary,
    # initialize it based on a deployment-specific value.
    # This enables the administrator to override deployment-specific
    # arguments, such as the disk template to use, name of os provider
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
    #
    kw = vm.backend.get_create_params()
    kw['mode'] = 'create'
    kw['name'] = vm.backend_vm_id
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS

    # Disk layout comes from the flavor; Ganeti expects sizes in MB.
    kw['disk_template'] = flavor.disk_template
    kw['disks'] = [{"size": flavor.disk * 1024}]
    provider = flavor.disk_provider
    if provider:
        kw['disks'][0]['provider'] = provider
        kw['disks'][0]['origin'] = flavor.disk_origin

    kw['nics'] = [{"name": nic.backend_uuid,
                   "network": nic.network.backend_id,
                   "ip": nic.ipv4}
                  for nic in nics]
    backend = vm.backend
    depend_jobs = []
    # Make sure every network the instance connects to is ACTIVE in this
    # backend; private networks are created on demand and the creation jobs
    # become dependencies of the instance-creation job.
    for nic in nics:
        network = Network.objects.select_for_update().get(id=nic.network.id)
        bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
                                                             network=network)
        if bnet.operstate != "ACTIVE":
            if network.public:
                msg = "Can not connect instance to network %s. Network is not"\
                      " ACTIVE in backend %s." % (network, backend)
                raise Exception(msg)
            else:
                jobs = create_network(network, backend, connect=True)
                if isinstance(jobs, list):
                    depend_jobs.extend(jobs)
                else:
                    depend_jobs.append(jobs)
    kw["depends"] = [[job, ["success", "error", "canceled"]]
                     for job in depend_jobs]

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['os'] = settings.GANETI_OS_PROVIDER
    kw['ip_check'] = False
    kw['name_check'] = False

    # Do not specific a node explicitly, have
    # Ganeti use an iallocator instead
    #kw['pnode'] = rapi.GetNodes()[0]

    kw['dry_run'] = settings.TEST

    kw['beparams'] = {
        'auto_balance': True,
        'vcpus': flavor.cpu,
        'memory': flavor.ram}

    kw['osparams'] = {
        'config_url': vm.config_url,
        # Store image id and format to Ganeti
        'img_id': image['backend_id'],
        'img_format': image['format']}

    # Use opportunistic locking
    kw['opportunistic_locking'] = True

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['hvparams'] = dict(serial_console=False)

    # hide_pass() strips secrets from the kwargs before logging them.
    log.debug("Creating instance %s", utils.hide_pass(kw))
    with pooled_rapi_client(vm) as client:
        return client.CreateInstance(**kw)
579

    
580

    
581
def delete_instance(vm):
    """Submit an OP_INSTANCE_REMOVE job for this VM."""
    dry_run = settings.TEST
    with pooled_rapi_client(vm) as client:
        return client.DeleteInstance(vm.backend_vm_id, dry_run=dry_run)
584

    
585

    
586
def reboot_instance(vm, reboot_type):
    """Reboot a VM. `reboot_type` is 'soft' or 'hard' in OS API terms."""
    assert reboot_type in ('soft', 'hard')
    # XXX: Currently shutdown_timeout parameter is not supported from the
    # Ganeti RAPI. Until supported, we will fallback for both reboot types
    # to the default shutdown timeout of Ganeti (120s). Note that reboot
    # type of Ganeti job must be always hard. The 'soft' and 'hard' type
    # of OS API is different from the one in Ganeti, and maps to
    # 'shutdown_timeout'.
    #if reboot_type == "hard":
    #    kwargs["shutdown_timeout"] = 0
    kwargs = {"instance": vm.backend_vm_id,
              "reboot_type": "hard"}
    if settings.TEST:
        kwargs["dry_run"] = True
    with pooled_rapi_client(vm) as client:
        return client.RebootInstance(**kwargs)
602

    
603

    
604
def startup_instance(vm):
    """Boot a (stopped) Ganeti instance."""
    dry_run = settings.TEST
    with pooled_rapi_client(vm) as client:
        return client.StartupInstance(vm.backend_vm_id, dry_run=dry_run)
607

    
608

    
609
def shutdown_instance(vm):
    """Power off a running Ganeti instance."""
    dry_run = settings.TEST
    with pooled_rapi_client(vm) as client:
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=dry_run)
612

    
613

    
614
def resize_instance(vm, vcpus, memory):
    """Modify the CPU count and memory of a Ganeti instance."""
    new_beparams = dict(vcpus=int(vcpus),
                        minmem=int(memory),
                        maxmem=int(memory))
    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id, beparams=new_beparams)
620

    
621

    
622
def get_instance_console(vm):
    """Build VNC console connection info for a VM.

    RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
    which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
    useless (see #783). Until this is fixed on the Ganeti side, construct a
    console info reply directly.

    WARNING: This assumes that VNC runs on port network_port on
             the instance's primary node, and is probably
             hypervisor-specific.
    """
    log.debug("Getting console for vm %s", vm)

    with pooled_rapi_client(vm) as client:
        info = client.GetInstance(vm.backend_vm_id)

    if vm.backend.hypervisor == "kvm" and info['hvparams']['serial_console']:
        raise Exception("hv parameter serial_console cannot be true")

    return {'kind': 'vnc',
            'host': info['pnode'],
            'port': info['network_port']}
648

    
649

    
650
def get_instance_info(vm):
    """Fetch the full instance description from the Ganeti RAPI."""
    with pooled_rapi_client(vm) as client:
        return client.GetInstance(vm.backend_vm_id)
653

    
654

    
655
def vm_exists_in_backend(vm):
    """Return whether the Ganeti backend still knows about this instance.

    A 404 from the RAPI means the instance is gone; any other API error is
    re-raised.
    """
    try:
        get_instance_info(vm)
    except GanetiApiError as e:
        if e.code == 404:
            return False
        raise
    return True
663

    
664

    
665
def get_network_info(backend_network):
    """Fetch the network description from the Ganeti RAPI."""
    with pooled_rapi_client(backend_network) as client:
        return client.GetNetwork(backend_network.network.backend_id)
668

    
669

    
670
def network_exists_in_backend(backend_network):
    """Return whether a network exists in the Ganeti backend.

    Returns True if the network is found and False on a 404 reply. Any other
    Ganeti API error is re-raised.
    """
    try:
        get_network_info(backend_network)
        return True
    except GanetiApiError as e:
        if e.code == 404:
            return False
        # BUGFIX: non-404 errors used to be silently swallowed (the function
        # implicitly returned None); re-raise instead, matching the behavior
        # of vm_exists_in_backend().
        raise
677

    
678

    
679
def create_network(network, backend, connect=True):
    """Create a network in a Ganeti backend.

    Returns the list of Ganeti job ids that were submitted.
    """
    log.debug("Creating network %s in backend %s", network, backend)

    job_id = _create_network(network, backend)

    if not connect:
        return [job_id]
    return connect_network(network, backend, depends=[job_id])
690

    
691

    
692
def _create_network(network, backend):
    """Submit an OP_NETWORK_ADD job for the network; return the job id."""
    tags = network.backend_tag
    if network.dhcp:
        tags.append('nfdhcpd')

    # Public networks are tagged 'public' and checked for IP conflicts.
    conflicts_check = True if network.public else False
    tags.append('public' if network.public else 'private')

    # Use a dummy network subnet for IPv6 only networks. Currently Ganeti does
    # not support IPv6 only networks. To bypass this limitation, we create the
    # network with a dummy network subnet, and make Cyclades connect instances
    # to such networks, with address=None.
    subnet = network.subnet if network.subnet is not None else "10.0.0.0/24"

    try:
        bn = BackendNetwork.objects.get(network=network, backend=backend)
    except BackendNetwork.DoesNotExist:
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
                        " does not exist" % (network.id, backend.id))

    with pooled_rapi_client(backend) as client:
        return client.CreateNetwork(network_name=network.backend_id,
                                    network=subnet,
                                    network6=network.subnet6,
                                    gateway=network.gateway,
                                    gateway6=network.gateway6,
                                    mac_prefix=bn.mac_prefix,
                                    conflicts_check=conflicts_check,
                                    tags=tags)
730

    
731

    
732
def connect_network(network, backend, depends=None, group=None):
    """Connect a network to nodegroups.

    :param depends: optional list of Ganeti job ids the connect jobs should
                    depend on.
    :param group: connect only this nodegroup; by default all groups of the
                  backend are connected.
    :returns: list of the submitted Ganeti job ids.
    """
    log.debug("Connecting network %s to backend %s", network, backend)

    # BUGFIX: 'depends=[]' was a mutable default argument; use None and
    # create a fresh list per call instead.
    if depends is None:
        depends = []

    if network.public:
        conflicts_check = True
    else:
        conflicts_check = False

    depends = [[job, ["success", "error", "canceled"]] for job in depends]
    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        # Loop variable renamed so it no longer shadows the 'group' parameter.
        for grp in groups:
            job_id = client.ConnectNetwork(network.backend_id, grp,
                                           network.mode, network.link,
                                           conflicts_check,
                                           depends=depends)
            job_ids.append(job_id)
    return job_ids
752

    
753

    
754
def delete_network(network, backend, disconnect=True):
    """Delete a network from a backend, optionally disconnecting it first.

    When ``disconnect`` is true, the DeleteNetwork job is made to depend
    on the DisconnectNetwork jobs so it only runs after they finish.
    """
    log.debug("Deleting network %s from backend %s", network, backend)

    depends = disconnect_network(network, backend) if disconnect else []
    _delete_network(network, backend, depends=depends)
761

    
762

    
763
def _delete_network(network, backend, depends=None):
    """Submit a DeleteNetwork job for the network and return its job id.

    :param depends: job ids that must reach a final state first.  NOTE:
        the previous signature used a mutable default (``depends=[]``),
        which is shared across calls; ``None`` is now the sentinel.
    """
    if depends is None:
        depends = []
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
    with pooled_rapi_client(backend) as client:
        return client.DeleteNetwork(network.backend_id, depends)
767

    
768

    
769
def disconnect_network(network, backend, group=None):
    """Disconnect a network from nodegroups of a backend.

    Submit one DisconnectNetwork job per nodegroup (or only for
    ``group`` when one is given) and return the list of job ids.
    """
    # Fixed log wording: the network is disconnected *from* the backend.
    log.debug("Disconnecting network %s from backend %s", network, backend)

    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        # Use a distinct loop variable to avoid shadowing the `group` arg.
        for grp in groups:
            job_id = client.DisconnectNetwork(network.backend_id, grp)
            job_ids.append(job_id)
    return job_ids
779

    
780

    
781
def connect_to_network(vm, nic):
    """Add a NIC connecting a virtual machine to a network.

    Ensures the network exists and is active in the VM's backend (the
    ModifyInstance job depends on the network-creation jobs if not),
    then submits a ModifyInstance job adding the NIC and returns its
    job id.
    """
    network = nic.network
    backend = vm.backend
    # Lock the network row to serialize concurrent BackendNetwork setup.
    network = Network.objects.select_for_update().get(id=network.id)
    # `created` flag is not needed; operstate decides whether to create.
    bnet, _ = BackendNetwork.objects.get_or_create(backend=backend,
                                                   network=network)
    depend_jobs = []
    if bnet.operstate != "ACTIVE":
        # Network not yet active in this backend: create and connect it
        # first, and make the NIC addition depend on those jobs.
        depend_jobs = create_network(network, backend, connect=True)

    depends = [[job, ["success", "error", "canceled"]] for job in depend_jobs]

    # Ganeti NIC specification.  Renamed from `nic` to avoid shadowing
    # the NIC model argument.
    nic_spec = {'name': nic.backend_uuid,
                'network': network.backend_id,
                'ip': nic.ipv4}

    log.debug("Adding NIC %s to VM %s", nic_spec, vm)

    kwargs = {
        "instance": vm.backend_vm_id,
        "nics": [("add", "-1", nic_spec)],
        "depends": depends,
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(**kwargs)
811

    
812

    
813
def disconnect_from_network(vm, nic):
    """Remove a NIC from a virtual machine and return the job id.

    Also deletes the NIC's firewall-profile tag, if one is set.
    """
    log.debug("Removing NIC %s of VM %s", nic, vm)

    kwargs = {
        "instance": vm.backend_vm_id,
        "nics": [("remove", nic.index, {})],
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        job_id = client.ModifyInstance(**kwargs)
        # A leftover firewall tag could affect whatever NIC later reuses
        # this index, so it must be removed.  XXX: deleting the tag
        # should depend on the NIC removal job, but the RAPI client
        # cannot express that yet; the profile may therefore be cleared
        # even if removing the NIC fails.  NIC UUIDs will fix this.
        profile = nic.firewall_profile
        if profile and profile != "DISABLED":
            tag = _firewall_tags[profile] % nic.index
            client.DeleteInstanceTags(vm.backend_vm_id, [tag],
                                      dry_run=settings.TEST)

        return job_id
840

    
841

    
842
def set_firewall_profile(vm, profile, index=0):
    """Set the firewall profile of a VM's NIC via Ganeti instance tags.

    Removes any previously set firewall tag for this NIC index, adds the
    new one (unless ``profile`` is "DISABLED"), and issues a NOP
    ModifyInstance so the dispatcher processes the change.

    :raises ValueError: if ``profile`` is not a known firewall profile.
    """
    try:
        tag = _firewall_tags[profile] % index
    except KeyError:
        # Fixed typo in the error message ("Unsopported").
        raise ValueError("Unsupported Firewall Profile: %s" % profile)

    log.debug("Setting tag of VM %s, NIC index %d, to %s", vm, index, profile)

    with pooled_rapi_client(vm) as client:
        # Delete previous firewall tags: profiles are mutually exclusive
        # per NIC index.
        old_tags = client.GetInstanceTags(vm.backend_vm_id)
        delete_tags = [(t % index) for t in _firewall_tags.values()
                       if (t % index) in old_tags]
        if delete_tags:
            client.DeleteInstanceTags(vm.backend_vm_id, delete_tags,
                                      dry_run=settings.TEST)

        if profile != "DISABLED":
            client.AddInstanceTags(vm.backend_vm_id, [tag],
                                   dry_run=settings.TEST)

        # XXX NOP ModifyInstance call to force process_net_status to run
        # on the dispatcher
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
        client.ModifyInstance(vm.backend_vm_id,
                              os_name=os_name)
    return None
869

    
870

    
871
def get_instances(backend, bulk=True):
    """Return the backend's instances as reported by Ganeti RAPI."""
    with pooled_rapi_client(backend) as client:
        return client.GetInstances(bulk=bulk)
874

    
875

    
876
def get_nodes(backend, bulk=True):
    """Return the backend's nodes as reported by Ganeti RAPI."""
    with pooled_rapi_client(backend) as client:
        return client.GetNodes(bulk=bulk)
879

    
880

    
881
def get_jobs(backend, bulk=True):
    """Return the backend's jobs as reported by Ganeti RAPI."""
    with pooled_rapi_client(backend) as client:
        return client.GetJobs(bulk=bulk)
884

    
885

    
886
def get_physical_resources(backend):
    """Get the physical resources of a backend.

    Get the resources of a backend as reported by the backend (not the db).

    """
    attrs = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
    totals = dict.fromkeys(attrs, 0)
    for node in get_nodes(backend, bulk=True):
        # Filter out drained, offline and not vm_capable nodes since they
        # will not take part in the vm allocation process
        can_host_vms = node['vm_capable'] and not (node['drained'] or
                                                   node['offline'])
        if can_host_vms and node['cnodes']:
            for attr in attrs:
                totals[attr] += int(node[attr])
    return totals
905

    
906

    
907
def update_backend_resources(backend, resources=None):
    """ Update the state of the backend resources in db.

    If ``resources`` is missing (or empty/falsy), they are fetched from
    the backend itself via :func:`get_physical_resources`.
    """
    if not resources:
        resources = get_physical_resources(backend)

    # Copy the measured values onto the backend row and stamp the update.
    for field in ('mfree', 'mtotal', 'dfree', 'dtotal',
                  'pinst_cnt', 'ctotal'):
        setattr(backend, field, resources[field])
    backend.updated = datetime.now()
    backend.save()
923

    
924

    
925
def get_memory_from_instances(backend):
    """ Get the memory that is used from instances.

    Get the used memory of a backend. Note: This is different for
    the real memory used, due to kvm's memory de-duplication.

    """
    with pooled_rapi_client(backend) as client:
        instances = client.GetInstances(bulk=True)
    # Sum the operational RAM across all reported instances.
    return sum(instance['oper_ram'] for instance in instances)
938

    
939

    
940
def get_available_disk_templates(backend):
    """Get the list of available disk templates of a Ganeti backend.

    The list contains the disk templates that are enabled in the Ganeti
    backend and also included in ipolicy-disk-templates.

    """
    with pooled_rapi_client(backend) as client:
        info = client.GetInfo()
    ipolicy_templates = info["ipolicy"]["disk-templates"]
    enabled_templates = info.get("enabled_disk_templates")
    if enabled_templates is None:
        # Ganeti < 2.8 does not have 'enabled_disk_templates'
        return ipolicy_templates
    return [tmpl for tmpl in enabled_templates
            if tmpl in ipolicy_templates]
957

    
958

    
959
def update_backend_disk_templates(backend):
    """Refresh the backend's stored list of available disk templates."""
    backend.disk_templates = get_available_disk_templates(backend)
    backend.save()
963

    
964

    
965
##
## Synchronized operations for reconciliation
##
968

    
969

    
970
def create_network_synced(network, backend):
    """Create and connect a network on a backend, waiting on each job.

    Returns the (status, error) tuple of the first failing step, or of
    the final connect step on success.
    """
    result = _create_network_synced(network, backend)
    if result[0] != 'success':
        return result
    return connect_network_synced(network, backend)
976

    
977

    
978
def _create_network_synced(network, backend):
    """Create the network on the backend and wait for the job to finish."""
    with pooled_rapi_client(backend) as client:
        job_id = _create_network(network, backend)
        return wait_for_job(client, job_id)
983

    
984

    
985
def connect_network_synced(network, backend):
    """Connect a network to every nodegroup, waiting on each job.

    Returns the (status, error) tuple of the first failing connect, or
    of the last successful one.  If the backend has no nodegroups the
    result defaults to success (previously ``result`` was unbound and
    the final ``return`` raised NameError).
    """
    result = ('success', None)
    with pooled_rapi_client(backend) as client:
        for group in client.GetGroups():
            job = client.ConnectNetwork(network.backend_id, group,
                                        network.mode, network.link)
            result = wait_for_job(client, job)
            if result[0] != 'success':
                return result

    return result
995

    
996

    
997
def wait_for_job(client, jobid):
    """Block until a Ganeti job reaches a final state.

    Polls via WaitForJobChange and returns a ``(status, error)`` tuple,
    where ``error`` is the job's opresult for failed/canceled jobs and
    ``None`` on success.
    """
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
    status = result['job_info'][0]
    # Ganeti's terminal states are "success", "error" and "canceled"
    # (as also used for job dependencies elsewhere in this module).
    # The previous check used "cancel", which never matches, so a
    # canceled job would be polled forever.
    while status not in ['success', 'error', 'canceled']:
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
                                         [result], None)
        status = result['job_info'][0]

    if status == 'success':
        return (status, None)
    else:
        error = result['job_info'][1]
        return (status, error)