Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ 92d2d1ce

History | View | Annotate | Download (36.8 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
from django.conf import settings
34
from django.db import transaction
35
from datetime import datetime, timedelta
36

    
37
from synnefo.db.models import (Backend, VirtualMachine, Network,
38
                               IPAddress,
39
                               BackendNetwork, BACKEND_STATUSES,
40
                               pooled_rapi_client, VirtualMachineDiagnostic,
41
                               Flavor)
42
from synnefo.logic import utils
43
from synnefo import quotas
44
from synnefo.api.util import release_resource
45
from synnefo.util.mac2eui64 import mac2eui64
46
from synnefo.logic.rapi import GanetiApiError
47

    
48
from logging import getLogger
49
log = getLogger(__name__)
50

    
51

    
52
_firewall_tags = {
53
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
54
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
55
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}
56

    
57
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
58

    
59
# Timeout in seconds for building NICs. After this period the NICs considered
60
# stale and removed from DB.
61
BUILDING_NIC_TIMEOUT = timedelta(seconds=180)
62

    
63
NIC_FIELDS = ["state", "mac", "ipv4", "ipv6", "network", "firewall_profile",
64
              "index"]
65
UNKNOWN_NIC_PREFIX = "unknown-"
66

    
67

    
68
def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
69
    """Handle quotas for updated VirtualMachine.
70

71
    Update quotas for the updated VirtualMachine based on the job that run on
72
    the Ganeti backend. If a commission has been already issued for this job,
73
    then this commission is just accepted or rejected based on the job status.
74
    Otherwise, a new commission for the given change is issued, that is also in
75
    force and auto-accept mode. In this case, previous commissions are
76
    rejected, since they reflect a previous state of the VM.
77

78
    """
79
    if job_status not in ["success", "error", "canceled"]:
80
        return vm
81

    
82
    # Check successful completion of a job will trigger any quotable change in
83
    # the VM state.
84
    action = utils.get_action_from_opcode(job_opcode, job_fields)
85
    if action == "BUILD":
86
        # Quotas for new VMs are automatically accepted by the API
87
        return vm
88
    commission_info = quotas.get_commission_info(vm, action=action,
89
                                                 action_fields=job_fields)
90

    
91
    if vm.task_job_id == job_id and vm.serial is not None:
92
        # Commission for this change has already been issued. So just
93
        # accept/reject it. Special case is OP_INSTANCE_CREATE, which even
94
        # if fails, must be accepted, as the user must manually remove the
95
        # failed server
96
        serial = vm.serial
97
        if job_status == "success":
98
            quotas.accept_serial(serial)
99
        elif job_status in ["error", "canceled"]:
100
            log.debug("Job %s failed. Rejecting related serial %s", job_id,
101
                      serial)
102
            quotas.reject_serial(serial)
103
        vm.serial = None
104
    elif job_status == "success" and commission_info is not None:
105
        log.debug("Expected job was %s. Processing job %s. Commission for"
106
                  " this job: %s", vm.task_job_id, job_id, commission_info)
107
        # Commission for this change has not been issued, or the issued
108
        # commission was unaware of the current change. Reject all previous
109
        # commissions and create a new one in forced mode!
110
        commission_name = ("client: dispatcher, resource: %s, ganeti_job: %s"
111
                           % (vm, job_id))
112
        quotas.handle_resource_commission(vm, action,
113
                                          commission_info=commission_info,
114
                                          commission_name=commission_name,
115
                                          force=True,
116
                                          auto_accept=True)
117
        log.debug("Issued new commission: %s", vm.serial)
118

    
119
    return vm
120

    
121

    
122
@transaction.commit_on_success
123
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None,
124
                      job_fields=None):
125
    """Process a job progress notification from the backend
126

127
    Process an incoming message from the backend (currently Ganeti).
128
    Job notifications with a terminating status (sucess, error, or canceled),
129
    also update the operating state of the VM.
130

131
    """
132
    # See #1492, #1031, #1111 why this line has been removed
133
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
134
    if status not in [x[0] for x in BACKEND_STATUSES]:
135
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)
136

    
137
    vm.backendjobid = jobid
138
    vm.backendjobstatus = status
139
    vm.backendopcode = opcode
140
    vm.backendlogmsg = logmsg
141

    
142
    if status in ["queued", "waiting", "running"]:
143
        vm.save()
144
        return
145

    
146
    if job_fields is None:
147
        job_fields = {}
148
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode)
149

    
150
    # Notifications of success change the operating state
151
    if status == "success":
152
        if state_for_success is not None:
153
            vm.operstate = state_for_success
154
        beparams = job_fields.get("beparams", None)
