Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ 96feddae

History | View | Annotate | Download (36.6 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
from django.conf import settings
34
from django.db import transaction
35
from datetime import datetime, timedelta
36

    
37
from synnefo.db.models import (Backend, VirtualMachine, Network,
38
                               BackendNetwork, BACKEND_STATUSES,
39
                               pooled_rapi_client, VirtualMachineDiagnostic,
40
                               Flavor)
41
from synnefo.logic import utils
42
from synnefo import quotas
43
from synnefo.api.util import release_resource, allocate_ip
44
from synnefo.util.mac2eui64 import mac2eui64
45
from synnefo.logic.rapi import GanetiApiError
46

    
47
from logging import getLogger
log = getLogger(__name__)


# Map Cyclades firewall profile names to the Ganeti instance tags that
# represent them (tag values come from the deployment's settings).
_firewall_tags = {
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}

# Reverse mapping: fourth ':'-separated component of the Ganeti tag ->
# firewall profile name. Assumes tags have the form a:b:c:<profile>.
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())

# Timeout in seconds for building NICs. After this period the NICs considered
# stale and removed from DB.
BUILDING_NIC_TIMEOUT = timedelta(seconds=180)

# NIC attributes compared between the DB and Ganeti representations of a
# NIC. "Simple" fields are copied verbatim into the DB NIC; the IP-address
# fields need special handling (pool allocation/release).
SIMPLE_NIC_FIELDS = ["state", "mac", "network", "firewall_profile", "index"]
COMPLEX_NIC_FIELDS = ["ipv4_address", "ipv6_address"]
NIC_FIELDS = SIMPLE_NIC_FIELDS + COMPLEX_NIC_FIELDS
# ID prefix for Ganeti NICs that carry no synnefo-derived name.
UNKNOWN_NIC_PREFIX = "unknown-"
66

    
67

    
68
def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
    """Handle quotas for updated VirtualMachine.

    Update quotas for the updated VirtualMachine based on the job that run on
    the Ganeti backend. If a commission has been already issued for this job,
    then this commission is just accepted or rejected based on the job status.
    Otherwise, a new commission for the given change is issued, that is also in
    force and auto-accept mode. In this case, previous commissions are
    rejected, since they reflect a previous state of the VM.

    :param vm: the VirtualMachine the job refers to.
    :param job_id: Ganeti job id of the finished job.
    :param job_opcode: Ganeti opcode of the job (e.g. OP_INSTANCE_REMOVE).
    :param job_status: terminal job status ("success"/"error"/"canceled").
    :param job_fields: dict of job parameters used to derive the action.
    :returns: the (possibly updated) VirtualMachine; the caller must save it.

    """
    if job_status not in ["success", "error", "canceled"]:
        # Only terminal job states can affect quotas.
        return vm

    # Check successful completion of a job will trigger any quotable change in
    # the VM state.
    action = utils.get_action_from_opcode(job_opcode, job_fields)
    if action == "BUILD":
        # Quotas for new VMs are automatically accepted by the API
        return vm
    commission_info = quotas.get_commission_info(vm, action=action,
                                                 action_fields=job_fields)

    if vm.task_job_id == job_id and vm.serial is not None:
        # Commission for this change has already been issued. So just
        # accept/reject it. Special case is OP_INSTANCE_CREATE, which even
        # if fails, must be accepted, as the user must manually remove the
        # failed server
        serial = vm.serial
        if job_status == "success":
            quotas.accept_serial(serial)
        elif job_status in ["error", "canceled"]:
            log.debug("Job %s failed. Rejecting related serial %s", job_id,
                      serial)
            quotas.reject_serial(serial)
        # Clear the serial: it has been finalized one way or the other.
        vm.serial = None
    elif job_status == "success" and commission_info is not None:
        log.debug("Expected job was %s. Processing job %s. Commission for"
                  " this job: %s", vm.task_job_id, job_id, commission_info)
        # Commission for this change has not been issued, or the issued
        # commission was unaware of the current change. Reject all previous
        # commissions and create a new one in forced mode!
        commission_name = ("client: dispatcher, resource: %s, ganeti_job: %s"
                           % (vm, job_id))
        quotas.handle_resource_commission(vm, action,
                                          commission_info=commission_info,
                                          commission_name=commission_name,
                                          force=True,
                                          auto_accept=True)
        log.debug("Issued new commission: %s", vm.serial)

    return vm
