
root / snf-cyclades-app / synnefo / logic / backend.py @ a96e84cf


# Copyright 2011-2013 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
from django.conf import settings
from django.db import transaction
from datetime import datetime, timedelta

from synnefo.db.models import (Backend, VirtualMachine, Network,
                               BackendNetwork, BACKEND_STATUSES,
                               pooled_rapi_client, VirtualMachineDiagnostic,
                               Flavor, IPAddress)
from synnefo.logic import utils
from synnefo import quotas
from synnefo.api.util import release_resource
from synnefo.util.mac2eui64 import mac2eui64
from synnefo.logic.rapi import GanetiApiError

from logging import getLogger
log = getLogger(__name__)


_firewall_tags = {
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}

_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
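
# Illustrative note (the real tag strings come from the settings above): with
# a value such as GANETI_FIREWALL_ENABLED_TAG = "synnefo:network:%s:enabled",
# the tag applied to a NIC becomes "synnefo:network:<nic-uuid>:enabled", and
# _reverse_tags maps the last colon-separated field back to the profile name,
# e.g. {"enabled": "ENABLED", ...}.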

# Timeout for building NICs. After this period a building NIC is considered
# stale and is removed from the DB.
BUILDING_NIC_TIMEOUT = timedelta(seconds=180)

NIC_FIELDS = ["state", "mac", "ipv4_address", "ipv6_address", "network",
              "firewall_profile", "index"]
UNKNOWN_NIC_PREFIX = "unknown-"


def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
    """Handle quotas for an updated VirtualMachine.

    Update quotas for the updated VirtualMachine based on the job that ran on
    the Ganeti backend. If a commission has already been issued for this job,
    that commission is simply accepted or rejected according to the job
    status. Otherwise, a new commission for the given change is issued in
    forced and auto-accept mode. In this case, previous commissions are
    rejected, since they reflect a previous state of the VM.

    """
    if job_status not in ["success", "error", "canceled"]:
        return vm

    # Check whether successful completion of the job will trigger any quotable
    # change in the VM state.
    action = utils.get_action_from_opcode(job_opcode, job_fields)
    if action == "BUILD":
        # Quotas for new VMs are automatically accepted by the API
        return vm
    commission_info = quotas.get_commission_info(vm, action=action,
                                                 action_fields=job_fields)

    if vm.task_job_id == job_id and vm.serial is not None:
        # A commission for this change has already been issued, so just
        # accept or reject it. Special case is OP_INSTANCE_CREATE, which, even
        # if it fails, must be accepted, since the user must manually remove
        # the failed server.
        serial = vm.serial
        if job_status == "success":
            quotas.accept_serial(serial)
        elif job_status in ["error", "canceled"]:
            log.debug("Job %s failed. Rejecting related serial %s", job_id,
                      serial)
            quotas.reject_serial(serial)
        vm.serial = None
    elif job_status == "success" and commission_info is not None:
        log.debug("Expected job was %s. Processing job %s. Commission for"
                  " this job: %s", vm.task_job_id, job_id, commission_info)
        # A commission for this change has not been issued, or the issued
        # commission was unaware of the current change. Reject all previous
        # commissions and create a new one in forced mode!
        commission_name = ("client: dispatcher, resource: %s, ganeti_job: %s"
                           % (vm, job_id))
        quotas.handle_resource_commission(vm, action,
                                          commission_info=commission_info,
                                          commission_name=commission_name,
                                          force=True,
                                          auto_accept=True)
        log.debug("Issued new commission: %s", vm.serial)

    return vm


@transaction.commit_on_success
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None,
                      job_fields=None):
    """Process a job progress notification from the backend.

    Process an incoming message from the backend (currently Ganeti).
    Job notifications with a terminating status (success, error, or canceled)
    also update the operating state of the VM.

    """
    # See #1492, #1031, #1111 why this line has been removed
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)

    vm.backendjobid = jobid
    vm.backendjobstatus = status
    vm.backendopcode = opcode
    vm.backendlogmsg = logmsg

    if status in ["queued", "waiting", "running"]:
        vm.save()
        return

    if job_fields is None:
        job_fields = {}
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode)

    # Notifications of success change the operating state
    if status == "success":
        if state_for_success is not None:
            vm.operstate = state_for_success
        beparams = job_fields.get("beparams", None)
        if beparams:
            # Change the flavor of the VM
            _process_resize(vm, beparams)
        # Update backendtime only for jobs that have been successfully
        # completed, since only these jobs update the state of the VM. Else a
        # "race condition" may occur when a successful job (e.g.
        # OP_INSTANCE_REMOVE) completes before an error job and messages
        # arrive in reversed order.
        vm.backendtime = etime

    if status in ["success", "error", "canceled"] and nics is not None:
        # Update the NICs of the VM
        _process_net_status(vm, etime, nics)

    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
    if opcode == 'OP_INSTANCE_CREATE' and status in ('canceled', 'error'):
        vm.operstate = 'ERROR'
        vm.backendtime = etime
    elif opcode == 'OP_INSTANCE_REMOVE':
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
        # when no instance exists at the Ganeti backend.
        # See ticket #799 for all the details.
        if status == 'success' or (status == 'error' and
                                   not vm_exists_in_backend(vm)):
            # VM has been deleted
            for nic in vm.nics.all():
                # Release the IP
                release_nic_address(nic)
                # And delete the NIC.
                nic.delete()
            vm.deleted = True
            vm.operstate = state_for_success
            vm.backendtime = etime
            status = "success"

    if status in ["success", "error", "canceled"]:
        # Job is finalized: Handle quotas/commissioning
        vm = handle_vm_quotas(vm, job_id=jobid, job_opcode=opcode,
                              job_status=status, job_fields=job_fields)
        # and clear task fields
        if vm.task_job_id == jobid:
            vm.task = None
            vm.task_job_id = None

    vm.save()


def _process_resize(vm, beparams):
    """Change the flavor of a VirtualMachine based on new beparams."""
    old_flavor = vm.flavor
    vcpus = beparams.get("vcpus", old_flavor.cpu)
    ram = beparams.get("maxmem", old_flavor.ram)
    if vcpus == old_flavor.cpu and ram == old_flavor.ram:
        return
    try:
        new_flavor = Flavor.objects.get(cpu=vcpus, ram=ram,
                                        disk=old_flavor.disk,
                                        disk_template=old_flavor.disk_template)
    except Flavor.DoesNotExist:
        raise Exception("Cannot find flavor for VM")
    vm.flavor = new_flavor
    vm.save()
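
# Illustrative sketch of the resize path above: a successful Ganeti job whose
# job_fields contain, e.g., {"beparams": {"vcpus": 2, "maxmem": 2048}} makes
# _process_resize() look up a Flavor with cpu=2 and ram=2048 that keeps the
# old flavor's disk and disk_template, and assign it to the VM. The numbers
# are made up for the example; only flavors that already exist in the DB can
# be matched.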


@transaction.commit_on_success
def process_net_status(vm, etime, nics):
    """Wrap _process_net_status inside a transaction."""
    _process_net_status(vm, etime, nics)


def _process_net_status(vm, etime, nics):
    """Process a net status notification from the backend.

    Process an incoming message from the Ganeti backend,
    detailing the NIC configuration of a VM instance.

    Update the state of the VM in the DB accordingly.

    """
    ganeti_nics = process_ganeti_nics(nics)
    db_nics = dict([(nic.id, nic) for nic in vm.nics.all()])

    # Get X-Lock on backend before getting X-Lock on network IP pools, to
    # guarantee that no deadlock will occur with the Backend allocator.
    Backend.objects.select_for_update().get(id=vm.backend_id)

    for nic_name in set(db_nics.keys()) | set(ganeti_nics.keys()):
        db_nic = db_nics.get(nic_name)
        ganeti_nic = ganeti_nics.get(nic_name)
        if ganeti_nic is None:
            # NIC exists in DB but not in Ganeti. If the NIC has been in
            # 'building' state for more than BUILDING_NIC_TIMEOUT, remove it.
            # TODO: This is dangerous, as the job may be stuck in the queue,
            # and releasing the IP may lead to duplicate IP use.
            if db_nic.state != "BUILDING" or\
                (db_nic.state == "BUILDING" and
                 etime > db_nic.created + BUILDING_NIC_TIMEOUT):
                release_nic_address(db_nic)
                db_nic.delete()
            else:
                log.warning("Ignoring recent building NIC: %s", db_nic)
        elif db_nic is None:
            # NIC exists in Ganeti but not in DB
            if str(nic_name).startswith(UNKNOWN_NIC_PREFIX):
                msg = "Can not process NIC! NIC '%s' does not have a"\
                      " valid name." % ganeti_nic
                log.error(msg)
                continue
            ipaddress = None
            network = ganeti_nic["network"]
            ipv4_address = ganeti_nic["ipv4_address"]
            if ipv4_address:
                network.reserve_address(ipv4_address)
                subnet = network.subnets.get(ipversion=4)
                ipaddress = IPAddress.objects.create(address=ipv4_address,
                                                     network=network,
                                                     subnet=subnet,
                                                     userid=vm.userid)
            # TODO
            ganeti_nic.pop("ipv4_address")
            ganeti_nic.pop("ip", None)
            ganeti_nic.pop("ipv6_address")
            nic = vm.nics.create(id=nic_name, **ganeti_nic)
            if ipaddress is not None:
                ipaddress.nic = nic
                ipaddress.save()
        elif not nics_are_equal(db_nic, ganeti_nic):
            # Special case where the IPv4 address has changed: the old IPv4
            # address must be released and the new one reserved.
            ipv4_address = ganeti_nic["ipv4_address"]
            if db_nic.ipv4_address != ipv4_address:
                release_nic_address(db_nic)
                if ipv4_address:
                    network = ganeti_nic["network"]
                    network.reserve_address(ipv4_address)
                    subnet = network.subnets.get(ipversion=4)
                    ipaddress, _ =\
                        IPAddress.objects.get_or_create(network=network,
                                                        subnet=subnet,
                                                        userid=vm.userid,
                                                        address=ipv4_address)
                    ipaddress.nic = db_nic
                    ipaddress.save()

            for f in ["state", "mac", "network", "firewall_profile", "index"]:
                # Update the NIC in DB with the values from Ganeti NIC
                setattr(db_nic, f, ganeti_nic[f])
                db_nic.save()

            # Dummy update the network, to work with 'changed-since'
            db_nic.network.save()

    vm.backendtime = etime
    vm.save()


def nics_are_equal(db_nic, gnt_nic):
    for field in NIC_FIELDS:
        if getattr(db_nic, field) != gnt_nic[field]:
            return False
    return True


def process_ganeti_nics(ganeti_nics):
    """Process a NIC dict reported by Ganeti."""
    new_nics = []
    for index, gnic in enumerate(ganeti_nics):
        nic_name = gnic.get("name", None)
        if nic_name is not None:
            nic_id = utils.id_from_nic_name(nic_name)
        else:
            # Use the index as the default value. If the NIC is unknown to
            # synnefo, it will be created automatically.
            nic_id = UNKNOWN_NIC_PREFIX + str(index)
        network_name = gnic.get('network', '')
        network_id = utils.id_from_network_name(network_name)
        network = Network.objects.get(id=network_id)

        # Get the new nic info
        mac = gnic.get('mac')
        ipv4 = gnic.get('ip')
        subnet6 = network.subnet6
        ipv6 = mac2eui64(mac, subnet6) if subnet6 else None

        firewall = gnic.get('firewall')
        firewall_profile = _reverse_tags.get(firewall)
        if not firewall_profile and network.public:
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE

        nic_info = {
            'index': index,
            'network': network,
            'mac': mac,
            'ipv4_address': ipv4,
            'ipv6_address': ipv6,
            'firewall_profile': firewall_profile,
            'state': 'ACTIVE'}

        new_nics.append((nic_id, nic_info))
    return dict(new_nics)


def release_nic_address(nic):
    """Release the IPv4 address of a NIC.

    Check if the instance's NIC has an IPv4 address and release it, unless it
    is a Floating IP. If it is a Floating IP, just disassociate the FloatingIP
    from the machine.

    """

    for ip in nic.ips.all():
        if ip.subnet.ipversion == 4:
            if ip.floating_ip:
                ip.nic = None
                ip.save()
            else:
                ip.network.release_address(ip.address)
                ip.delete()
        else:
            ip.delete()


@transaction.commit_on_success
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode
    back_network.backendlogmsg = logmsg

    network = back_network.network

    # Notifications of success change the operating state
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == 'success' and state_for_success is not None:
        back_network.operstate = state_for_success

    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_ADD':
        back_network.operstate = 'ERROR'
        back_network.backendtime = etime

    if opcode == 'OP_NETWORK_REMOVE':
        network_is_deleted = (status == "success")
        if network_is_deleted or (status == "error" and not
                                  network_exists_in_backend(back_network)):
            back_network.operstate = state_for_success
            back_network.deleted = True
            back_network.backendtime = etime

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()
    # Also update the state of the parent Network
    update_network_state(network)


def update_network_state(network):
    """Update the state of a Network based on its BackendNetwork states.

    Update the state of a Network based on the operstate of its
    BackendNetworks in the backends where the network exists.

    The state of the network is:
    * ACTIVE: If it is 'ACTIVE' in at least one backend.
    * DELETED: If it is 'DELETED' in all backends in which it has been
      created.

    This function also releases the resources (MAC prefix or bridge) and the
    quotas for the network.

    """
    if network.deleted:
        # Network has already been deleted. Just assert that state is also
        # DELETED
        if not network.state == "DELETED":
            network.state = "DELETED"
            network.save()
        return

    backend_states = [s.operstate for s in network.backend_networks.all()]
    if not backend_states and network.action != "DESTROY":
        if network.state != "ACTIVE":
            network.state = "ACTIVE"
            network.save()
        return

    # Network is deleted when all BackendNetworks go to "DELETED" operstate.
    # The reduction below stays truthy only if every operstate equals
    # "DELETED"; any other value turns it into False.
    deleted = reduce(lambda x, y: x == y and "DELETED", backend_states,
                     "DELETED")

    # Release the resources on the deletion of the Network
    if deleted:
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
                 network.id, network.link, network.mac_prefix)
        network.deleted = True
        network.state = "DELETED"
        if network.mac_prefix:
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
                release_resource(res_type="mac_prefix",
                                 value=network.mac_prefix)
        if network.link:
            if network.FLAVORS[network.flavor]["link"] == "pool":
                release_resource(res_type="bridge", value=network.link)

        # Issue commission
        if network.userid:
            quotas.issue_and_accept_commission(network, delete=True)
            # the above has already saved the object and committed;
            # a second save would override others' changes, since the
            # object is now unlocked
            return
        elif not network.public:
            log.warning("Network %s does not have an owner!", network.id)

        # TODO!!!!!
        # Set all subnets as deleted
        network.subnets.update(deleted=True)
        # And delete the IP pools
        network.subnets.ip_pools.all().delete()
    network.save()


@transaction.commit_on_success
def process_network_modify(back_network, etime, jobid, opcode, status,
                           job_fields):
    assert (opcode == "OP_NETWORK_SET_PARAMS")
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.opcode = opcode

    add_reserved_ips = job_fields.get("add_reserved_ips")
    if add_reserved_ips:
        net = back_network.network
        pool = net.get_pool()
        for ip in add_reserved_ips:
            pool.reserve(ip, external=True)
        pool.save()

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()


@transaction.commit_on_success
def process_create_progress(vm, etime, progress):

    percentage = int(progress)

    # The percentage may exceed 100%, due to the way
    # snf-image:copy-progress tracks bytes read by image handling processes
    percentage = 100 if percentage > 100 else percentage
    if percentage < 0:
        raise ValueError("Percentage cannot be negative")

    # FIXME: log a warning here, see #1033
#   if last_update > percentage:
#       raise ValueError("Build percentage should increase monotonically " \
#                        "(old = %d, new = %d)" % (last_update, percentage))

    # This assumes that no message of type 'ganeti-create-progress' is going
    # to arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance
    # and the instance is STARTED.  What if the two messages are processed by
    # two separate dispatcher threads, and the 'ganeti-op-status' message for
    # successful creation gets processed before the 'ganeti-create-progress'
    # message? [vkoukis]
    #
    #if not vm.operstate == 'BUILD':
    #    raise VirtualMachine.IllegalState("VM is not in building state")

    vm.buildpercentage = percentage
    vm.backendtime = etime
    vm.save()


@transaction.commit_on_success
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
                               details=None):
    """
    Create a virtual machine instance diagnostic entry.

    :param vm: VirtualMachine instance to create diagnostic for.
    :param message: Diagnostic message.
    :param source: Diagnostic source identifier (e.g. image-helper).
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
    :param etime: The time the message occurred (if available).
    :param details: Additional details or debug information.
    """
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level, source=source,
                                                   source_date=etime,
                                                   message=message,
                                                   details=details)


def create_instance(vm, nics, flavor, image):
    """`image` is a dictionary which should contain the keys:
            'backend_id', 'format' and 'metadata'

        metadata value should be a dictionary.
    """

    # Handle arguments to CreateInstance() as a dictionary,
    # initialize it based on a deployment-specific value.
    # This enables the administrator to override deployment-specific
    # arguments, such as the disk template to use, name of os provider
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
    #
    kw = vm.backend.get_create_params()
    kw['mode'] = 'create'
    kw['name'] = vm.backend_vm_id
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS

    kw['disk_template'] = flavor.disk_template
    kw['disks'] = [{"size": flavor.disk * 1024}]
    provider = flavor.disk_provider
    if provider:
        kw['disks'][0]['provider'] = provider
        kw['disks'][0]['origin'] = flavor.disk_origin

    kw['nics'] = [{"name": nic.backend_uuid,
                   "network": nic.network.backend_id,
                   "ip": nic.ipv4_address}
                  for nic in nics]
    backend = vm.backend
    depend_jobs = []
    for nic in nics:
        network = Network.objects.select_for_update().get(id=nic.network_id)
        bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
                                                             network=network)
        if bnet.operstate != "ACTIVE":
            if network.public:
                msg = "Can not connect instance to network %s. Network is not"\
                      " ACTIVE in backend %s." % (network, backend)
                raise Exception(msg)
            else:
                jobs = create_network(network, backend, connect=True)
                if isinstance(jobs, list):
                    depend_jobs.extend(jobs)
                else:
                    depend_jobs.append(jobs)
    kw["depends"] = [[job, ["success", "error", "canceled"]]
                     for job in depend_jobs]

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['os'] = settings.GANETI_OS_PROVIDER
    kw['ip_check'] = False
    kw['name_check'] = False

    # Do not specify a node explicitly; have
    # Ganeti use an iallocator instead
    #kw['pnode'] = rapi.GetNodes()[0]

    kw['dry_run'] = settings.TEST

    kw['beparams'] = {
        'auto_balance': True,
        'vcpus': flavor.cpu,
        'memory': flavor.ram}

    kw['osparams'] = {
        'config_url': vm.config_url,
        # Store image id and format to Ganeti
        'img_id': image['backend_id'],
        'img_format': image['format']}

    # Use opportunistic locking
    kw['opportunistic_locking'] = True

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['hvparams'] = dict(serial_console=False)

    log.debug("Creating instance %s", utils.hide_pass(kw))
    with pooled_rapi_client(vm) as client:
        return client.CreateInstance(**kw)


def delete_instance(vm):
    with pooled_rapi_client(vm) as client:
        return client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)


def reboot_instance(vm, reboot_type):
    assert reboot_type in ('soft', 'hard')
    kwargs = {"instance": vm.backend_vm_id,
              "reboot_type": "hard"}
    # XXX: Currently the shutdown_timeout parameter is not supported by the
    # Ganeti RAPI. Until it is supported, we fall back for both reboot types
    # to the default shutdown timeout of Ganeti (120s). Note that the reboot
    # type of the Ganeti job must always be hard. The 'soft' and 'hard' types
    # of the OS API differ from the ones in Ganeti, and map to
    # 'shutdown_timeout'.
    #if reboot_type == "hard":
    #    kwargs["shutdown_timeout"] = 0
    if settings.TEST:
        kwargs["dry_run"] = True
    with pooled_rapi_client(vm) as client:
        return client.RebootInstance(**kwargs)


def startup_instance(vm):
    with pooled_rapi_client(vm) as client:
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)


def shutdown_instance(vm):
    with pooled_rapi_client(vm) as client:
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)


def resize_instance(vm, vcpus, memory):
    beparams = {"vcpus": int(vcpus),
                "minmem": int(memory),
                "maxmem": int(memory)}
    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(vm.backend_vm_id, beparams=beparams)


def get_instance_console(vm):
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
    # useless (see #783).
    #
    # Until this is fixed on the Ganeti side, construct a console info reply
    # directly.
    #
    # WARNING: This assumes that VNC runs on port network_port on
    #          the instance's primary node, and is probably
    #          hypervisor-specific.
    #
    log.debug("Getting console for vm %s", vm)

    console = {}
    console['kind'] = 'vnc'

    with pooled_rapi_client(vm) as client:
        i = client.GetInstance(vm.backend_vm_id)

    if vm.backend.hypervisor == "kvm" and i['hvparams']['serial_console']:
        raise Exception("hv parameter serial_console cannot be true")
    console['host'] = i['pnode']
    console['port'] = i['network_port']

    return console


def get_instance_info(vm):
    with pooled_rapi_client(vm) as client:
        return client.GetInstance(vm.backend_vm_id)


def vm_exists_in_backend(vm):
    try:
        get_instance_info(vm)
        return True
    except GanetiApiError as e:
        if e.code == 404:
            return False
        raise e


def get_network_info(backend_network):
    with pooled_rapi_client(backend_network) as client:
        return client.GetNetwork(backend_network.network.backend_id)


def network_exists_in_backend(backend_network):
    try:
        get_network_info(backend_network)
        return True
    except GanetiApiError as e:
        if e.code == 404:
            return False
        raise e


def create_network(network, backend, connect=True):
    """Create a network in a Ganeti backend."""
    log.debug("Creating network %s in backend %s", network, backend)

    job_id = _create_network(network, backend)

    if connect:
        job_ids = connect_network(network, backend, depends=[job_id])
        return job_ids
    else:
        return [job_id]


def _create_network(network, backend):
    """Create a network."""

    tags = network.backend_tag
    subnet = None
    subnet6 = None
    gateway = None
    gateway6 = None
    for dbsubnet in network.subnets.all():
        if dbsubnet.ipversion == 4:
            if dbsubnet.dhcp:
                tags.append('nfdhcpd')
                subnet = dbsubnet.cidr
                gateway = dbsubnet.gateway
        elif dbsubnet.ipversion == 6:
            subnet6 = dbsubnet.cidr
            gateway6 = dbsubnet.gateway

    if network.public:
        conflicts_check = True
        tags.append('public')
    else:
        conflicts_check = False
        tags.append('private')

    # Use a dummy network subnet for IPv6-only networks. Currently Ganeti
    # does not support IPv6-only networks. To bypass this limitation, we
    # create the network with a dummy network subnet, and make Cyclades
    # connect instances to such networks, with address=None.
    if subnet is None:
        subnet = "10.0.0.0/24"

    try:
        bn = BackendNetwork.objects.get(network=network, backend=backend)
        mac_prefix = bn.mac_prefix
    except BackendNetwork.DoesNotExist:
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
                        " does not exist" % (network.id, backend.id))

    with pooled_rapi_client(backend) as client:
        return client.CreateNetwork(network_name=network.backend_id,
                                    network=subnet,
                                    network6=subnet6,
                                    gateway=gateway,
                                    gateway6=gateway6,
                                    mac_prefix=mac_prefix,
                                    conflicts_check=conflicts_check,
                                    tags=tags)


def connect_network(network, backend, depends=[], group=None):
    """Connect a network to nodegroups."""
    log.debug("Connecting network %s to backend %s", network, backend)

    if network.public:
        conflicts_check = True
    else:
        conflicts_check = False

    depends = [[job, ["success", "error", "canceled"]] for job in depends]
    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for group in groups:
            job_id = client.ConnectNetwork(network.backend_id, group,
                                           network.mode, network.link,
                                           conflicts_check,
                                           depends=depends)
            job_ids.append(job_id)
    return job_ids


def delete_network(network, backend, disconnect=True):
    log.debug("Deleting network %s from backend %s", network, backend)

    depends = []
    if disconnect:
        depends = disconnect_network(network, backend)
    _delete_network(network, backend, depends=depends)


def _delete_network(network, backend, depends=[]):
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
    with pooled_rapi_client(backend) as client:
        return client.DeleteNetwork(network.backend_id, depends)


def disconnect_network(network, backend, group=None):
    log.debug("Disconnecting network %s from backend %s", network, backend)

    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for group in groups:
            job_id = client.DisconnectNetwork(network.backend_id, group)
            job_ids.append(job_id)
    return job_ids


def connect_to_network(vm, nic):
    network = nic.network
    backend = vm.backend
    network = Network.objects.select_for_update().get(id=network.id)
    bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
                                                         network=network)
    depend_jobs = []
    if bnet.operstate != "ACTIVE":
        depend_jobs = create_network(network, backend, connect=True)

    depends = [[job, ["success", "error", "canceled"]] for job in depend_jobs]

    nic = {'name': nic.backend_uuid,
           'network': network.backend_id,
           'ip': nic.ipv4_address}

    log.debug("Adding NIC %s to VM %s", nic, vm)

    kwargs = {
        "instance": vm.backend_vm_id,
        "nics": [("add", "-1", nic)],
        "depends": depends,
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        return client.ModifyInstance(**kwargs)


def disconnect_from_network(vm, nic):
    log.debug("Removing NIC %s of VM %s", nic, vm)

    kwargs = {
        "instance": vm.backend_vm_id,
        "nics": [("remove", nic.backend_uuid, {})],
    }
    if vm.backend.use_hotplug():
        kwargs["hotplug"] = True
    if settings.TEST:
        kwargs["dry_run"] = True

    with pooled_rapi_client(vm) as client:
        jobID = client.ModifyInstance(**kwargs)
        firewall_profile = nic.firewall_profile
        if firewall_profile and firewall_profile != "DISABLED":
            tag = _firewall_tags[firewall_profile] % nic.backend_uuid
            client.DeleteInstanceTags(vm.backend_vm_id, [tag],
                                      dry_run=settings.TEST)

        return jobID


def set_firewall_profile(vm, profile, nic):
    uuid = nic.backend_uuid
    try:
        tag = _firewall_tags[profile] % uuid
    except KeyError:
        raise ValueError("Unsupported Firewall Profile: %s" % profile)

    log.debug("Setting tag of VM %s, NIC %s, to %s", vm, nic, profile)

    with pooled_rapi_client(vm) as client:
        # Delete previous firewall tags
        old_tags = client.GetInstanceTags(vm.backend_vm_id)
        delete_tags = [(t % uuid) for t in _firewall_tags.values()
                       if (t % uuid) in old_tags]
        if delete_tags:
            client.DeleteInstanceTags(vm.backend_vm_id, delete_tags,
                                      dry_run=settings.TEST)

        if profile != "DISABLED":
            client.AddInstanceTags(vm.backend_vm_id, [tag],
                                   dry_run=settings.TEST)

        # XXX NOP ModifyInstance call to force process_net_status to run
        # on the dispatcher
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
        client.ModifyInstance(vm.backend_vm_id,
                              os_name=os_name)
    return None


def get_instances(backend, bulk=True):
    with pooled_rapi_client(backend) as c:
        return c.GetInstances(bulk=bulk)


def get_nodes(backend, bulk=True):
    with pooled_rapi_client(backend) as c:
        return c.GetNodes(bulk=bulk)


def get_jobs(backend, bulk=True):
    with pooled_rapi_client(backend) as c:
        return c.GetJobs(bulk=bulk)


def get_physical_resources(backend):
    """Get the physical resources of a backend.

    Get the resources of a backend as reported by the backend (not the DB).

    """
    nodes = get_nodes(backend, bulk=True)
    attr = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
    res = {}
    for a in attr:
        res[a] = 0
    for n in nodes:
        # Filter out drained, offline and non-vm_capable nodes, since they
        # will not take part in the VM allocation process
        can_host_vms = n['vm_capable'] and not (n['drained'] or n['offline'])
        if can_host_vms and n['cnodes']:
            for a in attr:
                res[a] += int(n[a] or 0)
    return res


def update_backend_resources(backend, resources=None):
    """Update the state of the backend resources in the DB."""

    if not resources:
        resources = get_physical_resources(backend)

    backend.mfree = resources['mfree']
    backend.mtotal = resources['mtotal']
    backend.dfree = resources['dfree']
    backend.dtotal = resources['dtotal']
    backend.pinst_cnt = resources['pinst_cnt']
    backend.ctotal = resources['ctotal']
    backend.updated = datetime.now()
    backend.save()


def get_memory_from_instances(backend):
    """Get the memory that is used by instances.

    Get the used memory of a backend. Note: this is different from the real
    memory used, due to kvm's memory de-duplication.

    """
    with pooled_rapi_client(backend) as client:
        instances = client.GetInstances(bulk=True)
    mem = 0
    for i in instances:
        mem += i['oper_ram']
    return mem
def get_available_disk_templates(backend):
1000
    """Get the list of available disk templates of a Ganeti backend.
1001

1002
    The list contains the disk templates that are enabled in the Ganeti backend
1003
    and also included in ipolicy-disk-templates.
1004

1005
    """
1006
    with pooled_rapi_client(backend) as c:
1007
        info = c.GetInfo()
1008
    ipolicy_disk_templates = info["ipolicy"]["disk-templates"]
1009
    try:
1010
        enabled_disk_templates = info["enabled_disk_templates"]
1011
        return [dp for dp in enabled_disk_templates
1012
                if dp in ipolicy_disk_templates]
1013
    except KeyError:
1014
        # Ganeti < 2.8 does not have 'enabled_disk_templates'
1015
        return ipolicy_disk_templates
1016

    
1017

    
1018
def update_backend_disk_templates(backend):
1019
    disk_templates = get_available_disk_templates(backend)
1020
    backend.disk_templates = disk_templates
1021
    backend.save()
1022

    
1023

    
1024
##
1025
## Synchronized operations for reconciliation
1026
##
1027

    
1028

    
1029
def create_network_synced(network, backend):
1030
    result = _create_network_synced(network, backend)
1031
    if result[0] != 'success':
1032
        return result
1033
    result = connect_network_synced(network, backend)
1034
    return result
1035

    
1036

    
1037
def _create_network_synced(network, backend):
1038
    with pooled_rapi_client(backend) as client:
1039
        job = _create_network(network, backend)
1040
        result = wait_for_job(client, job)
1041
    return result
1042

    
1043

    
1044
def connect_network_synced(network, backend):
1045
    with pooled_rapi_client(backend) as client:
1046
        for group in client.GetGroups():
1047
            job = client.ConnectNetwork(network.backend_id, group,
1048
                                        network.mode, network.link)
1049
            result = wait_for_job(client, job)
1050
            if result[0] != 'success':
1051
                return result
1052

    
1053
    return result
1054

    
1055

    
1056
def wait_for_job(client, jobid):
1057
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
1058
    status = result['job_info'][0]
1059
    while status not in ['success', 'error', 'cancel']:
1060
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
1061
                                         [result], None)
1062
        status = result['job_info'][0]
1063

    
1064
    if status == 'success':
1065
        return (status, None)
1066
    else:
1067
        error = result['job_info'][1]
1068
        return (status, error)
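
# Usage sketch for wait_for_job() (illustrative; `backend` and `job_id` stand
# for any Backend object and Ganeti job id):
#
#   with pooled_rapi_client(backend) as client:
#       status, error = wait_for_job(client, job_id)
#       if status != "success":
#           log.error("Ganeti job %s failed: %s", job_id, error)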