155
        if beparams:
156
            # Change the flavor of the VM
157
            _process_resize(vm, beparams)
158
        # Update backendtime only for jobs that have been successfully
159
        # completed, since only these jobs update the state of the VM. Else a
160
        # "race condition" may occur when a successful job (e.g.
161
        # OP_INSTANCE_REMOVE) completes before an error job and messages arrive
162
        # in reversed order.
163
        vm.backendtime = etime
164

    
165
    if status in ["success", "error", "canceled"] and nics is not None:
166
        # Update the NICs of the VM
167
        _process_net_status(vm, etime, nics)
168

    
169
    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
170
    if opcode == 'OP_INSTANCE_CREATE' and status in ('canceled', 'error'):
171
        vm.operstate = 'ERROR'
172
        vm.backendtime = etime
173
    elif opcode == 'OP_INSTANCE_REMOVE':
174
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
175
        # when no instance exists at the Ganeti backend.
176
        # See ticket #799 for all the details.
177
        if status == 'success' or (status == 'error' and
178
                                   not vm_exists_in_backend(vm)):
179
            # VM has been deleted
180
            for nic in vm.nics.all():
181
                # Release the IP
182
                release_nic_address(nic)
183
                # And delete the NIC.
184
                nic.delete()
185
            vm.deleted = True
186
            vm.operstate = state_for_success
187
            vm.backendtime = etime
188
            status = "success"
189

    
190
    if status in ["success", "error", "canceled"]:
191
        # Job is finalized: Handle quotas/commissioning
192
        vm = handle_vm_quotas(vm, job_id=jobid, job_opcode=opcode,
193
                              job_status=status, job_fields=job_fields)
194
        # and clear task fields
195
        if vm.task_job_id == jobid:
196
            vm.task = None
197
            vm.task_job_id = None
198

    
199
    vm.save()
200

    
201

    
202
def _process_resize(vm, beparams):
203
    """Change flavor of a VirtualMachine based on new beparams."""
204
    old_flavor = vm.flavor
205
    vcpus = beparams.get("vcpus", old_flavor.cpu)
206
    ram = beparams.get("maxmem", old_flavor.ram)
207
    if vcpus == old_flavor.cpu and ram == old_flavor.ram:
208
        return
209
    try:
210
        new_flavor = Flavor.objects.get(cpu=vcpus, ram=ram,
211
                                        disk=old_flavor.disk,
212
                                        disk_template=old_flavor.disk_template)
213
    except Flavor.DoesNotExist:
214
        raise Exception("Can not find flavor for VM")
215
    vm.flavor = new_flavor
216
    vm.save()
217

    
218

    
219
@transaction.commit_on_success
220
def process_net_status(vm, etime, nics):
221
    """Wrap _process_net_status inside transaction."""
222
    _process_net_status(vm, etime, nics)
223

    
224

    
225
def _process_net_status(vm, etime, nics):
226
    """Process a net status notification from the backend
227

228
    Process an incoming message from the Ganeti backend,
229
    detailing the NIC configuration of a VM instance.
230

231
    Update the state of the VM in the DB accordingly.
232

233
    """
234
    ganeti_nics = process_ganeti_nics(nics)
235
    db_nics = dict([(nic.id, nic) for nic in vm.nics.all()])
236

    
237
    # Get X-Lock on backend before getting X-Lock on network IP pools, to
238
    # guarantee that no deadlock will occur with Backend allocator.
239
    Backend.objects.select_for_update().get(id=vm.backend_id)
240

    
241
    for nic_name in set(db_nics.keys()) | set(ganeti_nics.keys()):
242
        db_nic = db_nics.get(nic_name)
243
        ganeti_nic = ganeti_nics.get(nic_name)
244
        if ganeti_nic is None:
245
            # NIC exists in DB but not in Ganeti. If the NIC is in 'building'
246
            # state for more than 5 minutes, then we remove the NIC.
247
            # TODO: This is dangerous as the job may be stack in the queue, and
248
            # releasing the IP may lead to duplicate IP use.
249
            if db_nic.state != "BUILDING" or\
250
                (db_nic.state == "BUILDING" and
251
                 etime > db_nic.created + BUILDING_NIC_TIMEOUT):
252
                release_nic_address(db_nic)
253
                db_nic.delete()
254
            else:
255
                log.warning("Ignoring recent building NIC: %s", db_nic)
256
        elif db_nic is None:
257
            # NIC exists in Ganeti but not in DB
258
            if str(nic_name).startswith(UNKNOWN_NIC_PREFIX):
259
                msg = "Can not process NIC! NIC '%s' does not have a"\