120

    
121

    
122
@transaction.commit_on_success
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None,
                      job_fields=None):
    """Process a job progress notification from the backend

    Process an incoming message from the backend (currently Ganeti).
    Job notifications with a terminating status (success, error, or canceled),
    also update the operating state of the VM.

    :param vm: the VirtualMachine the notification refers to.
    :param etime: event time reported by the backend.
    :param jobid: Ganeti job id.
    :param opcode: Ganeti opcode of the job.
    :param status: Ganeti job status (must be one of BACKEND_STATUSES).
    :param logmsg: raw log message from the backend.
    :param nics: optional list of Ganeti NIC dicts to reconcile with the DB.
    :param job_fields: optional dict of job parameters (e.g. 'beparams').
    :raises VirtualMachine.InvalidBackendMsgError: on an unknown status.

    """
    # See #1492, #1031, #1111 why this line has been removed
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)

    # Record the raw job information on the VM.
    vm.backendjobid = jobid
    vm.backendjobstatus = status
    vm.backendopcode = opcode
    vm.backendlogmsg = logmsg

    if status in ["queued", "waiting", "running"]:
        # Non-terminal status: just persist the job info and stop.
        vm.save()
        return

    if job_fields is None:
        job_fields = {}
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode)

    # Notifications of success change the operating state
    if status == "success":
        if state_for_success is not None:
            vm.operstate = state_for_success
        beparams = job_fields.get("beparams", None)
        if beparams:
            # Change the flavor of the VM
            _process_resize(vm, beparams)
        # Update backendtime only for jobs that have been successfully
        # completed, since only these jobs update the state of the VM. Else a
        # "race condition" may occur when a successful job (e.g.
        # OP_INSTANCE_REMOVE) completes before an error job and messages arrive
        # in reversed order.
        vm.backendtime = etime

    if status in ["success", "error", "canceled"] and nics is not None:
        # Update the NICs of the VM
        _process_net_status(vm, etime, nics)

    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
    if opcode == 'OP_INSTANCE_CREATE' and status in ('canceled', 'error'):
        vm.operstate = 'ERROR'
        vm.backendtime = etime
        # Update state of associated NICs
        vm.nics.all().update(state="ERROR")
    elif opcode == 'OP_INSTANCE_REMOVE':
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
        # when no instance exists at the Ganeti backend.
        # See ticket #799 for all the details.
        if status == 'success' or (status == 'error' and
                                   not vm_exists_in_backend(vm)):
            # VM has been deleted
            for nic in vm.nics.all():
                # Release the IP
                remove_nic_ips(nic)
                # And delete the NIC.
                nic.delete()
            vm.deleted = True
            vm.operstate = state_for_success
            vm.backendtime = etime
            # Treat the removal as successful for the quota handling below.
            status = "success"

    if status in ["success", "error", "canceled"]:
        # Job is finalized: Handle quotas/commissioning
        vm = handle_vm_quotas(vm, job_id=jobid, job_opcode=opcode,
                              job_status=status, job_fields=job_fields)
        # and clear task fields
        if vm.task_job_id == jobid:
            vm.task = None
            vm.task_job_id = None

    vm.save()
202

    
203

    
204
def _process_resize(vm, beparams):
205
    """Change flavor of a VirtualMachine based on new beparams."""
206
    old_flavor = vm.flavor
207
    vcpus = beparams.get("vcpus", old_flavor.cpu)
208
    ram = beparams.get("maxmem", old_flavor.ram)
209
    if vcpus == old_flavor.cpu and ram == old_flavor.ram:
210
        return
211
    try:
212
        new_flavor = Flavor.objects.get(cpu=vcpus, ram=ram,
213
                                        disk=old_flavor.disk,
214
                                        disk_template=old_flavor.disk_template)
215
    except Flavor.DoesNotExist:
216
        raise Exception("Can not find flavor for VM")
217
    vm.flavor = new_flavor
218
    vm.save()
219

    
220

    
221
@transaction.commit_on_success
def process_net_status(vm, etime, nics):
    """Wrap _process_net_status inside transaction.

    See _process_net_status for the actual NIC reconciliation logic.
    """
    _process_net_status(vm, etime, nics)
225

    
226

    
227
def _process_net_status(vm, etime, nics):
    """Process a net status notification from the backend

    Process an incoming message from the Ganeti backend,
    detailing the NIC configuration of a VM instance.

    Update the state of the VM in the DB accordingly.

    :param vm: the VirtualMachine whose NICs are reconciled.
    :param etime: event time reported by the backend.
    :param nics: list of Ganeti NIC dicts (see process_ganeti_nics).

    """
    ganeti_nics = process_ganeti_nics(nics)
    db_nics = dict([(nic.id, nic)
                    for nic in vm.nics.prefetch_related("ips__subnet")])

    # Get X-Lock on backend before getting X-Lock on network IP pools, to
    # guarantee that no deadlock will occur with Backend allocator.
    Backend.objects.select_for_update().get(id=vm.backend_id)

    for nic_name in set(db_nics.keys()) | set(ganeti_nics.keys()):
        db_nic = db_nics.get(nic_name)
        ganeti_nic = ganeti_nics.get(nic_name)
        if ganeti_nic is None:
            # NIC exists in DB but not in Ganeti. If the NIC is in 'building'
            # state for more than 5 minutes, then we remove the NIC.
            # TODO: This is dangerous as the job may be stuck in the queue,
            # and releasing the IP may lead to duplicate IP use.
            if db_nic.state != "BUILD" or\
                (db_nic.state == "BUILD" and
                 etime > db_nic.created + BUILDING_NIC_TIMEOUT):
                remove_nic_ips(db_nic)
                db_nic.delete()
            else:
                log.warning("Ignoring recent building NIC: %s", db_nic)
        elif db_nic is None:
            msg = ("NIC/%s of VM %s does not exist in DB! Cannot automatically"
                   " fix this issue!" % (nic_name, vm))
            log.error(msg)
            continue
        elif not nics_are_equal(db_nic, ganeti_nic):
            # Update the NIC in DB with the values from Ganeti NIC
            for f in SIMPLE_NIC_FIELDS:
                setattr(db_nic, f, ganeti_nic[f])
            # Save once after copying all simple fields, instead of once per
            # field as before (same final state, fewer DB writes).
            db_nic.save()
            # Special case where the IPv4 address has changed, because you
            # need to release the old IPv4 address and reserve the new one
            ipv4_address = ganeti_nic["ipv4_address"]
            if db_nic.ipv4_address != ipv4_address:
                remove_nic_ips(db_nic)
                if ipv4_address:
                    network = ganeti_nic["network"]
                    ipaddress = allocate_ip(network, vm.userid,
                                            address=ipv4_address)
                    # Bug fix: attach the newly allocated address to the DB
                    # NIC being updated ('nic' was an undefined name here).
                    ipaddress.nic = db_nic
                    ipaddress.save()

    vm.backendtime = etime
    vm.save()
