Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ d9b5b020

History | View | Annotate | Download (27.8 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
from django.conf import settings
34
from django.db import transaction
35
from datetime import datetime
36

    
37
from synnefo.db.models import (Backend, VirtualMachine, Network,
38
                               BackendNetwork, BACKEND_STATUSES,
39
                               pooled_rapi_client, VirtualMachineDiagnostic)
40
from synnefo.logic import utils
41
from synnefo import quotas
42
from synnefo.api.util import release_resource
43
from synnefo.util.mac2eui64 import mac2eui64
44
from synnefo.logic.rapi import GanetiApiError, JOB_STATUS_FINALIZED
45

    
46
from logging import getLogger
47
log = getLogger(__name__)
48

    
49

    
50
_firewall_tags = {
51
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
52
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
53
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}
54

    
55
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
56

    
57

    
58
@transaction.commit_on_success
59
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None):
60
    """Process a job progress notification from the backend
61

62
    Process an incoming message from the backend (currently Ganeti).
63
    Job notifications with a terminating status (sucess, error, or canceled),
64
    also update the operating state of the VM.
65

66
    """
67
    # See #1492, #1031, #1111 why this line has been removed
68
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
69
    if status not in [x[0] for x in BACKEND_STATUSES]:
70
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)
71

    
72
    vm.backendjobid = jobid
73
    vm.backendjobstatus = status
74
    vm.backendopcode = opcode
75
    vm.backendlogmsg = logmsg
76

    
77
    # Update backendtime only for jobs that have been successfully completed,
78
    # since only these jobs update the state of the VM. Else a "race condition"
79
    # may occur when a successful job (e.g. OP_INSTANCE_REMOVE) completes
80
    # before an error job and messages arrive in reversed order.
81
    if status == 'success':
82
        vm.backendtime = etime
83

    
84
    # Notifications of success change the operating state
85
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode, None)
86
    if status == 'success' and state_for_success is not None:
87
        vm.operstate = state_for_success
88

    
89
    # Update the NICs of the VM
90
    if status == "success" and nics is not None:
91
        _process_net_status(vm, etime, nics)
92

    
93
    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
94
    if opcode == 'OP_INSTANCE_CREATE' and status in ('canceled', 'error'):
95
        vm.operstate = 'ERROR'
96
        vm.backendtime = etime
97
    elif opcode == 'OP_INSTANCE_REMOVE':
98
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
99
        # when no instance exists at the Ganeti backend.
100
        if status == "success" or (status == "error" and
101
                                   not vm_exists_in_backend(vm)):
102
            _process_net_status(vm, etime, nics=[])
103
            vm.operstate = state_for_success
104
            vm.backendtime = etime
105
            if not vm.deleted:
106
                vm.deleted = True
107
                # Issue and accept commission to Quotaholder
108
                quotas.issue_and_accept_commission(vm, delete=True)
109
                # the above has already saved the object and committed;
110
                # a second save would override others' changes, since the
111
                # object is now unlocked
112
                return
113

    
114
    vm.save()
115

    
116

    
117
@transaction.commit_on_success
118
def process_net_status(vm, etime, nics):
119
    """Wrap _process_net_status inside transaction."""
120
    _process_net_status(vm, etime, nics)
121

    
122

    
123
def _process_net_status(vm, etime, nics):
124
    """Process a net status notification from the backend
125

126
    Process an incoming message from the Ganeti backend,
127
    detailing the NIC configuration of a VM instance.
128

129
    Update the state of the VM in the DB accordingly.
130
    """
131

    
132
    ganeti_nics = process_ganeti_nics(nics)
133
    if not nics_changed(vm.nics.order_by('index'), ganeti_nics):
134
        log.debug("NICs for VM %s have not changed", vm)
135
        return
136

    
137
    # Get X-Lock on backend before getting X-Lock on network IP pools, to
138
    # guarantee that no deadlock will occur with Backend allocator.