260
                      " valid name." % ganeti_nic
261
                log.error(msg)
262
                continue
263
            if ganeti_nic["ipv4"]:
264
                network = ganeti_nic["network"]
265
                network.reserve_address(ganeti_nic["ipv4"])
266
            vm.nics.create(id=nic_name, **ganeti_nic)
267
        elif not nics_are_equal(db_nic, ganeti_nic):
268
            # Special case where the IPv4 address has changed, because you
269
            # need to release the old IPv4 address and reserve the new one
270
            if db_nic.ipv4 != ganeti_nic["ipv4"]:
271
                release_nic_address(db_nic)
272
                if ganeti_nic["ipv4"]:
273
                    ganeti_nic["network"].reserve_address(ganeti_nic["ipv4"])
274

    
275
            # Update the NIC in DB with the values from Ganeti NIC
276
            [setattr(db_nic, f, ganeti_nic[f]) for f in NIC_FIELDS]
277
            db_nic.save()
278

    
279
            # Dummy update the network, to work with 'changed-since'
280
            db_nic.network.save()
281

    
282
    vm.backendtime = etime
283
    vm.save()
284

    
285

    
286
def nics_are_equal(db_nic, gnt_nic):
287
    for field in NIC_FIELDS:
288
        if getattr(db_nic, field) != gnt_nic[field]:
289
            return False
290
    return True
291

    
292

    
293
def process_ganeti_nics(ganeti_nics):
294
    """Process NIC dict from ganeti"""
295
    new_nics = []
296
    for index, gnic in enumerate(ganeti_nics):
297
        nic_name = gnic.get("name", None)
298
        if nic_name is not None:
299
            nic_id = utils.id_from_nic_name(nic_name)
300
        else:
301
            # Put as default value the index. If it is an unknown NIC to
302
            # synnefo it will be created automaticaly.
303
            nic_id = UNKNOWN_NIC_PREFIX + str(index)
304
        network_name = gnic.get('network', '')
305
        network_id = utils.id_from_network_name(network_name)
306
        network = Network.objects.get(id=network_id)
307

    
308
        # Get the new nic info
309
        mac = gnic.get('mac')
310
        ipv4 = gnic.get('ip')
311
        ipv6 = mac2eui64(mac, network.subnet6)\
312
            if network.subnet6 is not None else None
313

    
314
        firewall = gnic.get('firewall')
315
        firewall_profile = _reverse_tags.get(firewall)
316
        if not firewall_profile and network.public:
317
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE
318

    
319
        nic_info = {
320
            'index': index,
321
            'network': network,
322
            'mac': mac,
323
            'ipv4': ipv4,
324
            'ipv6': ipv6,
325
            'firewall_profile': firewall_profile,
326
            'state': 'ACTIVE'}
327

    
328
        new_nics.append((nic_id, nic_info))
329
    return dict(new_nics)
330

    
331

    
332
def release_nic_address(nic):
333
    """Release the IPv4 address of a NIC.
334

335
    Check if an instance's NIC has an IPv4 address and release it if it is not
336
    a Floating IP. If it is as Floating IP, then disassociate the FloatingIP
337
    from the machine.
338

339
    """
340

    
341
    if nic.ipv4:
342
        if nic.ip_type == "FLOATING":
343
            IPAddress.objects.filter(machine=nic.machine_id,
344
                                     network=nic.network_id,
345
                                     ipv4=nic.ipv4).update(machine=None)
346
        else:
347
            nic.network.release_address(nic.ipv4)
348

    
349

    
350
@transaction.commit_on_success
351
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
352
    if status not in [x[0] for x in BACKEND_STATUSES]:
353
        raise Network.InvalidBackendMsgError(opcode, status)
354

    
355
    back_network.backendjobid = jobid
356
    back_network.backendjobstatus = status
357
    back_network.backendopcode = opcode
358
    back_network.backendlogmsg = logmsg
359

    
360
    network = back_network.network
361

    
362
    # Notifications of success change the operating state
363
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
364
    if status == 'success' and state_for_success is not None:
365
        back_network.operstate = state_for_success
366

    
367
    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_ADD':
368
        back_network.operstate = 'ERROR'
369
        back_network.backendtime = etime
370

    
371
    if opcode == 'OP_NETWORK_REMOVE':
372
        network_is_deleted = (status == "success")
373
        if network_is_deleted or (status == "error" and not
374
                                  network_exists_in_backend(back_network)):
375
            back_network.operstate = state_for_success
376
            back_network.deleted = True
377
            back_network.backendtime = etime
378

    
379
    if status == 'success':
380
        back_network.backendtime = etime