283

    
284

    
285
def nics_are_equal(db_nic, gnt_nic):
    """Return True when the DB NIC and the Ganeti NIC agree on NIC_FIELDS."""
    return all(getattr(db_nic, field) == gnt_nic[field]
               for field in NIC_FIELDS)
290

    
291

    
292
def process_ganeti_nics(ganeti_nics):
    """Convert a list of Ganeti NIC dicts into a {nic_id: nic_info} mapping."""
    processed = {}
    for index, gnic in enumerate(ganeti_nics):
        nic_name = gnic.get("name", None)
        if nic_name is None:
            # Put as default value the index. If it is an unknown NIC to
            # synnefo it will be created automaticaly.
            nic_id = UNKNOWN_NIC_PREFIX + str(index)
        else:
            nic_id = utils.id_from_nic_name(nic_name)

        network_name = gnic.get('network', '')
        network = Network.objects.get(
            id=utils.id_from_network_name(network_name))

        # Collect the new NIC info
        mac = gnic.get('mac')
        ipv4 = gnic.get('ip')
        subnet6 = network.subnet6
        ipv6 = mac2eui64(mac, subnet6) if subnet6 else None

        # Translate the Ganeti firewall tag back to a profile name; public
        # networks fall back to the deployment default.
        firewall_profile = _reverse_tags.get(gnic.get('firewall'))
        if not firewall_profile and network.public:
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE

        processed[nic_id] = {
            'index': index,
            'network': network,
            'mac': mac,
            'ipv4_address': ipv4,
            'ipv6_address': ipv6,
            'firewall_profile': firewall_profile,
            'state': 'ACTIVE'}
    return processed
329

    
330

    
331
def remove_nic_ips(nic):
    """Remove IP addresses associated with a NetworkInterface.

    Every non-floating IPv4 address is returned to its pool; every
    non-floating IPAddress object is deleted. Floating IPs survive and are
    only disassociated from the NIC.

    """
    for ip in nic.ips.all():
        floating = ip.floating_ip
        if ip.ipversion == 4:
            if floating:
                # Keep the floating IP around; just detach it from the NIC.
                ip.nic = None
                ip.save()
            else:
                ip.release_address()
        if not floating:
            ip.delete()
349

    
350

    
351
@transaction.commit_on_success
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
    """Process a job progress notification for a BackendNetwork.

    Record the job information on the BackendNetwork, update its operating
    state for terminal statuses, and finally recompute the state of the
    parent Network.

    :raises Network.InvalidBackendMsgError: on an unknown status.
    """
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode
    back_network.backendlogmsg = logmsg

    network = back_network.network

    # Notifications of success change the operating state
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == 'success' and state_for_success is not None:
        back_network.operstate = state_for_success

    # A failed/canceled creation leaves the backend network in ERROR.
    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_ADD':
        back_network.operstate = 'ERROR'
        back_network.backendtime = etime

    if opcode == 'OP_NETWORK_REMOVE':
        # A failed OP_NETWORK_REMOVE still counts as a deletion when the
        # network no longer exists in the Ganeti backend.
        network_is_deleted = (status == "success")
        if network_is_deleted or (status == "error" and not
                                  network_exists_in_backend(back_network)):
            back_network.operstate = state_for_success
            back_network.deleted = True
            back_network.backendtime = etime

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()
    # Also you must update the state of the Network!!
    update_network_state(network)
