root / snf-cyclades-app / synnefo / logic / backend.py @ 0d069390


# Copyright 2011-2013 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
from django.conf import settings
from django.db import transaction
from datetime import datetime, timedelta

from synnefo.db.models import (Backend, VirtualMachine, Network,
                               BackendNetwork, BACKEND_STATUSES,
                               pooled_rapi_client, VirtualMachineDiagnostic,
                               Flavor, FloatingIP)
from synnefo.logic import utils
from synnefo import quotas
from synnefo.api.util import release_resource
from synnefo.util.mac2eui64 import mac2eui64
from synnefo.logic.rapi import GanetiApiError

from logging import getLogger
log = getLogger(__name__)


_firewall_tags = {
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}

_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
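
# Note on the tag format: the firewall tag templates above are deployment
# settings (GANETI_FIREWALL_*_TAG). Assuming a value of the form
# "synnefo:network:%s:protected", v.split(':')[3] picks out the last
# component ("protected"), so _reverse_tags maps each tag suffix back to its
# profile name ('ENABLED', 'DISABLED' or 'PROTECTED').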

# Timeout in seconds for building NICs. After this period the NICs are
# considered stale and are removed from the DB.
BUILDING_NIC_TIMEOUT = timedelta(seconds=180)

NIC_FIELDS = ["state", "mac", "ipv4", "ipv6", "network", "firewall_profile",
              "index"]
UNKNOWN_NIC_PREFIX = "unknown-"


def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
    """Handle quotas for an updated VirtualMachine.

    Update quotas for the updated VirtualMachine based on the job that ran on
    the Ganeti backend. If a commission has already been issued for this job,
    then this commission is just accepted or rejected based on the job status.
    Otherwise, a new commission for the given change is issued in forced and
    auto-accept mode. In this case, previous commissions are rejected, since
    they reflect a previous state of the VM.

    """
    if job_status not in ["success", "error", "canceled"]:
        return vm

    # Check whether the successful completion of the job triggers any quotable
    # change in the VM state.
    action = utils.get_action_from_opcode(job_opcode, job_fields)
    if action == "BUILD":
        # Quotas for new VMs are automatically accepted by the API
        return vm
    commission_info = quotas.get_commission_info(vm, action=action,
                                                 action_fields=job_fields)

    if vm.task_job_id == job_id and vm.serial is not None:
        # Commission for this change has already been issued. So just
        # accept/reject it. Special case is OP_INSTANCE_CREATE, which, even
        # if it fails, must be accepted, as the user must manually remove the
        # failed server
        serial = vm.serial
        if job_status == "success":
            quotas.accept_serial(serial)
        elif job_status in ["error", "canceled"]:
            log.debug("Job %s failed. Rejecting related serial %s", job_id,
                      serial)
            quotas.reject_serial(serial)
        vm.serial = None
    elif job_status == "success" and commission_info is not None:
        log.debug("Expected job was %s. Processing job %s. Commission for"
                  " this job: %s", vm.task_job_id, job_id, commission_info)
        # Commission for this change has not been issued, or the issued
        # commission was unaware of the current change. Reject all previous
        # commissions and create a new one in forced mode!
        commission_name = ("client: dispatcher, resource: %s, ganeti_job: %s"
                           % (vm, job_id))
        quotas.handle_resource_commission(vm, action,
                                          commission_info=commission_info,
                                          commission_name=commission_name,
                                          force=True,
                                          auto_accept=True)
        log.debug("Issued new commission: %s", vm.serial)

    return vm
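
# Illustrative usage (hypothetical values): handle_vm_quotas() is invoked with
# the fields of a finalized Ganeti job, e.g.
#
#     vm = handle_vm_quotas(vm, job_id=1234, job_opcode="OP_INSTANCE_REMOVE",
#                           job_status="success", job_fields={})
#
# In this module the real call is made from process_op_status() below.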


@transaction.commit_on_success
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None,
                      job_fields=None):
    """Process a job progress notification from the backend.

    Process an incoming message from the backend (currently Ganeti).
    Job notifications with a terminating status (success, error, or canceled)
    also update the operating state of the VM.

    """
    # See #1492, #1031, #1111 for why this line has been removed
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)

    vm.backendjobid = jobid
    vm.backendjobstatus = status
    vm.backendopcode = opcode
    vm.backendlogmsg = logmsg

    if status in ["queued", "waiting", "running"]:
        vm.save()
        return

    if job_fields is None:
        job_fields = {}
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode)

    # Notifications of success change the operating state
    if status == "success":
        if state_for_success is not None:
            vm.operstate = state_for_success
        beparams = job_fields.get("beparams", None)
        if beparams:
            # Change the flavor of the VM
            _process_resize(vm, beparams)
        # Update backendtime only for jobs that have been successfully
        # completed, since only these jobs update the state of the VM. Else a
        # "race condition" may occur when a successful job (e.g.
        # OP_INSTANCE_REMOVE) completes before an error job and messages arrive
        # in reversed order.
        vm.backendtime = etime

    if status in ["success", "error", "canceled"] and nics is not None:
        # Update the NICs of the VM
        _process_net_status(vm, etime, nics)

    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
    if opcode == 'OP_INSTANCE_CREATE' and status in ('canceled', 'error'):
        vm.operstate = 'ERROR'
        vm.backendtime = etime
    elif opcode == 'OP_INSTANCE_REMOVE':
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
        # when no instance exists at the Ganeti backend.
        # See ticket #799 for all the details.
        if status == 'success' or (status == 'error' and
                                   not vm_exists_in_backend(vm)):
            # VM has been deleted
            for nic in vm.nics.all():
                # Release the IP
                release_nic_address(nic)
                # And delete the NIC.
                nic.delete()
            vm.deleted = True
            vm.operstate = state_for_success
            vm.backendtime = etime
            status = "success"

    if status in ["success", "error", "canceled"]:
        # Job is finalized: Handle quotas/commissioning
        vm = handle_vm_quotas(vm, job_id=jobid, job_opcode=opcode,
                              job_status=status, job_fields=job_fields)
        # and clear task fields
        if vm.task_job_id == jobid:
            vm.task = None
            vm.task_job_id = None

    vm.save()
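
# Illustrative example (hypothetical values): a terminating notification for a
# shutdown job would be processed roughly as
#
#     process_op_status(vm, etime=etime, jobid=4321,
#                       opcode="OP_INSTANCE_SHUTDOWN", status="success",
#                       logmsg="", nics=None, job_fields={})
#
# which sets vm.operstate via OPER_STATE_FROM_OPCODE, updates backendtime and
# settles the related quota commission.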


def _process_resize(vm, beparams):
    """Change the flavor of a VirtualMachine based on new beparams."""
    old_flavor = vm.flavor
    vcpus = beparams.get("vcpus", old_flavor.cpu)
    ram = beparams.get("maxmem", old_flavor.ram)
    if vcpus == old_flavor.cpu and ram == old_flavor.ram:
        return
    try:
        new_flavor = Flavor.objects.get(cpu=vcpus, ram=ram,
                                        disk=old_flavor.disk,
                                        disk_template=old_flavor.disk_template)
    except Flavor.DoesNotExist:
        raise Exception("Can not find flavor for VM")
    vm.flavor = new_flavor
    vm.save()


@transaction.commit_on_success
def process_net_status(vm, etime, nics):
    """Wrap _process_net_status inside a transaction."""
    _process_net_status(vm, etime, nics)


def _process_net_status(vm, etime, nics):
    """Process a net status notification from the backend.

    Process an incoming message from the Ganeti backend,
    detailing the NIC configuration of a VM instance.

    Update the state of the VM in the DB accordingly.

    """
    ganeti_nics = process_ganeti_nics(nics)
    db_nics = dict([(nic.id, nic) for nic in vm.nics.all()])

    # Get X-Lock on backend before getting X-Lock on network IP pools, to
    # guarantee that no deadlock will occur with the Backend allocator.
    Backend.objects.select_for_update().get(id=vm.backend_id)

    for nic_name in set(db_nics.keys()) | set(ganeti_nics.keys()):
        db_nic = db_nics.get(nic_name)
        ganeti_nic = ganeti_nics.get(nic_name)
        if ganeti_nic is None:
            # NIC exists in DB but not in Ganeti. If the NIC has been in
            # 'building' state for longer than BUILDING_NIC_TIMEOUT, we
            # remove it.
            # TODO: This is dangerous as the job may be stuck in the queue,
            # and releasing the IP may lead to duplicate IP use.
            if db_nic.state != "BUILDING" or\
                (db_nic.state == "BUILDING" and
                 etime > db_nic.created + BUILDING_NIC_TIMEOUT):
                release_nic_address(db_nic)
                db_nic.delete()
            else:
                log.warning("Ignoring recent building NIC: %s", db_nic)
        elif db_nic is None:
            # NIC exists in Ganeti but not in DB
            if str(nic_name).startswith(UNKNOWN_NIC_PREFIX):
                msg = "Can not process NIC! NIC '%s' does not have a"\
                      " valid name." % ganeti_nic
                log.error(msg)
                continue
            if ganeti_nic["ipv4"]:
                network = ganeti_nic["network"]
                network.reserve_address(ganeti_nic["ipv4"])
            vm.nics.create(id=nic_name, **ganeti_nic)
        elif not nics_are_equal(db_nic, ganeti_nic):
            # Special case where the IPv4 address has changed, because we
            # need to release the old IPv4 address and reserve the new one
            if db_nic.ipv4 != ganeti_nic["ipv4"]:
                release_nic_address(db_nic)
                if ganeti_nic["ipv4"]:
                    ganeti_nic["network"].reserve_address(ganeti_nic["ipv4"])

            # Update the NIC in the DB with the values from the Ganeti NIC
            [setattr(db_nic, f, ganeti_nic[f]) for f in NIC_FIELDS]
            db_nic.save()

            # Dummy update of the network, to work with 'changed-since'
            db_nic.network.save()

    vm.backendtime = etime
    vm.save()


def nics_are_equal(db_nic, gnt_nic):
    for field in NIC_FIELDS:
        if getattr(db_nic, field) != gnt_nic[field]:
            return False
    return True


def process_ganeti_nics(ganeti_nics):
    """Process the NIC dicts reported by Ganeti."""
    new_nics = []
    for index, gnic in enumerate(ganeti_nics):
        nic_name = gnic.get("name", None)
        if nic_name is not None:
            nic_id = utils.id_from_nic_name(nic_name)
        else:
            # Fall back to the index. If the NIC is unknown to Synnefo, it
            # will be created automatically.
            nic_id = UNKNOWN_NIC_PREFIX + str(index)
        network_name = gnic.get('network', '')
        network_id = utils.id_from_network_name(network_name)
        network = Network.objects.get(id=network_id)

        # Get the new NIC info
        mac = gnic.get('mac')
        ipv4 = gnic.get('ip')
        ipv6 = mac2eui64(mac, network.subnet6)\
            if network.subnet6 is not None else None

        firewall = gnic.get('firewall')
        firewall_profile = _reverse_tags.get(firewall)
        if not firewall_profile and network.public:
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE

        nic_info = {
            'index': index,
            'network': network,
            'mac': mac,
            'ipv4': ipv4,
            'ipv6': ipv6,
            'firewall_profile': firewall_profile,
            'state': 'ACTIVE'}

        new_nics.append((nic_id, nic_info))
    return dict(new_nics)
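
# Illustrative example (placeholder values): a Ganeti NIC entry such as
#
#     {"name": "<nic-name>", "network": "<network-backend-name>",
#      "mac": "aa:00:00:00:00:01", "ip": "192.0.2.10",
#      "firewall": "<firewall tag suffix>"}
#
# is mapped to a {nic_id: nic_info} entry, where nic_info carries index,
# network, mac, ipv4, ipv6, firewall_profile and state 'ACTIVE'.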


def release_nic_address(nic):
    """Release the IPv4 address of a NIC.

    Check if an instance's NIC has an IPv4 address and release it if it is not
    a Floating IP. If it is a Floating IP, then disassociate the FloatingIP
    from the machine.

    """

    if nic.ipv4:
        if nic.ip_type == "FLOATING":
            FloatingIP.objects.filter(machine=nic.machine_id,
                                      network=nic.network_id,
                                      ipv4=nic.ipv4).update(machine=None)
        else:
            nic.network.release_address(nic.ipv4)


@transaction.commit_on_success
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode
    back_network.backendlogmsg = logmsg

    network = back_network.network

    # Notifications of success change the operating state
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == 'success' and state_for_success is not None:
        back_network.operstate = state_for_success

    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_ADD':
        back_network.operstate = 'ERROR'
        back_network.backendtime = etime

    if opcode == 'OP_NETWORK_REMOVE':
        network_is_deleted = (status == "success")
        if network_is_deleted or (status == "error" and not
                                  network_exists_in_backend(back_network)):
            back_network.operstate = state_for_success
            back_network.deleted = True
            back_network.backendtime = etime

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()
    # The state of the Network must also be updated!
    update_network_state(network)


def update_network_state(network):
    """Update the state of a Network based on BackendNetwork states.

    Update the state of a Network based on the operstate of the corresponding
    BackendNetworks in the backends in which the network exists.

    The state of the network is:
    * ACTIVE: If it is 'ACTIVE' in at least one backend.
    * DELETED: If it is 'DELETED' in all backends in which it has been
      created.

    This function also releases the resources (MAC prefix or bridge) and the
    quotas for the network.

    """
    if network.deleted:
        # Network has already been deleted. Just assert that its state is
        # also DELETED
        if not network.state == "DELETED":
            network.state = "DELETED"
            network.save()
        return

    backend_states = [s.operstate for s in network.backend_networks.all()]
    if not backend_states and network.action != "DESTROY":
        if network.state != "ACTIVE":
            network.state = "ACTIVE"
            network.save()
            return

    # Network is deleted when all BackendNetworks go to "DELETED" operstate
    deleted = reduce(lambda x, y: x == y and "DELETED", backend_states,
                     "DELETED")

    # Release the resources on the deletion of the Network
    if deleted:
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
                 network.id, network.mac_prefix, network.link)
        network.deleted = True
        network.state = "DELETED"
        if network.mac_prefix:
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
                release_resource(res_type="mac_prefix",
                                 value=network.mac_prefix)
        if network.link:
            if network.FLAVORS[network.flavor]["link"] == "pool":
                release_resource(res_type="bridge", value=network.link)

        # Issue commission
        if network.userid:
            quotas.issue_and_accept_commission(network, delete=True)
            # the above has already saved the object and committed;
            # a second save would override others' changes, since the
            # object is now unlocked
            return
        elif not network.public:
            log.warning("Network %s does not have an owner!", network.id)
    network.save()


@transaction.commit_on_success
def process_network_modify(back_network, etime, jobid, opcode, status,
                           job_fields):
    assert (opcode == "OP_NETWORK_SET_PARAMS")
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.opcode = opcode

    add_reserved_ips = job_fields.get("add_reserved_ips")
    if add_reserved_ips:
        net = back_network.network
        pool = net.get_pool()
        if add_reserved_ips:
            for ip in add_reserved_ips:
                pool.reserve(ip, external=True)
        pool.save()

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()


@transaction.commit_on_success
def process_create_progress(vm, etime, progress):

    percentage = int(progress)

    # The percentage may exceed 100%, due to the way
    # snf-image:copy-progress tracks bytes read by image handling processes
    percentage = 100 if percentage > 100 else percentage
    if percentage < 0:
        raise ValueError("Percentage cannot be negative")

    # FIXME: log a warning here, see #1033
#   if last_update > percentage:
#       raise ValueError("Build percentage should increase monotonically " \
#                        "(old = %d, new = %d)" % (last_update, percentage))

    # This assumes that no message of type 'ganeti-create-progress' is going to
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
    # the instance is STARTED.  What if the two messages are processed by two
    # separate dispatcher threads, and the 'ganeti-op-status' message for
    # successful creation gets processed before the 'ganeti-create-progress'
    # message? [vkoukis]
    #
    #if not vm.operstate == 'BUILD':
    #    raise VirtualMachine.IllegalState("VM is not in building state")

    vm.buildpercentage = percentage
    vm.backendtime = etime
    vm.save()


@transaction.commit_on_success
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
                               details=None):
    """
    Create a virtual machine instance diagnostic entry.

    :param vm: VirtualMachine instance to create a diagnostic for.
    :param message: Diagnostic message.
    :param source: Diagnostic source identifier (e.g. image-helper).
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
    :param etime: The time the message occurred (if available).
    :param details: Additional details or debug information.
    """
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level, source=source,
                                                   source_date=etime,
                                                   message=message,
                                                   details=details)


def create_instance(vm, nics, flavor, image):
    """`image` is a dictionary which should contain the keys:
            'backend_id', 'format' and 'metadata'

        The metadata value should be a dictionary.
    """

    # Handle arguments to CreateInstance() as a dictionary,
    # initialized from a deployment-specific value.
    # This enables the administrator to override deployment-specific
    # arguments, such as the disk template to use, the name of the OS provider
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
    #
    kw = vm.backend.get_create_params()
    kw['mode'] = 'create'
    kw['name'] = vm.backend_vm_id
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS

    kw['disk_template'] = flavor.disk_template
    kw['disks'] = [{"size": flavor.disk * 1024}]
    provider = flavor.disk_provider
    if provider:
        kw['disks'][0]['provider'] = provider
        kw['disks'][0]['origin'] = flavor.disk_origin

    kw['nics'] = [{"name": nic.backend_uuid,
                   "network": nic.network.backend_id,
                   "ip": nic.ipv4}
                  for nic in nics]
    backend = vm.backend
    depend_jobs = []
    for nic in nics:
        network = Network.objects.select_for_update().get(id=nic.network.id)
        bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
                                                             network=network)
        if bnet.operstate != "ACTIVE":
            if network.public:
                msg = "Can not connect instance to network %s. Network is not"\
                      " ACTIVE in backend %s." % (network, backend)
                raise Exception(msg)
            else:
                jobs = create_network(network, backend, connect=True)
                if isinstance(jobs, list):
                    depend_jobs.extend(jobs)
                else:
                    depend_jobs.append(jobs)
    kw["depends"] = [[job, ["success", "error", "canceled"]]
                     for job in depend_jobs]

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['os'] = settings.GANETI_OS_PROVIDER
    kw['ip_check'] = False
    kw['name_check'] = False

    # Do not specify a node explicitly; have
    # Ganeti use an iallocator instead
    #kw['pnode'] = rapi.GetNodes()[0]

    kw['dry_run'] = settings.TEST

    kw['beparams'] = {
        'auto_balance': True,
        'vcpus': flavor.cpu,
        'memory': flavor.ram}

    kw['osparams'] = {
        'config_url': vm.config_url,
        # Store the image id and format in Ganeti
        'img_id': image['backend_id'],
        'img_format': image['format']}

    # Use opportunistic locking
    kw['opportunistic_locking'] = True

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['hvparams'] = dict(serial_console=False)

    log.debug("Creating instance %s", utils.hide_pass(kw))
    with pooled_rapi_client(vm) as client:
        return client.CreateInstance(**kw)
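
# For reference, the kw dictionary assembled above ends up with roughly the
# following shape before being passed to RAPI (literal values are
# illustrative, and deployment defaults from get_create_params() are merged
# in as well):
#
#     {'mode': 'create', 'name': '<backend_vm_id>', 'disk_template': 'drbd',
#      'disks': [{'size': 20480}], 'nics': [...], 'depends': [...],
#      'ip_check': False, 'name_check': False, 'dry_run': False,
#      'beparams': {...}, 'osparams': {...}, 'opportunistic_locking': True}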


def delete_instance(vm):
    with pooled_rapi_client(vm) as client:
        return client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)


def reboot_instance(vm, reboot_type):
    assert reboot_type in ('soft', 'hard')
    kwargs = {"instance": vm.backend_vm_id,
              "reboot_type": "hard"}
    # XXX: Currently the shutdown_timeout parameter is not supported by the
    # Ganeti RAPI. Until it is supported, we fall back for both reboot types
    # to Ganeti's default shutdown timeout (120s). Note that the reboot type
    # of the Ganeti job must always be hard. The 'soft' and 'hard' types of
    # the OS API differ from Ganeti's, and map to 'shutdown_timeout'.
    #if reboot_type == "hard":
    #    kwargs["shutdown_timeout"] = 0
    if settings.TEST:
        kwargs["dry_run"] = True
    with pooled_rapi_client(vm) as client:
        return client.RebootInstance(**kwargs)


def startup_instance(vm):
    with pooled_rapi_client(vm) as client:
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)


def shutdown_instance(vm):
    with pooled_rapi_client(vm) as client:
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)


def resize_instance(vm, vcpus, memory):
    beparams = {"vcpus": int(vcpus),
                "minmem": int(memory),
                "maxmem": int(memory)}
    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id, beparams=beparams)


def get_instance_console(vm):
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
    # useless (see #783).
    #
    # Until this is fixed on the Ganeti side, construct a console info reply
    # directly.
    #
    # WARNING: This assumes that VNC runs on port network_port on
    #          the instance's primary node, and is probably
    #          hypervisor-specific.
    #
    log.debug("Getting console for vm %s", vm)

    console = {}
    console['kind'] = 'vnc'

    with pooled_rapi_client(vm) as client:
        i = client.GetInstance(vm.backend_vm_id)

    if vm.backend.hypervisor == "kvm" and i['hvparams']['serial_console']:
        raise Exception("hv parameter serial_console cannot be true")
    console['host'] = i['pnode']
    console['port'] = i['network_port']

    return console
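
# Illustrative example (hypothetical host/port): the dictionary returned above
# looks like
#
#     {'kind': 'vnc', 'host': 'node1.example.org', 'port': 11000}
#
# i.e. the primary node and network_port that Ganeti reports for the instance.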


def get_instance_info(vm):
    with pooled_rapi_client(vm) as client:
        return client.GetInstance(vm.backend_vm_id)


def vm_exists_in_backend(vm):
    try:
        get_instance_info(vm)
        return True
    except GanetiApiError as e:
        if e.code == 404:
            return False
        raise e


def get_network_info(backend_network):
    with pooled_rapi_client(backend_network) as client:
        return client.GetNetwork(backend_network.network.backend_id)


def network_exists_in_backend(backend_network):
    try:
        get_network_info(backend_network)
        return True
    except GanetiApiError as e:
        if e.code == 404:
            return False
        # Propagate unexpected errors, as vm_exists_in_backend() does
        raise e


def create_network(network, backend, connect=True):
    """Create a network in a Ganeti backend."""
    log.debug("Creating network %s in backend %s", network, backend)

    job_id = _create_network(network, backend)

    if connect:
        job_ids = connect_network(network, backend, depends=[job_id])
        return job_ids
    else:
        return [job_id]


def _create_network(network, backend):
    """Create a network."""

    tags = network.backend_tag
    if network.dhcp:
        tags.append('nfdhcpd')

    if network.public:
        conflicts_check = True
        tags.append('public')
    else:
        conflicts_check = False
        tags.append('private')

    # Use a dummy network subnet for IPv6-only networks. Currently Ganeti does
    # not support IPv6-only networks. To bypass this limitation, we create the
    # network with a dummy network subnet, and make Cyclades connect instances
    # to such networks, with address=None.
    subnet = network.subnet
    if subnet is None:
        subnet = "10.0.0.0/24"

    try:
        bn = BackendNetwork.objects.get(network=network, backend=backend)
        mac_prefix = bn.mac_prefix
    except BackendNetwork.DoesNotExist:
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
                        " does not exist" % (network.id, backend.id))

    with pooled_rapi_client(backend) as client:
        return client.CreateNetwork(network_name=network.backend_id,
                                    network=subnet,
                                    network6=network.subnet6,
                                    gateway=network.gateway,
                                    gateway6=network.gateway6,
                                    mac_prefix=mac_prefix,
                                    conflicts_check=conflicts_check,
                                    tags=tags)


def connect_network(network, backend, depends=[], group=None):
    """Connect a network to nodegroups."""
    log.debug("Connecting network %s to backend %s", network, backend)

    if network.public:
        conflicts_check = True
    else:
        conflicts_check = False

    depends = [[job, ["success", "error", "canceled"]] for job in depends]
    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for group in groups:
            job_id = client.ConnectNetwork(network.backend_id, group,
                                           network.mode, network.link,
                                           conflicts_check,
                                           depends=depends)
            job_ids.append(job_id)
    return job_ids


def delete_network(network, backend, disconnect=True):
    log.debug("Deleting network %s from backend %s", network, backend)

    depends = []
    if disconnect:
        depends = disconnect_network(network, backend)
    _delete_network(network, backend, depends=depends)


def _delete_network(network, backend, depends=[]):
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
    with pooled_rapi_client(backend) as client:
        return client.DeleteNetwork(network.backend_id, depends)


def disconnect_network(network, backend, group=None):
    log.debug("Disconnecting network %s from backend %s", network, backend)

    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for group in groups:
            job_id = client.DisconnectNetwork(network.backend_id, group)
            job_ids.append(job_id)
    return job_ids


def connect_to_network(vm, nic):
    network = nic.network
    backend = vm.backend
    network = Network.objects.select_for_update().get(id=network.id)
    bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
                                                         network=network)
    depend_jobs = []
    if bnet.operstate != "ACTIVE":
        depend_jobs = create_network(network, backend, connect=True)

    depends = [[job, ["success", "error", "canceled"]] for job in depend_jobs]

    nic = {'name': nic.backend_uuid,
           'network': network.backend_id,
           'ip': nic.ipv4}

    log.debug("Adding NIC %s to VM %s", nic, vm)

    kwargs = {
        "instance": vm.backend_vm_id,
        "nics": [("add", "-1", nic)],
        "depends": depends,
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(**kwargs)


def disconnect_from_network(vm, nic):
    log.debug("Removing NIC %s of VM %s", nic, vm)

    kwargs = {
        "instance": vm.backend_vm_id,
        "nics": [("remove", nic.backend_uuid, {})],
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        jobID = client.ModifyInstance(**kwargs)
        # If the NIC has a tag for a firewall profile it must be deleted,
        # otherwise it may affect another NIC. XXX: Deleting the tag should
        # depend on removing the NIC, but currently the RAPI client does not
        # support this, so clearing the firewall profile may succeed without
        # the NIC actually being removed. This issue will be fixed with the
        # use of NIC UUIDs.
        firewall_profile = nic.firewall_profile
        if firewall_profile and firewall_profile != "DISABLED":
            tag = _firewall_tags[firewall_profile] % nic.index
            client.DeleteInstanceTags(vm.backend_vm_id, [tag],
                                      dry_run=settings.TEST)

        return jobID


def set_firewall_profile(vm, profile, index=0):
    try:
        tag = _firewall_tags[profile] % index
    except KeyError:
        raise ValueError("Unsupported Firewall Profile: %s" % profile)

    log.debug("Setting tag of VM %s, NIC index %d, to %s", vm, index, profile)

    with pooled_rapi_client(vm) as client:
        # Delete previous firewall tags
        old_tags = client.GetInstanceTags(vm.backend_vm_id)
        delete_tags = [(t % index) for t in _firewall_tags.values()
                       if (t % index) in old_tags]
        if delete_tags:
            client.DeleteInstanceTags(vm.backend_vm_id, delete_tags,
                                      dry_run=settings.TEST)

        if profile != "DISABLED":
            client.AddInstanceTags(vm.backend_vm_id, [tag],
                                   dry_run=settings.TEST)

        # XXX NOP ModifyInstance call to force process_net_status to run
        # on the dispatcher
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
        client.ModifyInstance(vm.backend_vm_id,
                              os_name=os_name)
    return None


def get_instances(backend, bulk=True):
    with pooled_rapi_client(backend) as c:
        return c.GetInstances(bulk=bulk)


def get_nodes(backend, bulk=True):
    with pooled_rapi_client(backend) as c:
        return c.GetNodes(bulk=bulk)


def get_jobs(backend, bulk=True):
    with pooled_rapi_client(backend) as c:
        return c.GetJobs(bulk=bulk)


def get_physical_resources(backend):
    """ Get the physical resources of a backend.

    Get the resources of a backend as reported by the backend (not the DB).

    """
    nodes = get_nodes(backend, bulk=True)
    attr = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
    res = {}
    for a in attr:
        res[a] = 0
    for n in nodes:
        # Filter out drained, offline and not vm_capable nodes since they will
        # not take part in the VM allocation process
        can_host_vms = n['vm_capable'] and not (n['drained'] or n['offline'])
        if can_host_vms and n['cnodes']:
            for a in attr:
                res[a] += int(n[a])
    return res


def update_backend_resources(backend, resources=None):
    """ Update the state of the backend resources in the DB.

    """

    if not resources:
        resources = get_physical_resources(backend)

    backend.mfree = resources['mfree']
    backend.mtotal = resources['mtotal']
    backend.dfree = resources['dfree']
    backend.dtotal = resources['dtotal']
    backend.pinst_cnt = resources['pinst_cnt']
    backend.ctotal = resources['ctotal']
    backend.updated = datetime.now()
    backend.save()


def get_memory_from_instances(backend):
    """ Get the memory that is used by instances.

    Get the used memory of a backend. Note: This is different from the real
    memory used, due to KVM's memory de-duplication.

    """
    with pooled_rapi_client(backend) as client:
        instances = client.GetInstances(bulk=True)
    mem = 0
    for i in instances:
        mem += i['oper_ram']
    return mem


def get_available_disk_templates(backend):
    """Get the list of available disk templates of a Ganeti backend.

    The list contains the disk templates that are enabled in the Ganeti backend
    and also included in ipolicy-disk-templates.

    """
    with pooled_rapi_client(backend) as c:
        info = c.GetInfo()
    ipolicy_disk_templates = info["ipolicy"]["disk-templates"]
    try:
        enabled_disk_templates = info["enabled_disk_templates"]
        return [dp for dp in enabled_disk_templates
                if dp in ipolicy_disk_templates]
    except KeyError:
        # Ganeti < 2.8 does not have 'enabled_disk_templates'
        return ipolicy_disk_templates


def update_backend_disk_templates(backend):
    disk_templates = get_available_disk_templates(backend)
    backend.disk_templates = disk_templates
    backend.save()


##
## Synchronized operations for reconciliation
##


def create_network_synced(network, backend):
    result = _create_network_synced(network, backend)
    if result[0] != 'success':
        return result
    result = connect_network_synced(network, backend)
    return result


def _create_network_synced(network, backend):
    with pooled_rapi_client(backend) as client:
        job = _create_network(network, backend)
        result = wait_for_job(client, job)
    return result


def connect_network_synced(network, backend):
    with pooled_rapi_client(backend) as client:
        for group in client.GetGroups():
            job = client.ConnectNetwork(network.backend_id, group,
                                        network.mode, network.link)
            result = wait_for_job(client, job)
            if result[0] != 'success':
                return result

    return result


def wait_for_job(client, jobid):
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
    status = result['job_info'][0]
    while status not in ['success', 'error', 'cancel']:
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
                                         [result], None)
        status = result['job_info'][0]

    if status == 'success':
        return (status, None)
    else:
        error = result['job_info'][1]
        return (status, error)
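
# wait_for_job() returns a (status, error) tuple, e.g. ('success', None) or
# ('error', <opresult>), which is what the *_synced reconciliation helpers
# above propagate to their callers.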