snf-cyclades-app / synnefo / logic / backend.py @ d7ff7f5a
# Copyright 2011-2013 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
from django.conf import settings
from django.db import transaction
from datetime import datetime, timedelta

from synnefo.db.models import (Backend, VirtualMachine, Network,
                               BackendNetwork, BACKEND_STATUSES,
                               pooled_rapi_client, VirtualMachineDiagnostic,
                               Flavor)
from synnefo.logic import utils
from synnefo import quotas
from synnefo.api.util import release_resource
from synnefo.util.mac2eui64 import mac2eui64
from synnefo.logic.rapi import GanetiApiError

from logging import getLogger
log = getLogger(__name__)


_firewall_tags = {
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}

_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
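
# Illustrative example (not the actual settings; the real values come from the
# GANETI_FIREWALL_* settings above): with a tag of the assumed form
# 'synnefo:network:%s:protected', the fourth colon-separated field names the
# profile, so _reverse_tags would map 'protected' -> 'ENABLED'. Incoming
# Ganeti instance tags are translated back to firewall profiles through this
# dict in process_ganeti_nics() below.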

# Timeout in seconds for building NICs. After this period a NIC is considered
# stale and is removed from the DB.
BUILDING_NIC_TIMEOUT = 180

NIC_FIELDS = ["state", "mac", "ipv4", "ipv6", "network", "firewall_profile"]


def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
    """Handle quotas for an updated VirtualMachine.

    Update the quotas of the VirtualMachine based on the job that ran on the
    Ganeti backend. If a commission has already been issued for this job,
    then this commission is simply accepted or rejected according to the job
    status. Otherwise, a new commission is issued for the given change, in
    force and auto-accept mode. In that case, previous commissions are
    rejected, since they reflect a previous state of the VM.

    """
    if job_status not in ["success", "error", "canceled"]:
        return vm

    # Check whether the successful completion of the job triggers any quotable
    # change in the VM state.
    action = utils.get_action_from_opcode(job_opcode, job_fields)
    if action == "BUILD":
        # Quotas for new VMs are automatically accepted by the API
        return vm
    commission_info = quotas.get_commission_info(vm, action=action,
                                                 action_fields=job_fields)

    if vm.task_job_id == job_id and vm.serial is not None:
        # A commission for this change has already been issued, so just
        # accept/reject it. A special case is OP_INSTANCE_CREATE which, even
        # if it fails, must be accepted, since the user has to remove the
        # failed server manually.
        serial = vm.serial
        if job_status == "success":
            quotas.accept_serial(serial)
        elif job_status in ["error", "canceled"]:
            log.debug("Job %s failed. Rejecting related serial %s", job_id,
                      serial)
            quotas.reject_serial(serial)
        vm.serial = None
    elif job_status == "success" and commission_info is not None:
        log.debug("Expected job was %s. Processing job %s. Commission for"
                  " this job: %s", vm.task_job_id, job_id, commission_info)
        # A commission for this change has not been issued, or the issued
        # commission was unaware of the current change. Reject all previous
        # commissions and create a new one in forced mode!
        commission_name = ("client: dispatcher, resource: %s, ganeti_job: %s"
                           % (vm, job_id))
        quotas.handle_resource_commission(vm, action,
                                          commission_info=commission_info,
                                          commission_name=commission_name,
                                          force=True,
                                          auto_accept=True)
        log.debug("Issued new commission: %s", vm.serial)

    return vm

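# Illustrative dispatcher flow (hypothetical values): a 'ganeti-op-status'
# message for job 1234 running OP_INSTANCE_STARTUP would typically be handled
# as
#
#   process_op_status(vm, etime, jobid=1234, opcode="OP_INSTANCE_STARTUP",
#                     status="success", logmsg="", job_fields={})
#
# which updates vm.operstate and then delegates quota handling to
# handle_vm_quotas() above.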

    
@transaction.commit_on_success
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None,
                      job_fields=None):
    """Process a job progress notification from the backend.

    Process an incoming message from the backend (currently Ganeti).
    Job notifications with a terminating status (success, error or canceled)
    also update the operating state of the VM.

    """
    # See #1492, #1031, #1111 why this line has been removed
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)

    vm.backendjobid = jobid
    vm.backendjobstatus = status
    vm.backendopcode = opcode
    vm.backendlogmsg = logmsg

    if status in ["queued", "waiting", "running"]:
        vm.save()
        return

    if job_fields is None:
        job_fields = {}
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode)

    # Notifications of success change the operating state
    if status == "success":
        if state_for_success is not None:
            vm.operstate = state_for_success
        beparams = job_fields.get("beparams", None)
        if beparams:
            # Change the flavor of the VM
            _process_resize(vm, beparams)
        # Update backendtime only for jobs that have been successfully
        # completed, since only these jobs update the state of the VM. Else a
        # "race condition" may occur when a successful job (e.g.
        # OP_INSTANCE_REMOVE) completes before an error job and messages arrive
        # in reversed order.
        vm.backendtime = etime

    if status in ["success", "error", "canceled"] and nics is not None:
        # Update the NICs of the VM
        _process_net_status(vm, etime, nics)

    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
    if opcode == 'OP_INSTANCE_CREATE' and status in ('canceled', 'error'):
        vm.operstate = 'ERROR'
        vm.backendtime = etime
    elif opcode == 'OP_INSTANCE_REMOVE':
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
        # when no instance exists at the Ganeti backend.
        # See ticket #799 for all the details.
        if status == 'success' or (status == 'error' and
                                   not vm_exists_in_backend(vm)):
            # VM has been deleted
            for nic in vm.nics.all():
                # Release the IP
                release_nic_address(nic)
                # And delete the NIC.
                nic.delete()
            vm.deleted = True
            vm.operstate = state_for_success
            vm.backendtime = etime
            status = "success"

    if status in ["success", "error", "canceled"]:
        # Job is finalized: Handle quotas/commissioning
        vm = handle_vm_quotas(vm, job_id=jobid, job_opcode=opcode,
                              job_status=status, job_fields=job_fields)
        # and clear task fields
        if vm.task_job_id == jobid:
            vm.task = None
            vm.task_job_id = None

    vm.save()


def _process_resize(vm, beparams):
    """Change the flavor of a VirtualMachine based on new beparams."""
    old_flavor = vm.flavor
    vcpus = beparams.get("vcpus", old_flavor.cpu)
    ram = beparams.get("maxmem", old_flavor.ram)
    if vcpus == old_flavor.cpu and ram == old_flavor.ram:
        return
    try:
        new_flavor = Flavor.objects.get(cpu=vcpus, ram=ram,
                                        disk=old_flavor.disk,
                                        disk_template=old_flavor.disk_template)
    except Flavor.DoesNotExist:
        raise Exception("Cannot find flavor for VM")
    vm.flavor = new_flavor
    vm.save()

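# Illustrative example (hypothetical values): a successful resize job that
# changed the instance to 2 VCPUs / 2048 MB would be reported with
# job_fields={"beparams": {"vcpus": 2, "maxmem": 2048}}; _process_resize()
# then looks up the Flavor with cpu=2, ram=2048 and the old disk size and
# disk_template, and assigns it to vm.flavor.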

    
@transaction.commit_on_success
def process_net_status(vm, etime, nics):
    """Wrap _process_net_status inside a transaction."""
    _process_net_status(vm, etime, nics)


def _process_net_status(vm, etime, nics):
    """Process a net status notification from the backend.

    Process an incoming message from the Ganeti backend,
    detailing the NIC configuration of a VM instance.

    Update the state of the VM in the DB accordingly.

    """
    ganeti_nics = process_ganeti_nics(nics)
    db_nics = dict([(nic.id, nic) for nic in vm.nics.all()])

    # Get X-Lock on backend before getting X-Lock on network IP pools, to
    # guarantee that no deadlock will occur with the Backend allocator.
    Backend.objects.select_for_update().get(id=vm.backend_id)

    for nic_name in set(db_nics.keys()) | set(ganeti_nics.keys()):
        db_nic = db_nics.get(nic_name)
        ganeti_nic = ganeti_nics.get(nic_name)
        if ganeti_nic is None:
            # The NIC exists in the DB but not in Ganeti. If the NIC has been
            # in 'BUILDING' state for more than BUILDING_NIC_TIMEOUT seconds,
            # remove it.
            # TODO: This is dangerous, as the job may be stuck in the queue,
            # and releasing the IP may lead to duplicate IP use.
            if (db_nic.state != "BUILDING" or etime >
                    db_nic.created + timedelta(seconds=BUILDING_NIC_TIMEOUT)):
                release_nic_address(db_nic)
                db_nic.delete()
            else:
                log.warning("Ignoring recently created building NIC: %s",
                            db_nic)
        elif db_nic is None:
            # The NIC exists in Ganeti but not in the DB
            if ganeti_nic["ipv4"]:
                network = ganeti_nic["network"]
                network.reserve_address(ganeti_nic["ipv4"])
            vm.nics.create(**ganeti_nic)
        elif not nics_are_equal(db_nic, ganeti_nic):
            # Special case where the IPv4 address has changed, because we
            # need to release the old IPv4 address and reserve the new one
            if db_nic.ipv4 != ganeti_nic["ipv4"]:
                release_nic_address(db_nic)
                if ganeti_nic["ipv4"]:
                    ganeti_nic["network"].reserve_address(ganeti_nic["ipv4"])

            # Update the NIC in the DB with the values from the Ganeti NIC
            [setattr(db_nic, f, ganeti_nic[f]) for f in NIC_FIELDS]
            db_nic.save()

            # Dummy update of the network, to work with 'changed-since'
            db_nic.network.save()

    vm.backendtime = etime
    vm.save()


def nics_are_equal(db_nic, gnt_nic):
    for field in NIC_FIELDS:
        if getattr(db_nic, field) != gnt_nic[field]:
            return False
    return True


def process_ganeti_nics(ganeti_nics):
    """Process the list of NICs reported by Ganeti."""
    new_nics = []
    for index, gnic in enumerate(ganeti_nics):
        nic_name = gnic.get("name", None)
        if nic_name is not None:
            nic_id = utils.id_from_nic_name(nic_name)
        else:
            # Use the index as a default value. If the NIC is unknown to
            # Synnefo, it will be created automatically.
            nic_id = "unknown-" + str(index)
        network_name = gnic.get('network', '')
        network_id = utils.id_from_network_name(network_name)
        network = Network.objects.get(id=network_id)

        # Get the new NIC info
        mac = gnic.get('mac')
        ipv4 = gnic.get('ip')
        ipv6 = mac2eui64(mac, network.subnet6)\
            if network.subnet6 is not None else None

        firewall = gnic.get('firewall')
        firewall_profile = _reverse_tags.get(firewall)
        if not firewall_profile and network.public:
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE

        nic_info = {
            'index': index,
            'network': network,
            'mac': mac,
            'ipv4': ipv4,
            'ipv6': ipv6,
            'firewall_profile': firewall_profile,
            'state': 'ACTIVE'}

        new_nics.append((nic_id, nic_info))
    return dict(new_nics)

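# Illustrative input (hypothetical values): process_ganeti_nics() receives the
# NIC definitions reported by Ganeti, e.g. something like
#
#   [{"name": "nic-42-0", "network": "snf-net-7", "mac": "aa:00:00:10:20:30",
#     "ip": "192.0.2.10", "firewall": None}]
#
# and returns a dict keyed by the Synnefo NIC id (here derived from the NIC
# name via utils.id_from_nic_name), whose values hold the fields used to
# create or update the corresponding database NIC.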

    
def release_nic_address(nic):
    """Release the IPv4 address of a NIC.

    Check if the instance's NIC has an IPv4 address and release it if it is
    not a floating IP.

    """

    if nic.ipv4 and not nic.ip_type == "FLOATING":
        nic.network.release_address(nic.ipv4)


@transaction.commit_on_success
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode
    back_network.backendlogmsg = logmsg

    network = back_network.network

    # Notifications of success change the operating state
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
    if status == 'success' and state_for_success is not None:
        back_network.operstate = state_for_success

    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_ADD':
        back_network.operstate = 'ERROR'
        back_network.backendtime = etime

    if opcode == 'OP_NETWORK_REMOVE':
        network_is_deleted = (status == "success")
        if network_is_deleted or (status == "error" and not
                                  network_exists_in_backend(back_network)):
            back_network.operstate = state_for_success
            back_network.deleted = True
            back_network.backendtime = etime

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()
    # The state of the parent Network must be updated as well
    update_network_state(network)


def update_network_state(network):
    """Update the state of a Network based on its BackendNetwork states.

    Update the state of a Network based on the operstate of the
    BackendNetworks in the backends where the network exists.

    The state of the network is:
    * ACTIVE: if it is 'ACTIVE' in at least one backend.
    * DELETED: if it is 'DELETED' in all backends in which it has been created.

    This function also releases the resources (MAC prefix or bridge) and the
    quotas for the network.

    """
    if network.deleted:
        # Network has already been deleted. Just assert that its state is also
        # DELETED
        if not network.state == "DELETED":
            network.state = "DELETED"
            network.save()
        return

    backend_states = [s.operstate for s in network.backend_networks.all()]
    if not backend_states and network.action != "DESTROY":
        if network.state != "ACTIVE":
            network.state = "ACTIVE"
            network.save()
        return

    # Network is deleted when all BackendNetworks go to "DELETED" operstate
    deleted = reduce(lambda x, y: x == y and "DELETED", backend_states,
                     "DELETED")
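    # Illustrative evaluation of the reduce() above (hypothetical states):
    #   ["DELETED", "DELETED"] -> "DELETED"  (truthy: every backend deleted)
    #   ["DELETED", "ACTIVE"]  -> False      (at least one backend remains)
    # so `deleted` is truthy only when every BackendNetwork reports "DELETED".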

    
    # Release the resources on the deletion of the Network
    if deleted:
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
                 network.id, network.link, network.mac_prefix)
        network.deleted = True
        network.state = "DELETED"
        if network.mac_prefix:
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
                release_resource(res_type="mac_prefix",
                                 value=network.mac_prefix)
        if network.link:
            if network.FLAVORS[network.flavor]["link"] == "pool":
                release_resource(res_type="bridge", value=network.link)

        # Issue commission
        if network.userid:
            quotas.issue_and_accept_commission(network, delete=True)
            # the above has already saved the object and committed;
            # a second save would override others' changes, since the
            # object is now unlocked
            return
        elif not network.public:
            log.warning("Network %s does not have an owner!", network.id)
    network.save()


@transaction.commit_on_success
def process_network_modify(back_network, etime, jobid, opcode, status,
                           job_fields):
    assert (opcode == "OP_NETWORK_SET_PARAMS")
    if status not in [x[0] for x in BACKEND_STATUSES]:
        raise Network.InvalidBackendMsgError(opcode, status)

    back_network.backendjobid = jobid
    back_network.backendjobstatus = status
    back_network.backendopcode = opcode

    add_reserved_ips = job_fields.get("add_reserved_ips")
    if add_reserved_ips:
        net = back_network.network
        pool = net.get_pool()
        for ip in add_reserved_ips:
            pool.reserve(ip, external=True)
        pool.save()

    if status == 'success':
        back_network.backendtime = etime
    back_network.save()


@transaction.commit_on_success
def process_create_progress(vm, etime, progress):

    percentage = int(progress)

    # The percentage may exceed 100%, due to the way
    # snf-image:copy-progress tracks bytes read by image handling processes
    percentage = 100 if percentage > 100 else percentage
    if percentage < 0:
        raise ValueError("Percentage cannot be negative")

    # FIXME: log a warning here, see #1033
#   if last_update > percentage:
#       raise ValueError("Build percentage should increase monotonically " \
#                        "(old = %d, new = %d)" % (last_update, percentage))

    # This assumes that no message of type 'ganeti-create-progress' is going to
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
    # the instance is STARTED.  What if the two messages are processed by two
    # separate dispatcher threads, and the 'ganeti-op-status' message for
    # successful creation gets processed before the 'ganeti-create-progress'
    # message? [vkoukis]
    #
    #if not vm.operstate == 'BUILD':
    #    raise VirtualMachine.IllegalState("VM is not in building state")

    vm.buildpercentage = percentage
    vm.backendtime = etime
    vm.save()


@transaction.commit_on_success
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
                               details=None):
    """
    Create a virtual machine instance diagnostic entry.

    :param vm: VirtualMachine instance to create the diagnostic for.
    :param message: Diagnostic message.
    :param source: Diagnostic source identifier (e.g. image-helper).
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
    :param etime: The time the message occurred (if available).
    :param details: Additional details or debug information.
    """
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level, source=source,
                                                   source_date=etime,
                                                   message=message,
                                                   details=details)


def create_instance(vm, nics, flavor, image):
    """`image` is a dictionary which should contain the keys:
            'backend_id', 'format' and 'metadata'

        metadata value should be a dictionary.
    """

    # Handle arguments to CreateInstance() as a dictionary,
    # initialize it based on a deployment-specific value.
    # This enables the administrator to override deployment-specific
    # arguments, such as the disk template to use, name of os provider
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
    #
    kw = vm.backend.get_create_params()
    kw['mode'] = 'create'
    kw['name'] = vm.backend_vm_id
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS

    kw['disk_template'] = flavor.disk_template
    kw['disks'] = [{"size": flavor.disk * 1024}]
    provider = flavor.disk_provider
    if provider:
        kw['disks'][0]['provider'] = provider
        kw['disks'][0]['origin'] = flavor.disk_origin

    kw['nics'] = [{"name": nic.backend_uuid,
                   "network": nic.network.backend_id,
                   "ip": nic.ipv4}
                  for nic in nics]
    backend = vm.backend
    depend_jobs = []
    for nic in nics:
        network = Network.objects.select_for_update().get(id=nic.network.id)
        bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
                                                             network=network)
        if bnet.operstate != "ACTIVE":
            if network.public:
                msg = "Can not connect instance to network %s. Network is not"\
                      " ACTIVE in backend %s." % (network, backend)
                raise Exception(msg)
            else:
                jobs = create_network(network, backend, connect=True)
                if isinstance(jobs, list):
                    depend_jobs.extend(jobs)
                else:
                    depend_jobs.append(jobs)
    kw["depends"] = [[job, ["success", "error", "canceled"]]
                     for job in depend_jobs]

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['os'] = settings.GANETI_OS_PROVIDER
    kw['ip_check'] = False
    kw['name_check'] = False

    # Do not specify a node explicitly; let
    # Ganeti use an iallocator instead
    #kw['pnode'] = rapi.GetNodes()[0]

    kw['dry_run'] = settings.TEST

    kw['beparams'] = {
        'auto_balance': True,
        'vcpus': flavor.cpu,
        'memory': flavor.ram}

    kw['osparams'] = {
        'config_url': vm.config_url,
        # Store image id and format to Ganeti
        'img_id': image['backend_id'],
        'img_format': image['format']}

    # Use opportunistic locking
    kw['opportunistic_locking'] = True

    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
    # kw['hvparams'] = dict(serial_console=False)

    log.debug("Creating instance %s", utils.hide_pass(kw))
    with pooled_rapi_client(vm) as client:
        return client.CreateInstance(**kw)
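
# Illustrative shape of the resulting RAPI call (hypothetical values, layered
# on top of whatever GANETI_CREATEINSTANCE_KWARGS provides):
#
#   client.CreateInstance(mode='create', name='snf-42',
#                         disk_template='drbd', disks=[{'size': 20480}],
#                         nics=[{'name': 'nic-42-0', 'network': 'snf-net-1',
#                                'ip': None}],
#                         ip_check=False, name_check=False, dry_run=False,
#                         beparams={'auto_balance': True, 'vcpus': 1,
#                                   'memory': 1024},
#                         osparams={'config_url': '...', 'img_id': '...',
#                                   'img_format': 'diskdump'},
#                         opportunistic_locking=True, depends=[])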

    

def delete_instance(vm):
    with pooled_rapi_client(vm) as client:
        return client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)


def reboot_instance(vm, reboot_type):
    assert reboot_type in ('soft', 'hard')
    kwargs = {"instance": vm.backend_vm_id,
              "reboot_type": "hard"}
    # XXX: Currently the shutdown_timeout parameter is not supported by the
    # Ganeti RAPI. Until it is supported, we fall back, for both reboot types,
    # to the default Ganeti shutdown timeout (120s). Note that the reboot type
    # of the Ganeti job must always be 'hard'. The 'soft' and 'hard' reboot
    # types of the OS API differ from the Ganeti ones and map to
    # 'shutdown_timeout'.
    #if reboot_type == "hard":
    #    kwargs["shutdown_timeout"] = 0
    if settings.TEST:
        kwargs["dry_run"] = True
    with pooled_rapi_client(vm) as client:
        return client.RebootInstance(**kwargs)

609

    
610
def startup_instance(vm):
611
    with pooled_rapi_client(vm) as client:
612
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
613

    
614

    
615
def shutdown_instance(vm):
616
    with pooled_rapi_client(vm) as client:
617
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)
618

    
619

    
620
def resize_instance(vm, vcpus, memory):
621
    beparams = {"vcpus": int(vcpus),
622
                "minmem": int(memory),
623
                "maxmem": int(memory)}
624
    with pooled_rapi_client(vm) as client:
625
        return client.ModifyInstance(vm.backend_vm_id, beparams=beparams)
626

    
627

    
628
def get_instance_console(vm):
629
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
630
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
631
    # useless (see #783).
632
    #
633
    # Until this is fixed on the Ganeti side, construct a console info reply
634
    # directly.
635
    #
636
    # WARNING: This assumes that VNC runs on port network_port on
637
    #          the instance's primary node, and is probably
638
    #          hypervisor-specific.
639
    #
640
    log.debug("Getting console for vm %s", vm)
641

    
642
    console = {}
643
    console['kind'] = 'vnc'
644

    
645
    with pooled_rapi_client(vm) as client:
646
        i = client.GetInstance(vm.backend_vm_id)
647

    
648
    if vm.backend.hypervisor == "kvm" and i['hvparams']['serial_console']:
649
        raise Exception("hv parameter serial_console cannot be true")
650
    console['host'] = i['pnode']
651
    console['port'] = i['network_port']
652

    
653
    return console
654

    
655

    
656
def get_instance_info(vm):
657
    with pooled_rapi_client(vm) as client:
658
        return client.GetInstance(vm.backend_vm_id)
659

    
660

    
661
def vm_exists_in_backend(vm):
662
    try:
663
        get_instance_info(vm)
664
        return True
665
    except GanetiApiError as e:
666
        if e.code == 404:
667
            return False
668
        raise e
669

    
670

    
671
def get_network_info(backend_network):
672
    with pooled_rapi_client(backend_network) as client:
673
        return client.GetNetwork(backend_network.network.backend_id)
674

    
675

    
def network_exists_in_backend(backend_network):
    try:
        get_network_info(backend_network)
        return True
    except GanetiApiError as e:
        if e.code == 404:
            return False
        raise e

684

    
685
def create_network(network, backend, connect=True):
686
    """Create a network in a Ganeti backend"""
687
    log.debug("Creating network %s in backend %s", network, backend)
688

    
689
    job_id = _create_network(network, backend)
690

    
691
    if connect:
692
        job_ids = connect_network(network, backend, depends=[job_id])
693
        return job_ids
694
    else:
695
        return [job_id]
696

    
697

    
698
def _create_network(network, backend):
699
    """Create a network."""
700

    
701
    tags = network.backend_tag
702
    if network.dhcp:
703
        tags.append('nfdhcpd')
704

    
705
    if network.public:
706
        conflicts_check = True
707
        tags.append('public')
708
    else:
709
        conflicts_check = False
710
        tags.append('private')
711

    
712
    # Use a dummy network subnet for IPv6 only networks. Currently Ganeti does
713
    # not support IPv6 only networks. To bypass this limitation, we create the
714
    # network with a dummy network subnet, and make Cyclades connect instances
715
    # to such networks, with address=None.
716
    subnet = network.subnet
717
    if subnet is None:
718
        subnet = "10.0.0.0/24"
719

    
720
    try:
721
        bn = BackendNetwork.objects.get(network=network, backend=backend)
722
        mac_prefix = bn.mac_prefix
723
    except BackendNetwork.DoesNotExist:
724
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
725
                        " does not exist" % (network.id, backend.id))
726

    
727
    with pooled_rapi_client(backend) as client:
728
        return client.CreateNetwork(network_name=network.backend_id,
729
                                    network=subnet,
730
                                    network6=network.subnet6,
731
                                    gateway=network.gateway,
732
                                    gateway6=network.gateway6,
733
                                    mac_prefix=mac_prefix,
734
                                    conflicts_check=conflicts_check,
735
                                    tags=tags)
736

    
737

    
738
def connect_network(network, backend, depends=[], group=None):
739
    """Connect a network to nodegroups."""
740
    log.debug("Connecting network %s to backend %s", network, backend)
741

    
742
    if network.public:
743
        conflicts_check = True
744
    else:
745
        conflicts_check = False
746

    
747
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
748
    with pooled_rapi_client(backend) as client:
749
        groups = [group] if group is not None else client.GetGroups()
750
        job_ids = []
751
        for group in groups:
752
            job_id = client.ConnectNetwork(network.backend_id, group,
753
                                           network.mode, network.link,
754
                                           conflicts_check,
755
                                           depends=depends)
756
            job_ids.append(job_id)
757
    return job_ids
758

    
759

    
760
def delete_network(network, backend, disconnect=True):
761
    log.debug("Deleting network %s from backend %s", network, backend)
762

    
763
    depends = []
764
    if disconnect:
765
        depends = disconnect_network(network, backend)
766
    _delete_network(network, backend, depends=depends)
767

    
768

    
769
def _delete_network(network, backend, depends=[]):
770
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
771
    with pooled_rapi_client(backend) as client:
772
        return client.DeleteNetwork(network.backend_id, depends)
773

    
774

    
def disconnect_network(network, backend, group=None):
    log.debug("Disconnecting network %s from backend %s", network, backend)

    with pooled_rapi_client(backend) as client:
        groups = [group] if group is not None else client.GetGroups()
        job_ids = []
        for group in groups:
            job_id = client.DisconnectNetwork(network.backend_id, group)
            job_ids.append(job_id)
    return job_ids

786

    
787
def connect_to_network(vm, nic):
788
    network = nic.network
789
    backend = vm.backend
790
    network = Network.objects.select_for_update().get(id=network.id)
791
    bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
792
                                                         network=network)
793
    depend_jobs = []
794
    if bnet.operstate != "ACTIVE":
795
        depend_jobs = create_network(network, backend, connect=True)
796

    
797
    depends = [[job, ["success", "error", "canceled"]] for job in depend_jobs]
798

    
799
    nic = {'name': nic.backend_uuid,
800
           'network': network.backend_id,
801
           'ip': nic.ipv4}
802

    
803
    log.debug("Adding NIC %s to VM %s", nic, vm)
804

    
805
    kwargs = {
806
        "instance": vm.backend_vm_id,
807
        "nics": [("add", "-1", nic)],
808
        "depends": depends,
809
    }
810
    if vm.backend.use_hotplug():
811
        kwargs["hotplug"] = True
812
    if settings.TEST:
813
        kwargs["dry_run"] = True
814

    
815
    with pooled_rapi_client(vm) as client:
816
        return client.ModifyInstance(**kwargs)
817

    
818

    
819
def disconnect_from_network(vm, nic):
820
    log.debug("Removing NIC %s of VM %s", nic, vm)
821

    
822
    kwargs = {
823
        "instance": vm.backend_vm_id,
824
        "nics": [("remove", nic.index, {})],
825
    }
826
    if vm.backend.use_hotplug():
827
        kwargs["hotplug"] = True
828
    if settings.TEST:
829
        kwargs["dry_run"] = True
830

    
831
    with pooled_rapi_client(vm) as client:
832
        jobID = client.ModifyInstance(**kwargs)
833
        # If the NIC has a tag for a firewall profile it must be deleted,
834
        # otherwise it may affect another NIC. XXX: Deleting the tag should
835
        # depend on the removing the NIC, but currently RAPI client does not
836
        # support this, this may result in clearing the firewall profile
837
        # without successfully removing the NIC. This issue will be fixed with
838
        # use of NIC UUIDs.
839
        firewall_profile = nic.firewall_profile
840
        if firewall_profile and firewall_profile != "DISABLED":
841
            tag = _firewall_tags[firewall_profile] % nic.index
842
            client.DeleteInstanceTags(vm.backend_vm_id, [tag],
843
                                      dry_run=settings.TEST)
844

    
845
        return jobID
846

    
847

    
def set_firewall_profile(vm, profile, index=0):
    try:
        tag = _firewall_tags[profile] % index
    except KeyError:
        raise ValueError("Unsupported firewall profile: %s" % profile)

    log.debug("Setting tag of VM %s, NIC index %d, to %s", vm, index, profile)

    with pooled_rapi_client(vm) as client:
        # Delete previous firewall tags
        old_tags = client.GetInstanceTags(vm.backend_vm_id)
        delete_tags = [(t % index) for t in _firewall_tags.values()
                       if (t % index) in old_tags]
        if delete_tags:
            client.DeleteInstanceTags(vm.backend_vm_id, delete_tags,
                                      dry_run=settings.TEST)

        if profile != "DISABLED":
            client.AddInstanceTags(vm.backend_vm_id, [tag],
                                   dry_run=settings.TEST)

        # XXX NOP ModifyInstance call to force process_net_status to run
        # on the dispatcher
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
        client.ModifyInstance(vm.backend_vm_id,
                              os_name=os_name)
    return None

876

    
877
def get_instances(backend, bulk=True):
878
    with pooled_rapi_client(backend) as c:
879
        return c.GetInstances(bulk=bulk)
880

    
881

    
882
def get_nodes(backend, bulk=True):
883
    with pooled_rapi_client(backend) as c:
884
        return c.GetNodes(bulk=bulk)
885

    
886

    
887
def get_jobs(backend, bulk=True):
888
    with pooled_rapi_client(backend) as c:
889
        return c.GetJobs(bulk=bulk)
890

    
891

    
892
def get_physical_resources(backend):
893
    """ Get the physical resources of a backend.
894

895
    Get the resources of a backend as reported by the backend (not the db).
896

897
    """
898
    nodes = get_nodes(backend, bulk=True)
899
    attr = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
900
    res = {}
901
    for a in attr:
902
        res[a] = 0
903
    for n in nodes:
904
        # Filter out drained, offline and not vm_capable nodes since they will
905
        # not take part in the vm allocation process
906
        can_host_vms = n['vm_capable'] and not (n['drained'] or n['offline'])
907
        if can_host_vms and n['cnodes']:
908
            for a in attr:
909
                res[a] += int(n[a])
910
    return res
911

    
912

    
913
def update_backend_resources(backend, resources=None):
914
    """ Update the state of the backend resources in db.
915

916
    """
917

    
918
    if not resources:
919
        resources = get_physical_resources(backend)
920

    
921
    backend.mfree = resources['mfree']
922
    backend.mtotal = resources['mtotal']
923
    backend.dfree = resources['dfree']
924
    backend.dtotal = resources['dtotal']
925
    backend.pinst_cnt = resources['pinst_cnt']
926
    backend.ctotal = resources['ctotal']
927
    backend.updated = datetime.now()
928
    backend.save()
929

    
930

    
def get_memory_from_instances(backend):
    """Get the memory that is used by instances.

    Get the used memory of a backend. Note that this differs from the real
    memory used, due to KVM's memory de-duplication.

    """
    with pooled_rapi_client(backend) as client:
        instances = client.GetInstances(bulk=True)
    mem = 0
    for i in instances:
        mem += i['oper_ram']
    return mem

945

    
946
def get_available_disk_templates(backend):
947
    """Get the list of available disk templates of a Ganeti backend.
948

949
    The list contains the disk templates that are enabled in the Ganeti backend
950
    and also included in ipolicy-disk-templates.
951

952
    """
953
    with pooled_rapi_client(backend) as c:
954
        info = c.GetInfo()
955
    ipolicy_disk_templates = info["ipolicy"]["disk-templates"]
956
    try:
957
        enabled_disk_templates = info["enabled_disk_templates"]
958
        return [dp for dp in enabled_disk_templates
959
                if dp in ipolicy_disk_templates]
960
    except KeyError:
961
        # Ganeti < 2.8 does not have 'enabled_disk_templates'
962
        return ipolicy_disk_templates
963

    
964

    
965
def update_backend_disk_templates(backend):
966
    disk_templates = get_available_disk_templates(backend)
967
    backend.disk_templates = disk_templates
968
    backend.save()
969

    
970

    
971
##
972
## Synchronized operations for reconciliation
973
##
974

    
975

    
976
def create_network_synced(network, backend):
977
    result = _create_network_synced(network, backend)
978
    if result[0] != 'success':
979
        return result
980
    result = connect_network_synced(network, backend)
981
    return result
982

    
983

    
984
def _create_network_synced(network, backend):
985
    with pooled_rapi_client(backend) as client:
986
        job = _create_network(network, backend)
987
        result = wait_for_job(client, job)
988
    return result
989

    
990

    
991
def connect_network_synced(network, backend):
992
    with pooled_rapi_client(backend) as client:
993
        for group in client.GetGroups():
994
            job = client.ConnectNetwork(network.backend_id, group,
995
                                        network.mode, network.link)
996
            result = wait_for_job(client, job)
997
            if result[0] != 'success':
998
                return result
999

    
1000
    return result
1001

    
1002

    
def wait_for_job(client, jobid):
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
    status = result['job_info'][0]
    while status not in ['success', 'error', 'canceled']:
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
                                         [result], None)
        status = result['job_info'][0]

    if status == 'success':
        return (status, None)
    else:
        error = result['job_info'][1]
        return (status, error)
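

# Illustrative usage (sketch): reconciliation code can create and connect a
# network on a single backend synchronously and inspect the outcome:
#
#   status, error = create_network_synced(network, backend)
#   if status != "success":
#       log.error("Failed to create network %s on %s: %s",
#                 network, backend, error)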