139
    Backend.objects.select_for_update().get(id=vm.backend_id)
140

    
141
    release_instance_nics(vm)
142

    
143
    for nic in ganeti_nics:
144
        ipv4 = nic.get('ipv4', '')
145
        net = nic['network']
146
        if ipv4:
147
            net.reserve_address(ipv4)
148

    
149
        nic['dirty'] = False
150
        vm.nics.create(**nic)
151
        # Dummy save the network, because UI uses changed-since for VMs
152
        # and Networks in order to show the VM NICs
153
        net.save()
154

    
155
    vm.backendtime = etime
156
    vm.save()
157

    
158

    
159
def process_ganeti_nics(ganeti_nics):
160
    """Process NIC dict from ganeti hooks."""
161
    new_nics = []
162
    for i, new_nic in enumerate(ganeti_nics):
163
        network = new_nic.get('network', '')
164
        n = str(network)
165
        pk = utils.id_from_network_name(n)
166

    
167
        net = Network.objects.get(pk=pk)
168

    
169
        # Get the new nic info
170
        mac = new_nic.get('mac', '')
171
        ipv4 = new_nic.get('ip', '')
172
        if net.subnet6:
173
            ipv6 = mac2eui64(mac, net.subnet6)
174
        else:
175
            ipv6 = ''
176

    
177
        firewall = new_nic.get('firewall', '')
178
        firewall_profile = _reverse_tags.get(firewall, '')
179
        if not firewall_profile and net.public:
180
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE
181

    
182
        nic = {
183
            'index': i,
184
            'network': net,
185
            'mac': mac,
186
            'ipv4': ipv4,
187
            'ipv6': ipv6,
188
            'firewall_profile': firewall_profile,
189
            'state': 'ACTIVE'}
190

    
191
        new_nics.append(nic)
192
    return new_nics
193

    
194

    
195
def nics_changed(old_nics, new_nics):
196
    """Return True if NICs have changed in any way."""
197
    if len(old_nics) != len(new_nics):
198
        return True
199
    fields = ["ipv4", "ipv6", "mac", "firewall_profile", "index", "network"]
200
    for old_nic, new_nic in zip(old_nics, new_nics):
201
        for field in fields:
202
            if getattr(old_nic, field) != new_nic[field]:
203
                return True
204
    return False
205

    
206

    
207
def release_instance_nics(vm):
208
    for nic in vm.nics.all():
209
        net = nic.network
210
        if nic.ipv4:
211
            net.release_address(nic.ipv4)
212
        nic.delete()
213
        net.save()
214

    
215

    
216
@transaction.commit_on_success
217
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
218
    if status not in [x[0] for x in BACKEND_STATUSES]:
219
        raise Network.InvalidBackendMsgError(opcode, status)
220

    
221
    back_network.backendjobid = jobid
222
    back_network.backendjobstatus = status
223
    back_network.backendopcode = opcode
224
    back_network.backendlogmsg = logmsg
225

    
226
    network = back_network.network
227

    
228
    # Notifications of success change the operating state
229
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
230
    if status == 'success' and state_for_success is not None:
231
        back_network.operstate = state_for_success
232

    
233
    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_ADD':
234
        back_network.operstate = 'ERROR'
235
        back_network.backendtime = etime
236

    
237
    if opcode == 'OP_NETWORK_REMOVE':
238
        network_is_deleted = (status == "success")
239
        if network_is_deleted or (status == "error" and not
240
                                  network_exists_in_backend(back_network)):
241
            back_network.operstate = state_for_success
242
            back_network.deleted = True
243
            back_network.backendtime = etime
244

    
245
    if status == 'success':
246
        back_network.backendtime = etime
247
    back_network.save()
248
    # Also you must update the state of the Network!!
249
    update_network_state(network)