385

    
386

    
387
def update_network_state(network):
    """Update the state of a Network based on BackendNetwork states.

    Update the state of a Network based on the operstate of the networks in
    the backends that network exists.

    The state of the network is:
    * ACTIVE: If it is 'ACTIVE' in at least one backend.
    * DELETED: If it is 'DELETED' in all backends that have been created.

    This function also releases the resources (MAC prefix or Bridge) and the
    quotas for the network.

    """
    if network.deleted:
        # Network has already been deleted. Just assert that state is also
        # DELETED
        if not network.state == "DELETED":
            network.state = "DELETED"
            network.save()
        return

    backend_states = [s.operstate for s in network.backend_networks.all()]
    if not backend_states and network.action != "DESTROY":
        # The network has not been created in any backend yet: ensure it is
        # ACTIVE and stop here. Bug fix: the 'return' was previously inside
        # the inner 'if', so an already-ACTIVE network with no backend
        # networks fell through to the reduce() below, which yields
        # "DELETED" for an empty list and wrongly deleted the network.
        if network.state != "ACTIVE":
            network.state = "ACTIVE"
            network.save()
        return

    # Network is deleted when all BackendNetworks go to "DELETED" operstate
    deleted = reduce(lambda x, y: x == y and "DELETED", backend_states,
                     "DELETED")

    # Release the resources on the deletion of the Network
    if deleted:
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
                 network.id, network.mac_prefix, network.link)
        network.deleted = True
        network.state = "DELETED"
        if network.mac_prefix:
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
                release_resource(res_type="mac_prefix",
                                 value=network.mac_prefix)
        if network.link:
            if network.FLAVORS[network.flavor]["link"] == "pool":
                release_resource(res_type="bridge", value=network.link)

        # Issue commission
        if network.userid:
            quotas.issue_and_accept_commission(network, delete=True)
            # the above has already saved the object and committed;
            # a second save would override others' changes, since the
            # object is now unlocked
            return
        elif not network.public:
            log.warning("Network %s does not have an owner!", network.id)

        # Set all subnets as deleted
        network.subnets.update(deleted=True)
        # And delete the IP pools. NOTE(review): the previous
        # 'network.subnets.ip_pools.all().delete()' accessed 'ip_pools' on a
        # related manager and would raise AttributeError; iterate the
        # subnets instead — confirm 'Subnet' exposes an 'ip_pools' relation.
        for subnet in network.subnets.all():
            subnet.ip_pools.all().delete()
    network.save()
450

    
451

    
452
@transaction.commit_on_success
def process_network_modify(back_network, etime, jobid, opcode, status,
                           job_fields):
    """Handle a Ganeti OP_NETWORK_SET_PARAMS notification for a network."""
    assert (opcode == "OP_NETWORK_SET_PARAMS")
    valid_statuses = [x[0] for x in BACKEND_STATUSES]
    if status not in valid_statuses:
        raise Network.InvalidBackendMsgError(opcode, status)

    # Record the job details on the BackendNetwork.
    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.opcode = opcode

    # Addresses reserved externally in Ganeti must also be marked as
    # reserved in the DB pools.
    reserved = job_fields.get("add_reserved_ips")
    if reserved:
        net = back_network.network
        for address in reserved:
            net.reserve_address(address, external=True)

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()
472

    
473

    
474
@transaction.commit_on_success
def process_create_progress(vm, etime, progress):
    """Record the build progress (percentage) of a VM being created."""
    percentage = int(progress)

    # The percentage may exceed 100%, due to the way
    # snf-image:copy-progress tracks bytes read by image handling processes
    if percentage > 100:
        percentage = 100
    if percentage < 0:
        raise ValueError("Percentage cannot be negative")

    # FIXME: log a warning here, see #1033
#   if last_update > percentage:
#       raise ValueError("Build percentage should increase monotonically " \
#                        "(old = %d, new = %d)" % (last_update, percentage))

    # This assumes that no message of type 'ganeti-create-progress' is going to
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
    # the instance is STARTED.  What if the two messages are processed by two
    # separate dispatcher threads, and the 'ganeti-op-status' message for
    # successful creation gets processed before the 'ganeti-create-progress'
    # message? [vkoukis]
    #
    #if not vm.operstate == 'BUILD':
    #    raise VirtualMachine.IllegalState("VM is not in building state")

    vm.buildpercentage = percentage
    vm.backendtime = etime
    vm.save()
503

    
504

    
505
@transaction.commit_on_success
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
                               details=None):
    """
    Create virtual machine instance diagnostic entry.

    :param vm: VirtualMachine instance to create diagnostic for.
    :param message: Diagnostic message.
    :param source: Diagnostic source identifier (e.g. image-helper).
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
    :param etime: The time the message occured (if available).
    :param details: Additional details or debug information.
    """
    # Delegate creation to the custom manager; 'etime' is stored as the
    # diagnostic's source date.
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level, source=source,
                                                   source_date=etime,
                                                   message=message,
                                                   details=details)