381
    back_network.save()
382
    # Also you must update the state of the Network!!
383
    update_network_state(network)
384

    
385

    
386
def update_network_state(network):
387
    """Update the state of a Network based on BackendNetwork states.
388

389
    Update the state of a Network based on the operstate of the networks in the
390
    backends that network exists.
391

392
    The state of the network is:
393
    * ACTIVE: If it is 'ACTIVE' in at least one backend.
394
    * DELETED: If it is is 'DELETED' in all backends that have been created.
395

396
    This function also releases the resources (MAC prefix or Bridge) and the
397
    quotas for the network.
398

399
    """
400
    if network.deleted:
401
        # Network has already been deleted. Just assert that state is also
402
        # DELETED
403
        if not network.state == "DELETED":
404
            network.state = "DELETED"
405
            network.save()
406
        return
407

    
408
    backend_states = [s.operstate for s in network.backend_networks.all()]
409
    if not backend_states and network.action != "DESTROY":
410
        if network.state != "ACTIVE":
411
            network.state = "ACTIVE"
412
            network.save()
413
            return
414

    
415
    # Network is deleted when all BackendNetworks go to "DELETED" operstate
416
    deleted = reduce(lambda x, y: x == y and "DELETED", backend_states,
417
                     "DELETED")
418

    
419
    # Release the resources on the deletion of the Network
420
    if deleted:
421
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
422
                 network.id, network.mac_prefix, network.link)
423
        network.deleted = True
424
        network.state = "DELETED"
425
        if network.mac_prefix:
426
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
427
                release_resource(res_type="mac_prefix",
428
                                 value=network.mac_prefix)
429
        if network.link:
430
            if network.FLAVORS[network.flavor]["link"] == "pool":
431
                release_resource(res_type="bridge", value=network.link)
432

    
433
        # Issue commission
434
        if network.userid:
435
            quotas.issue_and_accept_commission(network, delete=True)
436
            # the above has already saved the object and committed;
437
            # a second save would override others' changes, since the
438
            # object is now unlocked
439
            return
440
        elif not network.public:
441
            log.warning("Network %s does not have an owner!", network.id)
442
    network.save()
443

    
444

    
445
@transaction.commit_on_success
446
def process_network_modify(back_network, etime, jobid, opcode, status,
447
                           job_fields):
448
    assert (opcode == "OP_NETWORK_SET_PARAMS")
449
    if status not in [x[0] for x in BACKEND_STATUSES]:
450
        raise Network.InvalidBackendMsgError(opcode, status)
451

    
452
    back_network.backendjobid = jobid
453
    back_network.backendjobstatus = status
454
    back_network.opcode = opcode
455

    
456
    add_reserved_ips = job_fields.get("add_reserved_ips")
457
    if add_reserved_ips:
458
        net = back_network.network
459
        pool = net.get_pool()
460
        if add_reserved_ips:
461
            for ip in add_reserved_ips:
462
                pool.reserve(ip, external=True)
463
        pool.save()
464

    
465
    if status == 'success':
466
        back_network.backendtime = etime
467
    back_network.save()
468

    
469

    
470
@transaction.commit_on_success
471
def process_create_progress(vm, etime, progress):
472

    
473
    percentage = int(progress)
474

    
475
    # The percentage may exceed 100%, due to the way
476
    # snf-image:copy-progress tracks bytes read by image handling processes
477
    percentage = 100 if percentage > 100 else percentage
478
    if percentage < 0:
479
        raise ValueError("Percentage cannot be negative")
480

    
481
    # FIXME: log a warning here, see #1033
482
#   if last_update > percentage:
483
#       raise ValueError("Build percentage should increase monotonically " \
484
#                        "(old = %d, new = %d)" % (last_update, percentage))
485

    
486
    # This assumes that no message of type 'ganeti-create-progress' is going to
487
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
488
    # the instance is STARTED.  What if the two messages are processed by two
489
    # separate dispatcher threads, and the 'ganeti-op-status' message for
490
    # successful creation gets processed before the 'ganeti-create-progress'
491
    # message? [vkoukis]
492
    #
493
    #if not vm.operstate == 'BUILD':
494
    #    raise VirtualMachine.IllegalState("VM is not in building state")
495

    
496
    vm.buildpercentage = percentage
497
    vm.backendtime = etime
498
    vm.save()
499

    
500

    
501
@transaction.commit_on_success
502
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
503
                               details=None):
504
    """
505
    Create virtual machine instance diagnostic entry.
506

507
    :param vm: VirtualMachine instance to create diagnostic for.
508
    :param message: Diagnostic message.
509
    :param source: Diagnostic source identifier (e.g. image-helper).
510
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
511
    :param etime: The time the message occured (if available).
512
    :param details: Additional details or debug information.
513
    """