250

    
251

    
252
def update_network_state(network):
253
    """Update the state of a Network based on BackendNetwork states.
254

255
    Update the state of a Network based on the operstate of the networks in the
256
    backends that network exists.
257

258
    The state of the network is:
259
    * ACTIVE: If it is 'ACTIVE' in at least one backend.
260
    * DELETED: If it is is 'DELETED' in all backends that have been created.
261

262
    This function also releases the resources (MAC prefix or Bridge) and the
263
    quotas for the network.
264

265
    """
266
    if network.deleted:
267
        # Network has already been deleted. Just assert that state is also
268
        # DELETED
269
        if not network.state == "DELETED":
270
            network.state = "DELETED"
271
            network.save()
272
        return
273

    
274
    backend_states = [s.operstate for s in network.backend_networks.all()]
275
    if not backend_states and network.action != "DESTROY":
276
        if network.state != "ACTIVE":
277
            network.state = "ACTIVE"
278
            network.save()
279
            return
280

    
281
    # Network is deleted when all BackendNetworks go to "DELETED" operstate
282
    deleted = reduce(lambda x, y: x == y and "DELETED", backend_states,
283
                     "DELETED")
284

    
285
    # Release the resources on the deletion of the Network
286
    if deleted:
287
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
288
                 network.id, network.mac_prefix, network.link)
289
        network.deleted = True
290
        network.state = "DELETED"
291
        if network.mac_prefix:
292
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
293
                release_resource(res_type="mac_prefix",
294
                                 value=network.mac_prefix)
295
        if network.link:
296
            if network.FLAVORS[network.flavor]["link"] == "pool":
297
                release_resource(res_type="bridge", value=network.link)
298

    
299
        # Issue commission
300
        if network.userid:
301
            quotas.issue_and_accept_commission(network, delete=True)
302
            # the above has already saved the object and committed;
303
            # a second save would override others' changes, since the
304
            # object is now unlocked
305
            return
306
        elif not network.public:
307
            log.warning("Network %s does not have an owner!", network.id)
308
    network.save()
309

    
310

    
311
@transaction.commit_on_success
312
def process_network_modify(back_network, etime, jobid, opcode, status,
313
                           add_reserved_ips, remove_reserved_ips):
314
    assert (opcode == "OP_NETWORK_SET_PARAMS")
315
    if status not in [x[0] for x in BACKEND_STATUSES]:
316
        raise Network.InvalidBackendMsgError(opcode, status)
317

    
318
    back_network.backendjobid = jobid
319
    back_network.backendjobstatus = status
320
    back_network.opcode = opcode
321

    
322
    if add_reserved_ips or remove_reserved_ips:
323
        net = back_network.network
324
        pool = net.get_pool()
325
        if add_reserved_ips:
326
            for ip in add_reserved_ips:
327
                pool.reserve(ip, external=True)
328
        if remove_reserved_ips:
329
            for ip in remove_reserved_ips:
330
                pool.put(ip, external=True)
331
        pool.save()
332

    
333
    if status == 'success':
334
        back_network.backendtime = etime
335
    back_network.save()
336

    
337

    
338
@transaction.commit_on_success
339
def process_create_progress(vm, etime, progress):
340

    
341
    percentage = int(progress)
342

    
343
    # The percentage may exceed 100%, due to the way
344
    # snf-image:copy-progress tracks bytes read by image handling processes
345
    percentage = 100 if percentage > 100 else percentage
346
    if percentage < 0:
347
        raise ValueError("Percentage cannot be negative")
348

    
349
    # FIXME: log a warning here, see #1033
350
#   if last_update > percentage:
351
#       raise ValueError("Build percentage should increase monotonically " \
352
#                        "(old = %d, new = %d)" % (last_update, percentage))
353

    
354
    # This assumes that no message of type 'ganeti-create-progress' is going to
355
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
356
    # the instance is STARTED.  What if the two messages are processed by two
357
    # separate dispatcher threads, and the 'ganeti-op-status' message for