522

    
523

    
524
def create_instance(vm, nics, flavor, image):
    """Create a new instance in the VM's Ganeti backend.

    `image` is a dictionary which should contain the keys:
            'backend_id', 'format' and 'metadata'

        metadata value should be a dictionary.

    Returns the Ganeti job id of the submitted OP_INSTANCE_CREATE job.
    """

    # Handle arguments to CreateInstance() as a dictionary,
    # initialize it based on a deployment-specific value.
    # This enables the administrator to override deployment-specific
    # arguments, such as the disk template to use, name of os provider
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
    #
    kw = vm.backend.get_create_params()
    kw['mode'] = 'create'
    kw['name'] = vm.backend_vm_id
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS

    kw['disk_template'] = flavor.disk_template
    kw['disks'] = [{"size": flavor.disk * 1024}]
    provider = flavor.disk_provider
    if provider:
        kw['disks'][0]['provider'] = provider
        kw['disks'][0]['origin'] = flavor.disk_origin

    kw['nics'] = [{"name": nic.backend_uuid,
                   "network": nic.network.backend_id,
                   "ip": nic.ipv4_address}
                  for nic in nics]
    backend = vm.backend
    depend_jobs = []
    for nic in nics:
        network = Network.objects.select_for_update().get(id=nic.network_id)
        bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
                                                             network=network)
        if bnet.operstate != "ACTIVE":
            # Bug fix: accumulate jobs instead of reassigning 'depend_jobs',
            # so creation jobs for *all* missing networks are waited on, not
            # just the last one.
            depend_jobs.extend(create_network(network, backend, connect=True))
    kw["depends"] = [[job, ["success", "error", "canceled"]]
                     for job in depend_jobs]

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['os'] = settings.GANETI_OS_PROVIDER
    kw['ip_check'] = False
    kw['name_check'] = False

    # Do not specify a node explicitly, have
    # Ganeti use an iallocator instead
    #kw['pnode'] = rapi.GetNodes()[0]

    kw['dry_run'] = settings.TEST

    kw['beparams'] = {
        'auto_balance': True,
        'vcpus': flavor.cpu,
        'memory': flavor.ram}

    kw['osparams'] = {
        'config_url': vm.config_url,
        # Store image id and format to Ganeti
        'img_id': image['backend_id'],
        'img_format': image['format']}

    # Use opportunistic locking
    kw['opportunistic_locking'] = True

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['hvparams'] = dict(serial_console=False)

    log.debug("Creating instance %s", utils.hide_pass(kw))
    with pooled_rapi_client(vm) as client:
        return client.CreateInstance(**kw)
595

    
596

    
597
def delete_instance(vm):
    """Issue an OP_INSTANCE_REMOVE job for the VM; return the Ganeti job id."""
    with pooled_rapi_client(vm) as client:
        return client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)
600

    
601

    
602
def reboot_instance(vm, reboot_type):
    """Reboot a VM's Ganeti instance; return the Ganeti job id.

    'reboot_type' is the OS API reboot type ('soft' or 'hard'); the Ganeti
    job always uses a hard reboot — see the note below.
    """
    assert reboot_type in ('soft', 'hard')
    # XXX: Currently shutdown_timeout parameter is not supported from the
    # Ganeti RAPI. Until supported, we will fallback for both reboot types
    # to the default shutdown timeout of Ganeti (120s). Note that reboot
    # type of Ganeti job must be always hard. The 'soft' and 'hard' type
    # of OS API is different from the one in Ganeti, and maps to
    # 'shutdown_timeout'.
    #if reboot_type == "hard":
    #    kwargs["shutdown_timeout"] = 0
    kwargs = {"instance": vm.backend_vm_id, "reboot_type": "hard"}
    if settings.TEST:
        kwargs["dry_run"] = True
    with pooled_rapi_client(vm) as client:
        return client.RebootInstance(**kwargs)
618

    
619

    
620
def startup_instance(vm):
    """Issue a Ganeti job to start the VM's instance; return the job id."""
    with pooled_rapi_client(vm) as client:
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
623

    
624

    
625
def shutdown_instance(vm):
    """Issue a Ganeti job to stop the VM's instance; return the job id."""
    with pooled_rapi_client(vm) as client:
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)
628

    
629

    
630
def resize_instance(vm, vcpus, memory):
    """Modify the backend parameters (CPU/RAM) of a VM; return the job id."""
    memory = int(memory)
    beparams = {"vcpus": int(vcpus),
                "minmem": memory,
                "maxmem": memory}
    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id, beparams=beparams)
636

    
637

    
638
def get_instance_console(vm):
    """Return VNC console connection info ({'kind', 'host', 'port'}) for a VM.

    :raises Exception: if the KVM instance has serial_console enabled.
    """
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
    # useless (see #783).
    #
    # Until this is fixed on the Ganeti side, construct a console info reply
    # directly.
    #
    # WARNING: This assumes that VNC runs on port network_port on
    #          the instance's primary node, and is probably
    #          hypervisor-specific.
    #
    log.debug("Getting console for vm %s", vm)

    console = {}
    console['kind'] = 'vnc'

    with pooled_rapi_client(vm) as client:
        i = client.GetInstance(vm.backend_vm_id)

    # serial_console would occupy the console, making VNC unusable.
    if vm.backend.hypervisor == "kvm" and i['hvparams']['serial_console']:
        raise Exception("hv parameter serial_console cannot be true")
    console['host'] = i['pnode']
    console['port'] = i['network_port']

    return console