514
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level, source=source,
515
                                                   source_date=etime,
516
                                                   message=message,
517
                                                   details=details)
518

    
519

    
520
def create_instance(vm, nics, flavor, image):
521
    """`image` is a dictionary which should contain the keys:
522
            'backend_id', 'format' and 'metadata'
523

524
        metadata value should be a dictionary.
525
    """
526

    
527
    # Handle arguments to CreateInstance() as a dictionary,
528
    # initialize it based on a deployment-specific value.
529
    # This enables the administrator to override deployment-specific
530
    # arguments, such as the disk template to use, name of os provider
531
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
532
    #
533
    kw = vm.backend.get_create_params()
534
    kw['mode'] = 'create'
535
    kw['name'] = vm.backend_vm_id
536
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
537

    
538
    kw['disk_template'] = flavor.disk_template
539
    kw['disks'] = [{"size": flavor.disk * 1024}]
540
    provider = flavor.disk_provider
541
    if provider:
542
        kw['disks'][0]['provider'] = provider
543
        kw['disks'][0]['origin'] = flavor.disk_origin
544

    
545
    kw['nics'] = [{"name": nic.backend_uuid,
546
                   "network": nic.network.backend_id,
547
                   "ip": nic.ipv4_address}
548
                  for nic in nics]
549
    backend = vm.backend
550
    depend_jobs = []
551
    for nic in nics:
552
        network = Network.objects.select_for_update().get(id=nic.network_id)
553
        bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
554
                                                             network=network)
555
        if bnet.operstate != "ACTIVE":
556
            if network.public:
557
                msg = "Can not connect instance to network %s. Network is not"\
558
                      " ACTIVE in backend %s." % (network, backend)
559
                raise Exception(msg)
560
            else:
561
                jobs = create_network(network, backend, connect=True)
562
                if isinstance(jobs, list):
563
                    depend_jobs.extend(jobs)
564
                else:
565
                    depend_jobs.append(jobs)
566
    kw["depends"] = [[job, ["success", "error", "canceled"]]
567
                     for job in depend_jobs]
568

    
569
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
570
    # kw['os'] = settings.GANETI_OS_PROVIDER
571
    kw['ip_check'] = False
572
    kw['name_check'] = False
573

    
574
    # Do not specific a node explicitly, have
575
    # Ganeti use an iallocator instead
576
    #kw['pnode'] = rapi.GetNodes()[0]
577

    
578
    kw['dry_run'] = settings.TEST
579

    
580
    kw['beparams'] = {
581
        'auto_balance': True,
582
        'vcpus': flavor.cpu,
583
        'memory': flavor.ram}
584

    
585
    kw['osparams'] = {
586
        'config_url': vm.config_url,
587
        # Store image id and format to Ganeti
588
        'img_id': image['backend_id'],
589
        'img_format': image['format']}
590

    
591
    # Use opportunistic locking
592
    kw['opportunistic_locking'] = True
593

    
594
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
595
    # kw['hvparams'] = dict(serial_console=False)
596

    
597
    log.debug("Creating instance %s", utils.hide_pass(kw))
598
    with pooled_rapi_client(vm) as client:
599
        return client.CreateInstance(**kw)
600

    
601

    
602
def delete_instance(vm):
603
    with pooled_rapi_client(vm) as client:
604
        return client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)
605

    
606

    
607
def reboot_instance(vm, reboot_type):
608
    assert reboot_type in ('soft', 'hard')
609
    kwargs = {"instance": vm.backend_vm_id,
610
              "reboot_type": "hard"}
611
    # XXX: Currently shutdown_timeout parameter is not supported from the
612
    # Ganeti RAPI. Until supported, we will fallback for both reboot types
613
    # to the default shutdown timeout of Ganeti (120s). Note that reboot
614
    # type of Ganeti job must be always hard. The 'soft' and 'hard' type
615
    # of OS API is different from the one in Ganeti, and maps to
616
    # 'shutdown_timeout'.
617
    #if reboot_type == "hard":
618
    #    kwargs["shutdown_timeout"] = 0
619
    if settings.TEST:
620
        kwargs["dry_run"] = True
621
    with pooled_rapi_client(vm) as client:
622
        return client.RebootInstance(**kwargs)
623

    
624

    
625
def startup_instance(vm):
626
    with pooled_rapi_client(vm) as client:
627
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
628

    
629

    
630
def shutdown_instance(vm):
631
    with pooled_rapi_client(vm) as client:
632
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)
633

    
634

    
635
def resize_instance(vm, vcpus, memory):
636
    beparams = {"vcpus": int(vcpus),
637
                "minmem": int(memory),
638
                "maxmem": int(memory)}
639
    with pooled_rapi_client(vm) as client:
640
        return client.ModifyInstance(vm.backend_vm_id, beparams=beparams)
641

    
642

    
643
def get_instance_console(vm):
644
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
645
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
646
    # useless (see #783).
647
    #
648
    # Until this is fixed on the Ganeti side, construct a console info reply
649
    # directly.
650
    #
651
    # WARNING: This assumes that VNC runs on port network_port on
652
    #          the instance's primary node, and is probably
653
    #          hypervisor-specific.
654
    #
655
    log.debug("Getting console for vm %s", vm)
656

    
657
    console = {}
658
    console['kind'] = 'vnc'
659

    
660
    with pooled_rapi_client(vm) as client:
661
        i = client.GetInstance(vm.backend_vm_id)
662

    
663
    if vm.backend.hypervisor == "kvm" and i['hvparams']['serial_console']:
664
        raise Exception("hv parameter serial_console cannot be true")
665
    console['host'] = i['pnode']
666
    console['port'] = i['network_port']
667

    
668
    return console
669

    
670

    
671
def get_instance_info(vm):
672
    with pooled_rapi_client(vm) as client:
673
        return client.GetInstance(vm.backend_vm_id)
674

    
675

    
676
def vm_exists_in_backend(vm):
677
    try:
678
        get_instance_info(vm)
679
        return True
680
    except GanetiApiError as e:
681
        if e.code == 404:
682
            return False
683
        raise e
684

    
685

    
686
def get_network_info(backend_network):
687
    with pooled_rapi_client(backend_network) as client:
688
        return client.GetNetwork(backend_network.network.backend_id)
689

    
690

    
691
def network_exists_in_backend(backend_network):
692
    try:
693
        get_network_info(backend_network)
694
        return True
695
    except GanetiApiError as e:
696
        if e.code == 404:
697
            return False
698

    
699

    
700
def create_network(network, backend, connect=True):
701
    """Create a network in a Ganeti backend"""
702
    log.debug("Creating network %s in backend %s", network, backend)
703

    
704
    job_id = _create_network(network, backend)
705

    
706
    if connect:
707
        job_ids = connect_network(network, backend, depends=[job_id])
708
        return job_ids
709
    else:
710
        return [job_id]
711

    
712

    
713
def _create_network(network, backend):
714
    """Create a network."""
715

    
716
    tags = network.backend_tag
717
    subnet = None
718
    subnet6 = None
719
    gateway = None
720
    gateway6 = None
721
    for subnet in network.subnets.all():
722
        if subnet.ipversion == 4:
723
            if subnet.dhcp:
724
                tags.append('nfdhcpd')
725
                subnet = subnet.cidr
726
                gateway = subnet.gateway
727
        elif subnet.ipversion == 6:
728
                subnet6 = subnet.cidr
729
                gateway6 = subnet.gateway
730

    
731
    if network.public:
732
        conflicts_check = True
733
        tags.append('public')
734
    else:
735
        conflicts_check = False
736
        tags.append('private')
737

    
738
    # Use a dummy network subnet for IPv6 only networks. Currently Ganeti does
739
    # not support IPv6 only networks. To bypass this limitation, we create the
740
    # network with a dummy network subnet, and make Cyclades connect instances
741
    # to such networks, with address=None.
742
    if subnet is None:
743
        subnet = "10.0.0.0/24"
744

    
745
    try:
746
        bn = BackendNetwork.objects.get(network=network, backend=backend)
747
        mac_prefix = bn.mac_prefix
748
    except BackendNetwork.DoesNotExist:
749
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
750
                        " does not exist" % (network.id, backend.id))
751

    
752
    with pooled_rapi_client(backend) as client:
753
        return client.CreateNetwork(network_name=network.backend_id,
754
                                    network=subnet,
755
                                    network6=subnet6,
756
                                    gateway=gateway,
757
                                    gateway6=gateway6,
758
                                    mac_prefix=mac_prefix,
759
                                    conflicts_check=conflicts_check,
760
                                    tags=tags)
761

    
762

    
763
def connect_network(network, backend, depends=[], group=None):
764
    """Connect a network to nodegroups."""
765
    log.debug("Connecting network %s to backend %s", network, backend)
766

    
767
    if network.public:
768
        conflicts_check = True
769
    else:
770
        conflicts_check = False
771

    
772
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
773
    with pooled_rapi_client(backend) as client:
774
        groups = [group] if group is not None else client.GetGroups()
775
        job_ids = []
776
        for group in groups:
777
            job_id = client.ConnectNetwork(network.backend_id, group,
778
                                           network.mode, network.link,
779
                                           conflicts_check,
780
                                           depends=depends)
781
            job_ids.append(job_id)
782
    return job_ids
783

    
784

    
785
def delete_network(network, backend, disconnect=True):
786
    log.debug("Deleting network %s from backend %s", network, backend)
787

    
788
    depends = []
789
    if disconnect:
790
        depends = disconnect_network(network, backend)
791
    _delete_network(network, backend, depends=depends)
792

    
793

    
794
def _delete_network(network, backend, depends=[]):
795
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
796
    with pooled_rapi_client(backend) as client:
797
        return client.DeleteNetwork(network.backend_id, depends)
798

    
799

    
800
def disconnect_network(network, backend, group=None):
801
    log.debug("Disconnecting network %s to backend %s", network, backend)
802

    
803
    with pooled_rapi_client(backend) as client:
804
        groups = [group] if group is not None else client.GetGroups()
805
        job_ids = []
806
        for group in groups:
807
            job_id = client.DisconnectNetwork(network.backend_id, group)
808
            job_ids.append(job_id)
809
    return job_ids
810

    
811

    
812
def connect_to_network(vm, nic):
813
    network = nic.network
814
    backend = vm.backend
815
    network = Network.objects.select_for_update().get(id=network.id)
816
    bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
817
                                                         network=network)
818
    depend_jobs = []
819
    if bnet.operstate != "ACTIVE":
820
        depend_jobs = create_network(network, backend, connect=True)
821

    
822
    depends = [[job, ["success", "error", "canceled"]] for job in depend_jobs]
823

    
824
    nic = {'name': nic.backend_uuid,
825
           'network': network.backend_id,
826
           'ip': nic.ipv4}
827

    
828
    log.debug("Adding NIC %s to VM %s", nic, vm)
829

    
830
    kwargs = {
831
        "instance": vm.backend_vm_id,
832
        "nics": [("add", "-1", nic)],
833
        "depends": depends,
834
    }
835
    if vm.backend.use_hotplug():
836
        kwargs["hotplug"] = True
837
    if settings.TEST:
838
        kwargs["dry_run"] = True
839

    
840
    with pooled_rapi_client(vm) as client:
841
        return client.ModifyInstance(**kwargs)
842

    
843

    
844
def disconnect_from_network(vm, nic):
845
    log.debug("Removing NIC %s of VM %s", nic, vm)
846

    
847
    kwargs = {
848
        "instance": vm.backend_vm_id,
849
        "nics": [("remove", nic.backend_uuid, {})],
850
    }
851
    if vm.backend.use_hotplug():
852
        kwargs["hotplug"] = True
853
    if settings.TEST:
854
        kwargs["dry_run"] = True
855

    
856
    with pooled_rapi_client(vm) as client:
857
        jobID = client.ModifyInstance(**kwargs)
858
        firewall_profile = nic.firewall_profile
859
        if firewall_profile and firewall_profile != "DISABLED":
860
            tag = _firewall_tags[firewall_profile] % nic.backend_uuid
861
            client.DeleteInstanceTags(vm.backend_vm_id, [tag],
862
                                      dry_run=settings.TEST)
863

    
864
        return jobID
865

    
866

    
867
def set_firewall_profile(vm, profile, nic):
868
    uuid = nic.backend_uuid
869
    try:
870
        tag = _firewall_tags[profile] % uuid
871
    except KeyError:
872
        raise ValueError("Unsopported Firewall Profile: %s" % profile)
873

    
874
    log.debug("Setting tag of VM %s, NIC %s, to %s", vm, nic, profile)
875

    
876
    with pooled_rapi_client(vm) as client:
877
        # Delete previous firewall tags
878
        old_tags = client.GetInstanceTags(vm.backend_vm_id)
879
        delete_tags = [(t % uuid) for t in _firewall_tags.values()
880
                       if (t % uuid) in old_tags]
881
        if delete_tags:
882
            client.DeleteInstanceTags(vm.backend_vm_id, delete_tags,
883
                                      dry_run=settings.TEST)
884

    
885
        if profile != "DISABLED":
886
            client.AddInstanceTags(vm.backend_vm_id, [tag],
887
                                   dry_run=settings.TEST)
888

    
889
        # XXX NOP ModifyInstance call to force process_net_status to run
890
        # on the dispatcher
891
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
892
        client.ModifyInstance(vm.backend_vm_id,
893
                              os_name=os_name)