358
    # successful creation gets processed before the 'ganeti-create-progress'
359
    # message? [vkoukis]
360
    #
361
    #if not vm.operstate == 'BUILD':
362
    #    raise VirtualMachine.IllegalState("VM is not in building state")
363

    
364
    vm.buildpercentage = percentage
365
    vm.backendtime = etime
366
    vm.save()
367

    
368

    
369
@transaction.commit_on_success
370
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
371
                               details=None):
372
    """
373
    Create virtual machine instance diagnostic entry.
374

375
    :param vm: VirtualMachine instance to create diagnostic for.
376
    :param message: Diagnostic message.
377
    :param source: Diagnostic source identifier (e.g. image-helper).
378
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
379
    :param etime: The time the message occured (if available).
380
    :param details: Additional details or debug information.
381
    """
382
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level, source=source,
383
                                                   source_date=etime,
384
                                                   message=message,
385
                                                   details=details)
386

    
387

    
388
def create_instance(vm, public_nic, flavor, image):
389
    """`image` is a dictionary which should contain the keys:
390
            'backend_id', 'format' and 'metadata'
391

392
        metadata value should be a dictionary.
393
    """
394

    
395
    # Handle arguments to CreateInstance() as a dictionary,
396
    # initialize it based on a deployment-specific value.
397
    # This enables the administrator to override deployment-specific
398
    # arguments, such as the disk template to use, name of os provider
399
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
400
    #
401
    kw = vm.backend.get_create_params()
402
    kw['mode'] = 'create'
403
    kw['name'] = vm.backend_vm_id
404
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
405

    
406
    kw['disk_template'] = flavor.disk_template
407
    kw['disks'] = [{"size": flavor.disk * 1024}]
408
    provider = flavor.disk_provider
409
    if provider:
410
        kw['disks'][0]['provider'] = provider
411
        kw['disks'][0]['origin'] = flavor.disk_origin
412

    
413
    kw['nics'] = [public_nic]
414
    if vm.backend.use_hotplug():
415
        kw['hotplug'] = True
416
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
417
    # kw['os'] = settings.GANETI_OS_PROVIDER
418
    kw['ip_check'] = False
419
    kw['name_check'] = False
420

    
421
    # Do not specific a node explicitly, have
422
    # Ganeti use an iallocator instead
423
    #kw['pnode'] = rapi.GetNodes()[0]
424

    
425
    kw['dry_run'] = settings.TEST
426

    
427
    kw['beparams'] = {
428
        'auto_balance': True,
429
        'vcpus': flavor.cpu,
430
        'memory': flavor.ram}
431

    
432
    kw['osparams'] = {
433
        'config_url': vm.config_url,
434
        # Store image id and format to Ganeti
435
        'img_id': image['backend_id'],
436
        'img_format': image['format']}
437

    
438
    # Use opportunistic locking
439
    kw['opportunistic_locking'] = True
440

    
441
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
442
    # kw['hvparams'] = dict(serial_console=False)
443

    
444
    log.debug("Creating instance %s", utils.hide_pass(kw))
445
    with pooled_rapi_client(vm) as client:
446
        return client.CreateInstance(**kw)
447

    
448

    
449
def delete_instance(vm):
450
    with pooled_rapi_client(vm) as client:
451
        return client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)
452

    
453

    
454
def reboot_instance(vm, reboot_type):
455
    assert reboot_type in ('soft', 'hard')
456
    with pooled_rapi_client(vm) as client:
457
        return client.RebootInstance(vm.backend_vm_id, reboot_type,
458
                                     dry_run=settings.TEST)
459

    
460

    
461
def startup_instance(vm):
462
    with pooled_rapi_client(vm) as client:
463
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
464

    
465

    
466
def shutdown_instance(vm):
467
    with pooled_rapi_client(vm) as client:
468
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)
469

    
470

    
471
def get_instance_console(vm):
472
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
473
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
474
    # useless (see #783).
475
    #
476
    # Until this is fixed on the Ganeti side, construct a console info reply
477
    # directly.
478
    #
479
    # WARNING: This assumes that VNC runs on port network_port on
480
    #          the instance's primary node, and is probably
481
    #          hypervisor-specific.
482
    #
483
    log.debug("Getting console for vm %s", vm)
484

    
485
    console = {}
486
    console['kind'] = 'vnc'
487

    
488
    with pooled_rapi_client(vm) as client:
489
        i = client.GetInstance(vm.backend_vm_id)
490

    
491
    if vm.backend.hypervisor == "kvm" and i['hvparams']['serial_console']:
492
        raise Exception("hv parameter serial_console cannot be true")
493
    console['host'] = i['pnode']
494
    console['port'] = i['network_port']
495

    
496
    return console
497

    
498

    
499
def get_instance_info(vm):
500
    with pooled_rapi_client(vm) as client:
501
        return client.GetInstance(vm.backend_vm_id)
502

    
503

    
504
def vm_exists_in_backend(vm):
505
    try:
506
        get_instance_info(vm)
507
        return True
508
    except GanetiApiError as e:
509
        if e.code == 404:
510
            return False
511
        raise e
512

    
513

    
514
def get_network_info(backend_network):
515
    with pooled_rapi_client(backend_network) as client:
516
        return client.GetNetwork(backend_network.network.backend_id)
517

    
518

    
519
def network_exists_in_backend(backend_network):
520
    try:
521
        get_network_info(backend_network)
522
        return True
523
    except GanetiApiError as e:
524
        if e.code == 404:
525
            return False
526

    
527

    
528
def job_is_still_running(vm):
529
    with pooled_rapi_client(vm) as c:
530
        try:
531
            job_info = c.GetJobStatus(vm.backendjobid)
532
            return not (job_info["status"] in JOB_STATUS_FINALIZED)
533
        except GanetiApiError:
534
            return False
535

    
536

    
537
def create_network(network, backend, connect=True):
538
    """Create a network in a Ganeti backend"""
539
    log.debug("Creating network %s in backend %s", network, backend)
540

    
541
    job_id = _create_network(network, backend)
542

    
543
    if connect:
544
        job_ids = connect_network(network, backend, depends=[job_id])
545
        return job_ids
546
    else:
547
        return [job_id]
548

    
549

    
550
def _create_network(network, backend):
551
    """Create a network."""
552

    
553
    tags = network.backend_tag
554
    if network.dhcp:
555
        tags.append('nfdhcpd')
556

    
557
    if network.public:
558
        conflicts_check = True
559
        tags.append('public')
560
    else:
561
        conflicts_check = False
562
        tags.append('private')
563

    
564
    try:
565
        bn = BackendNetwork.objects.get(network=network, backend=backend)
566
        mac_prefix = bn.mac_prefix
567
    except BackendNetwork.DoesNotExist:
568
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
569
                        " does not exist" % (network.id, backend.id))
570

    
571
    with pooled_rapi_client(backend) as client:
572
        return client.CreateNetwork(network_name=network.backend_id,
573
                                    network=network.subnet,
574
                                    network6=network.subnet6,
575
                                    gateway=network.gateway,
576
                                    gateway6=network.gateway6,
577
                                    mac_prefix=mac_prefix,
578
                                    conflicts_check=conflicts_check,
579
                                    tags=tags)
580

    
581

    
582
def connect_network(network, backend, depends=[], group=None):
583
    """Connect a network to nodegroups."""
584
    log.debug("Connecting network %s to backend %s", network, backend)
585

    
586
    if network.public:
587
        conflicts_check = True
588
    else:
589
        conflicts_check = False
590

    
591
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
592
    with pooled_rapi_client(backend) as client:
593
        groups = [group] if group is not None else client.GetGroups()