664

    
665

    
666
def get_instance_info(vm):
    """Return the Ganeti info dict for the VM's backend instance.

    :raises GanetiApiError: propagated from the RAPI client (404 if missing).
    """
    with pooled_rapi_client(vm) as client:
        return client.GetInstance(vm.backend_vm_id)
669

    
670

    
671
def vm_exists_in_backend(vm):
    """Return True if the VM's instance exists in its Ganeti backend.

    A 404 from the RAPI means the instance is gone; any other error is
    re-raised.
    """
    try:
        get_instance_info(vm)
    except GanetiApiError as e:
        if e.code == 404:
            return False
        raise e
    return True
679

    
680

    
681
def get_network_info(backend_network):
    """Return the Ganeti info dict for the backend network's network.

    :raises GanetiApiError: propagated from the RAPI client (404 if missing).
    """
    with pooled_rapi_client(backend_network) as client:
        return client.GetNetwork(backend_network.network.backend_id)
684

    
685

    
686
def network_exists_in_backend(backend_network):
    """Return True if the network exists in the Ganeti backend.

    A 404 from the RAPI means the network does not exist. Bug fix: any
    other GanetiApiError is now re-raised (previously it was silently
    swallowed and the function returned None), matching the behaviour of
    vm_exists_in_backend.
    """
    try:
        get_network_info(backend_network)
        return True
    except GanetiApiError as e:
        if e.code == 404:
            return False
        raise e
693

    
694

    
695
def create_network(network, backend, connect=True):
    """Create a network in a Ganeti backend"""
    log.debug("Creating network %s in backend %s", network, backend)

    creation_job = _create_network(network, backend)

    if not connect:
        return [creation_job]
    # Connect the network to all nodegroups, depending on the creation job.
    return connect_network(network, backend, depends=[creation_job])
706

    
707

    
708
def _create_network(network, backend):
    """Create a network in a Ganeti backend (without connecting nodegroups).

    Returns the Ganeti job id of the network-creation job.
    :raises Exception: if no BackendNetwork row exists for this pair.
    """

    # Bug fix: copy before appending — 'backend_tag' may be a shared list,
    # and appending to it directly would leak tags into subsequent calls.
    tags = list(network.backend_tag)
    subnet = None
    subnet6 = None
    gateway = None
    gateway6 = None
    for _subnet in network.subnets.all():
        if _subnet.ipversion == 4:
            if _subnet.dhcp:
                tags.append('nfdhcpd')
            subnet = _subnet.cidr
            gateway = _subnet.gateway
        elif _subnet.ipversion == 6:
            subnet6 = _subnet.cidr
            gateway6 = _subnet.gateway

    if network.public:
        conflicts_check = True
        tags.append('public')
    else:
        conflicts_check = False
        tags.append('private')

    # Use a dummy network subnet for IPv6 only networks. Currently Ganeti does
    # not support IPv6 only networks. To bypass this limitation, we create the
    # network with a dummy network subnet, and make Cyclades connect instances
    # to such networks, with address=None.
    if subnet is None:
        subnet = "10.0.0.0/24"

    try:
        bn = BackendNetwork.objects.get(network=network, backend=backend)
        mac_prefix = bn.mac_prefix
    except BackendNetwork.DoesNotExist:
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
                        " does not exist" % (network.id, backend.id))

    with pooled_rapi_client(backend) as client:
        return client.CreateNetwork(network_name=network.backend_id,
                                    network=subnet,
                                    network6=subnet6,
                                    gateway=gateway,
                                    gateway6=gateway6,
                                    mac_prefix=mac_prefix,
                                    conflicts_check=conflicts_check,
                                    tags=tags)
756

    
757

    
758
def connect_network(network, backend, depends=None, group=None):
    """Connect a network to nodegroups of a Ganeti backend.

    Submits one ConnectNetwork job per nodegroup (or only for `group`,
    if given), each job depending on the jobs listed in `depends`.

    Returns the list of submitted Ganeti job ids.
    """
    log.debug("Connecting network %s to backend %s", network, backend)

    # Ganeti's IP conflict checking is only needed for public networks.
    conflicts_check = bool(network.public)

    # Avoid a mutable default argument; None means "no dependencies".
    if depends is None:
        depends = []
    # Each dependency must reach a final state before ConnectNetwork runs.
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
    job_ids = []
    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        # NOTE: loop variable renamed so it no longer shadows the
        # `group` parameter.
        for grp in groups:
            job_id = client.ConnectNetwork(network.backend_id, grp,
                                           network.mode, network.link,
                                           conflicts_check,
                                           depends=depends)
            job_ids.append(job_id)
    return job_ids
778

    
779

    
780
def delete_network(network, backend, disconnect=True):
    """Delete a network from a backend, optionally disconnecting it first.

    When `disconnect` is True, the deletion job depends on the
    disconnect jobs so it only runs after they have finished.
    """
    log.debug("Deleting network %s from backend %s", network, backend)

    depends = disconnect_network(network, backend) if disconnect else []
    _delete_network(network, backend, depends=depends)