894
    return None
895

    
896

    
897
def get_instances(backend, bulk=True):
898
    with pooled_rapi_client(backend) as c:
899
        return c.GetInstances(bulk=bulk)
900

    
901

    
902
def get_nodes(backend, bulk=True):
903
    with pooled_rapi_client(backend) as c:
904
        return c.GetNodes(bulk=bulk)
905

    
906

    
907
def get_jobs(backend, bulk=True):
908
    with pooled_rapi_client(backend) as c:
909
        return c.GetJobs(bulk=bulk)
910

    
911

    
912
def get_physical_resources(backend):
913
    """ Get the physical resources of a backend.
914

915
    Get the resources of a backend as reported by the backend (not the db).
916

917
    """
918
    nodes = get_nodes(backend, bulk=True)
919
    attr = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
920
    res = {}
921
    for a in attr:
922
        res[a] = 0
923
    for n in nodes:
924
        # Filter out drained, offline and not vm_capable nodes since they will
925
        # not take part in the vm allocation process
926
        can_host_vms = n['vm_capable'] and not (n['drained'] or n['offline'])
927
        if can_host_vms and n['cnodes']:
928
            for a in attr:
929
                res[a] += int(n[a] or 0)
930
    return res
931

    
932

    
933
def update_backend_resources(backend, resources=None):
934
    """ Update the state of the backend resources in db.
935

936
    """
937

    
938
    if not resources:
939
        resources = get_physical_resources(backend)
940

    
941
    backend.mfree = resources['mfree']
942
    backend.mtotal = resources['mtotal']
943
    backend.dfree = resources['dfree']
944
    backend.dtotal = resources['dtotal']
945
    backend.pinst_cnt = resources['pinst_cnt']
946
    backend.ctotal = resources['ctotal']
947
    backend.updated = datetime.now()
948
    backend.save()
949

    
950

    
951
def get_memory_from_instances(backend):
952
    """ Get the memory that is used from instances.
953

954
    Get the used memory of a backend. Note: This is different for
955
    the real memory used, due to kvm's memory de-duplication.
956

957
    """
958
    with pooled_rapi_client(backend) as client:
959
        instances = client.GetInstances(bulk=True)
960
    mem = 0
961
    for i in instances:
962
        mem += i['oper_ram']
963
    return mem
964

    
965

    
966
def get_available_disk_templates(backend):
967
    """Get the list of available disk templates of a Ganeti backend.
968

969
    The list contains the disk templates that are enabled in the Ganeti backend
970
    and also included in ipolicy-disk-templates.
971

972
    """
973
    with pooled_rapi_client(backend) as c:
974
        info = c.GetInfo()
975
    ipolicy_disk_templates = info["ipolicy"]["disk-templates"]
976
    try:
977
        enabled_disk_templates = info["enabled_disk_templates"]
978
        return [dp for dp in enabled_disk_templates
979
                if dp in ipolicy_disk_templates]
980
    except KeyError:
981
        # Ganeti < 2.8 does not have 'enabled_disk_templates'
982
        return ipolicy_disk_templates
983

    
984

    
985
def update_backend_disk_templates(backend):
986
    disk_templates = get_available_disk_templates(backend)
987
    backend.disk_templates = disk_templates
988
    backend.save()
989

    
990

    
991
##
992
## Synchronized operations for reconciliation
993
##
994

    
995

    
996
def create_network_synced(network, backend):
997
    result = _create_network_synced(network, backend)
998
    if result[0] != 'success':
999
        return result
1000
    result = connect_network_synced(network, backend)
1001
    return result
1002

    
1003

    
1004
def _create_network_synced(network, backend):
1005
    with pooled_rapi_client(backend) as client:
1006
        job = _create_network(network, backend)
1007
        result = wait_for_job(client, job)
1008
    return result
1009

    
1010

    
1011
def connect_network_synced(network, backend):
1012
    with pooled_rapi_client(backend) as client:
1013
        for group in client.GetGroups():
1014
            job = client.ConnectNetwork(network.backend_id, group,
1015
                                        network.mode, network.link)
1016
            result = wait_for_job(client, job)
1017
            if result[0] != 'success':
1018
                return result
1019

    
1020
    return result
1021

    
1022

    
1023
def wait_for_job(client, jobid):
1024
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
1025
    status = result['job_info'][0]
1026
    while status not in ['success', 'error', 'cancel']:
1027
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
1028
                                         [result], None)
1029
        status = result['job_info'][0]
1030

    
1031
    if status == 'success':
1032
        return (status, None)
1033
    else:
1034
        error = result['job_info'][1]
1035
        return (status, error)