594
        job_ids = []
595
        for group in groups:
596
            job_id = client.ConnectNetwork(network.backend_id, group,
597
                                           network.mode, network.link,
598
                                           conflicts_check,
599
                                           depends=depends)
600
            job_ids.append(job_id)
601
    return job_ids
602

    
603

    
604
def delete_network(network, backend, disconnect=True):
605
    log.debug("Deleting network %s from backend %s", network, backend)
606

    
607
    depends = []
608
    if disconnect:
609
        depends = disconnect_network(network, backend)
610
    _delete_network(network, backend, depends=depends)
611

    
612

    
613
def _delete_network(network, backend, depends=[]):
614
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
615
    with pooled_rapi_client(backend) as client:
616
        return client.DeleteNetwork(network.backend_id, depends)
617

    
618

    
619
def disconnect_network(network, backend, group=None):
620
    log.debug("Disconnecting network %s to backend %s", network, backend)
621

    
622
    with pooled_rapi_client(backend) as client:
623
        groups = [group] if group is not None else client.GetGroups()
624
        job_ids = []
625
        for group in groups:
626
            job_id = client.DisconnectNetwork(network.backend_id, group)
627
            job_ids.append(job_id)
628
    return job_ids
629

    
630

    
631
def connect_to_network(vm, network, address=None):
632
    backend = vm.backend
633
    network = Network.objects.select_for_update().get(id=network.id)
634
    bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
635
                                                         network=network)
636
    depend_jobs = []
637
    if bnet.operstate != "ACTIVE":
638
        depend_jobs = create_network(network, backend, connect=True)
639

    
640
    depends = [[job, ["success", "error", "canceled"]] for job in depend_jobs]
641

    
642
    nic = {'ip': address, 'network': network.backend_id}
643

    
644
    log.debug("Connecting vm %s to network %s(%s)", vm, network, address)
645

    
646
    with pooled_rapi_client(vm) as client:
647
        return client.ModifyInstance(vm.backend_vm_id, nics=[('add',  nic)],
648
                                     hotplug=vm.backend.use_hotplug(),
649
                                     depends=depends,
650
                                     dry_run=settings.TEST)
651

    
652

    
653
def disconnect_from_network(vm, nic):
654
    op = [('remove', nic.index, {})]
655

    
656
    log.debug("Removing nic of VM %s, with index %s", vm, str(nic.index))
657

    
658
    with pooled_rapi_client(vm) as client:
659
        return client.ModifyInstance(vm.backend_vm_id, nics=op,
660
                                     hotplug=vm.backend.use_hotplug(),
661
                                     dry_run=settings.TEST)
662

    
663

    
664
def set_firewall_profile(vm, profile):
665
    try:
666
        tag = _firewall_tags[profile]
667
    except KeyError:
668
        raise ValueError("Unsopported Firewall Profile: %s" % profile)
669

    
670
    log.debug("Setting tag of VM %s to %s", vm, profile)
671

    
672
    with pooled_rapi_client(vm) as client:
673
        # Delete all firewall tags
674
        for t in _firewall_tags.values():
675
            client.DeleteInstanceTags(vm.backend_vm_id, [t],
676
                                      dry_run=settings.TEST)
677

    
678
        client.AddInstanceTags(vm.backend_vm_id, [tag], dry_run=settings.TEST)
679

    
680
        # XXX NOP ModifyInstance call to force process_net_status to run
681
        # on the dispatcher
682
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
683
        client.ModifyInstance(vm.backend_vm_id,
684
                              os_name=os_name)
685

    
686

    
687
def get_ganeti_instances(backend=None, bulk=False):
688
    instances = []
689
    for backend in get_backends(backend):
690
        with pooled_rapi_client(backend) as client:
691
            instances.append(client.GetInstances(bulk=bulk))
692

    
693
    return reduce(list.__add__, instances, [])