787

    
788

    
789
def _delete_network(network, backend, depends=None):
    """Submit the Ganeti DeleteNetwork job for `network` on `backend`.

    `depends`: job ids that must reach a final state before deletion
    runs. Returns the Ganeti job id.
    """
    # Avoid a mutable default argument; None means "no dependencies".
    if depends is None:
        depends = []
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
    with pooled_rapi_client(backend) as client:
        return client.DeleteNetwork(network.backend_id, depends)
793

    
794

    
795
def disconnect_network(network, backend, group=None):
    """Disconnect a network from nodegroups of a Ganeti backend.

    If `group` is given, only that nodegroup is disconnected; otherwise
    every nodegroup of the backend is. Returns the submitted job ids.
    """
    # Fixed log wording: the network is disconnected *from* the backend.
    log.debug("Disconnecting network %s from backend %s", network, backend)

    job_ids = []
    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        # NOTE: loop variable renamed so it no longer shadows the
        # `group` parameter.
        for grp in groups:
            job_id = client.DisconnectNetwork(network.backend_id, grp)
            job_ids.append(job_id)
    return job_ids
805

    
806

    
807
def connect_to_network(vm, nic):
    """Attach NIC `nic` to instance `vm` via a Ganeti ModifyInstance job.

    If the NIC's network is not yet ACTIVE on the VM's backend, the
    network is first created/connected there and the NIC-add job is made
    to depend on those jobs. Returns the Ganeti job id.
    """
    backend = vm.backend
    # Lock the network row while deciding whether it must be created.
    network = Network.objects.select_for_update().get(id=nic.network.id)
    bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
                                                         network=network)
    depend_jobs = []
    if bnet.operstate != "ACTIVE":
        # Network is not available on this backend yet; create it first.
        depend_jobs = create_network(network, backend, connect=True)

    # The NIC-add job waits until each dependency reaches a final state.
    depends = [[job, ["success", "error", "canceled"]]
               for job in depend_jobs]

    # Renamed from `nic` so the parameter is not shadowed.
    nic_info = {'name': nic.backend_uuid,
                'network': network.backend_id,
                'ip': nic.ipv4_address}

    log.debug("Adding NIC %s to VM %s", nic_info, vm)

    kwargs = {
        "instance": vm.backend_vm_id,
        "nics": [("add", "-1", nic_info)],
        "depends": depends,
    }
    if backend.use_hotplug():
        kwargs["hotplug"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(**kwargs)
837

    
838

    
839
def disconnect_from_network(vm, nic):
    """Remove NIC `nic` from instance `vm` and delete its firewall tag.

    Returns the Ganeti job id of the ModifyInstance (NIC removal) job.
    """
    log.debug("Removing NIC %s of VM %s", nic, vm)

    kwargs = {
        "instance": vm.backend_vm_id,
        "nics": [("remove", nic.backend_uuid, {})],
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        job_id = client.ModifyInstance(**kwargs)
        # Clean up the NIC's firewall tag, if one was set.
        profile = nic.firewall_profile
        if profile and profile != "DISABLED":
            tag = _firewall_tags[profile] % nic.backend_uuid
            client.DeleteInstanceTags(vm.backend_vm_id, [tag],
                                      dry_run=settings.TEST)

        return job_id
860

    
861

    
862
def set_firewall_profile(vm, profile, nic):
    """Apply firewall `profile` to `nic` of `vm` using Ganeti instance tags.

    Deletes any existing firewall tags for the NIC, adds the tag for the
    requested profile (unless it is "DISABLED"), and then issues a no-op
    ModifyInstance so the dispatcher re-runs process_net_status.

    Raises ValueError for unknown profiles. Returns None.
    """
    uuid = nic.backend_uuid
    try:
        tag = _firewall_tags[profile] % uuid
    except KeyError:
        # Fixed typo in the error message ("Unsopported" -> "Unsupported").
        raise ValueError("Unsupported Firewall Profile: %s" % profile)

    log.debug("Setting tag of VM %s, NIC %s, to %s", vm, nic, profile)

    with pooled_rapi_client(vm) as client:
        # Delete previous firewall tags of this NIC.
        old_tags = client.GetInstanceTags(vm.backend_vm_id)
        delete_tags = [(t % uuid) for t in _firewall_tags.values()
                       if (t % uuid) in old_tags]
        if delete_tags:
            client.DeleteInstanceTags(vm.backend_vm_id, delete_tags,
                                      dry_run=settings.TEST)

        if profile != "DISABLED":
            client.AddInstanceTags(vm.backend_vm_id, [tag],
                                   dry_run=settings.TEST)

        # XXX NOP ModifyInstance call to force process_net_status to run
        # on the dispatcher
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
        client.ModifyInstance(vm.backend_vm_id,
                              os_name=os_name)
    return None
890

    
891

    
892
def get_instances(backend, bulk=True):
    """Return the backend's instances as reported by the Ganeti RAPI."""
    with pooled_rapi_client(backend) as client:
        return client.GetInstances(bulk=bulk)
895

    
896

    
897
def get_nodes(backend, bulk=True):
    """Return the backend's nodes as reported by the Ganeti RAPI."""
    with pooled_rapi_client(backend) as client:
        return client.GetNodes(bulk=bulk)
900

    
901

    
902
def get_jobs(backend, bulk=True):
    """Return the backend's jobs as reported by the Ganeti RAPI."""
    with pooled_rapi_client(backend) as client:
        return client.GetJobs(bulk=bulk)
905

    
906

    
907
def get_physical_resources(backend):
    """ Get the physical resources of a backend.

    Get the resources of a backend as reported by the backend (not the db).

    """
    attrs = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
    totals = dict.fromkeys(attrs, 0)
    for node in get_nodes(backend, bulk=True):
        # Filter out drained, offline and not vm_capable nodes since they will
        # not take part in the vm allocation process
        if not node['vm_capable'] or node['drained'] or node['offline']:
            continue
        if not node['cnodes']:
            continue
        for attr in attrs:
            # Missing/None values count as zero.
            totals[attr] += int(node[attr] or 0)
    return totals
926

    
927

    
928
def update_backend_resources(backend, resources=None):
    """ Update the state of the backend resources in db.

    If `resources` is not given, they are fetched from the backend
    itself via get_physical_resources().

    """
    if not resources:
        resources = get_physical_resources(backend)

    # Copy each reported figure onto the backend row and stamp the time.
    for attr in ('mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt',
                 'ctotal'):
        setattr(backend, attr, resources[attr])
    backend.updated = datetime.now()
    backend.save()
944

    
945

    
946
def get_memory_from_instances(backend):
    """ Get the memory that is used from instances.

    Get the used memory of a backend. Note: This is different for
    the real memory used, due to kvm's memory de-duplication.

    """
    with pooled_rapi_client(backend) as client:
        instances = client.GetInstances(bulk=True)
    # Sum of 'oper_ram' across all instances (0 if there are none).
    return sum(instance['oper_ram'] for instance in instances)
959

    
960

    
961
def get_available_disk_templates(backend):
    """Get the list of available disk templates of a Ganeti backend.

    The list contains the disk templates that are enabled in the Ganeti backend
    and also included in ipolicy-disk-templates.

    """
    with pooled_rapi_client(backend) as client:
        info = client.GetInfo()
    allowed_templates = info["ipolicy"]["disk-templates"]
    if "enabled_disk_templates" not in info:
        # Ganeti < 2.8 does not have 'enabled_disk_templates'
        return allowed_templates
    return [template for template in info["enabled_disk_templates"]
            if template in allowed_templates]
978

    
979

    
980
def update_backend_disk_templates(backend):
    """Refresh the backend's available disk templates and persist them."""
    backend.disk_templates = get_available_disk_templates(backend)
    backend.save()
984

    
985

    
986
##
987
## Synchronized operations for reconciliation
988
##
989

    
990

    
991
def create_network_synced(network, backend):
    """Create and then connect a network, blocking on each Ganeti job.

    Returns the first non-success (status, error) tuple, or the result
    of the connect step.
    """
    status = _create_network_synced(network, backend)
    if status[0] != 'success':
        return status
    return connect_network_synced(network, backend)
997

    
998

    
999
def _create_network_synced(network, backend):
    """Submit the network-creation job and wait for it to finish.

    Returns the (status, error) tuple produced by wait_for_job().
    """
    with pooled_rapi_client(backend) as client:
        job_id = _create_network(network, backend)
        return wait_for_job(client, job_id)
1004

    
1005

    
1006
def connect_network_synced(network, backend):
    """Connect `network` to every nodegroup of `backend`, blocking per job.

    Returns the first non-success (status, error) tuple, or the last
    job's result when every group succeeds. Initializing `result` fixes
    an UnboundLocalError when the backend reports no nodegroups; in that
    (vacuous) case ('success', None) is returned.
    """
    result = ('success', None)
    with pooled_rapi_client(backend) as client:
        for group in client.GetGroups():
            job = client.ConnectNetwork(network.backend_id, group,
                                        network.mode, network.link)
            result = wait_for_job(client, job)
            if result[0] != 'success':
                return result

    return result
1016

    
1017

    
1018
def wait_for_job(client, jobid):
    """Block until Ganeti job `jobid` reaches a final state.

    Polls the job via WaitForJobChange and returns a (status, error)
    tuple: error is None on success, otherwise the job's 'opresult'.
    """
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
    status = result['job_info'][0]
    # Ganeti reports a canceled job's status as 'canceled' (not
    # 'cancel'); the wrong token here made this loop poll forever for
    # canceled jobs.
    while status not in ['success', 'error', 'canceled']:
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
                                         [result], None)
        status = result['job_info'][0]

    if status == 'success':
        return (status, None)
    else:
        error = result['job_info'][1]
        return (status, error)