694

    
695

    
696
def get_ganeti_nodes(backend=None, bulk=False):
697
    nodes = []
698
    for backend in get_backends(backend):
699
        with pooled_rapi_client(backend) as client:
700
            nodes.append(client.GetNodes(bulk=bulk))
701

    
702
    return reduce(list.__add__, nodes, [])
703

    
704

    
705
def get_ganeti_jobs(backend=None, bulk=False):
706
    jobs = []
707
    for backend in get_backends(backend):
708
        with pooled_rapi_client(backend) as client:
709
            jobs.append(client.GetJobs(bulk=bulk))
710
    return reduce(list.__add__, jobs, [])
711

    
712
##
713
##
714
##
715

    
716

    
717
def get_backends(backend=None):
718
    if backend:
719
        if backend.offline:
720
            return []
721
        return [backend]
722
    return Backend.objects.filter(offline=False)
723

    
724

    
725
def get_physical_resources(backend):
726
    """ Get the physical resources of a backend.
727

728
    Get the resources of a backend as reported by the backend (not the db).
729

730
    """
731
    nodes = get_ganeti_nodes(backend, bulk=True)
732
    attr = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
733
    res = {}
734
    for a in attr:
735
        res[a] = 0
736
    for n in nodes:
737
        # Filter out drained, offline and not vm_capable nodes since they will
738
        # not take part in the vm allocation process
739
        can_host_vms = n['vm_capable'] and not (n['drained'] or n['offline'])
740
        if can_host_vms and n['cnodes']:
741
            for a in attr:
742
                res[a] += int(n[a])
743
    return res
744

    
745

    
746
def update_resources(backend, resources=None):
747
    """ Update the state of the backend resources in db.
748

749
    """
750

    
751
    if not resources:
752
        resources = get_physical_resources(backend)
753

    
754
    backend.mfree = resources['mfree']
755
    backend.mtotal = resources['mtotal']
756
    backend.dfree = resources['dfree']
757
    backend.dtotal = resources['dtotal']
758
    backend.pinst_cnt = resources['pinst_cnt']
759
    backend.ctotal = resources['ctotal']
760
    backend.updated = datetime.now()
761
    backend.save()
762

    
763

    
764
def get_memory_from_instances(backend):
765
    """ Get the memory that is used from instances.
766

767
    Get the used memory of a backend. Note: This is different for
768
    the real memory used, due to kvm's memory de-duplication.
769

770
    """
771
    with pooled_rapi_client(backend) as client:
772
        instances = client.GetInstances(bulk=True)
773
    mem = 0
774
    for i in instances:
775
        mem += i['oper_ram']
776
    return mem
777

    
778
##
779
## Synchronized operations for reconciliation
780
##
781

    
782

    
783
def create_network_synced(network, backend):
784
    result = _create_network_synced(network, backend)
785
    if result[0] != 'success':
786
        return result
787
    result = connect_network_synced(network, backend)
788
    return result
789

    
790

    
791
def _create_network_synced(network, backend):
792
    with pooled_rapi_client(backend) as client:
793
        job = _create_network(network, backend)
794
        result = wait_for_job(client, job)
795
    return result
796

    
797

    
798
def connect_network_synced(network, backend):
799
    with pooled_rapi_client(backend) as client:
800
        for group in client.GetGroups():
801
            job = client.ConnectNetwork(network.backend_id, group,
802
                                        network.mode, network.link)
803
            result = wait_for_job(client, job)
804
            if result[0] != 'success':
805
                return result
806

    
807
    return result
808

    
809

    
810
def wait_for_job(client, jobid):
811
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
812
    status = result['job_info'][0]
813
    while status not in ['success', 'error', 'cancel']:
814
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
815
                                         [result], None)
816
        status = result['job_info'][0]
817

    
818
    if status == 'success':
819
        return (status, None)
820
    else:
821
        error = result['job_info'][1]
822
        return (status, error)