Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / backend.py @ 3f68fa52

History | View | Annotate | Download (27.7 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33
from django.conf import settings
34
from django.db import transaction
35
from datetime import datetime
36

    
37
from synnefo.db.models import (Backend, VirtualMachine, Network,
38
                               BackendNetwork, BACKEND_STATUSES,
39
                               pooled_rapi_client, VirtualMachineDiagnostic)
40
from synnefo.logic import utils
41
from synnefo import quotas
42
from synnefo.api.util import release_resource
43
from synnefo.util.mac2eui64 import mac2eui64
44
from synnefo.logic.rapi import GanetiApiError, JOB_STATUS_FINALIZED
45

    
46
from logging import getLogger
47
log = getLogger(__name__)
48

    
49

    
50
_firewall_tags = {
51
    'ENABLED': settings.GANETI_FIREWALL_ENABLED_TAG,
52
    'DISABLED': settings.GANETI_FIREWALL_DISABLED_TAG,
53
    'PROTECTED': settings.GANETI_FIREWALL_PROTECTED_TAG}
54

    
55
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
56

    
57

    
58
@transaction.commit_on_success
59
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None):
60
    """Process a job progress notification from the backend
61

62
    Process an incoming message from the backend (currently Ganeti).
63
    Job notifications with a terminating status (sucess, error, or canceled),
64
    also update the operating state of the VM.
65

66
    """
67
    # See #1492, #1031, #1111 why this line has been removed
68
    #if (opcode not in [x[0] for x in VirtualMachine.BACKEND_OPCODES] or
69
    if status not in [x[0] for x in BACKEND_STATUSES]:
70
        raise VirtualMachine.InvalidBackendMsgError(opcode, status)
71

    
72
    vm.backendjobid = jobid
73
    vm.backendjobstatus = status
74
    vm.backendopcode = opcode
75
    vm.backendlogmsg = logmsg
76

    
77
    # Update backendtime only for jobs that have been successfully completed,
78
    # since only these jobs update the state of the VM. Else a "race condition"
79
    # may occur when a successful job (e.g. OP_INSTANCE_REMOVE) completes
80
    # before an error job and messages arrive in reversed order.
81
    if status == 'success':
82
        vm.backendtime = etime
83

    
84
    # Notifications of success change the operating state
85
    state_for_success = VirtualMachine.OPER_STATE_FROM_OPCODE.get(opcode, None)
86
    if status == 'success' and state_for_success is not None:
87
        vm.operstate = state_for_success
88

    
89
    # Update the NICs of the VM
90
    if status == "success" and nics is not None:
91
        _process_net_status(vm, etime, nics)
92

    
93
    # Special case: if OP_INSTANCE_CREATE fails --> ERROR
94
    if opcode == 'OP_INSTANCE_CREATE' and status in ('canceled', 'error'):
95
        vm.operstate = 'ERROR'
96
        vm.backendtime = etime
97
    elif opcode == 'OP_INSTANCE_REMOVE':
98
        # Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
99
        # when no instance exists at the Ganeti backend.
100
        if status == "success" or (status == "error" and
101
                                   not vm_exists_in_backend(vm)):
102
            _process_net_status(vm, etime, nics=[])
103
            vm.operstate = state_for_success
104
            vm.backendtime = etime
105
            if not vm.deleted:
106
                vm.deleted = True
107
                # Issue and accept commission to Quotaholder
108
                quotas.issue_and_accept_commission(vm, delete=True)
109
                # the above has already saved the object and committed;
110
                # a second save would override others' changes, since the
111
                # object is now unlocked
112
                return
113

    
114
    vm.save()
115

    
116

    
117
@transaction.commit_on_success
118
def process_net_status(vm, etime, nics):
119
    """Wrap _process_net_status inside transaction."""
120
    _process_net_status(vm, etime, nics)
121

    
122

    
123
def _process_net_status(vm, etime, nics):
124
    """Process a net status notification from the backend
125

126
    Process an incoming message from the Ganeti backend,
127
    detailing the NIC configuration of a VM instance.
128

129
    Update the state of the VM in the DB accordingly.
130
    """
131

    
132
    ganeti_nics = process_ganeti_nics(nics)
133
    if not nics_changed(vm.nics.order_by('index'), ganeti_nics):
134
        log.debug("NICs for VM %s have not changed", vm)
135
        return
136

    
137
    # Get X-Lock on backend before getting X-Lock on network IP pools, to
138
    # guarantee that no deadlock will occur with Backend allocator.
139
    Backend.objects.select_for_update().get(id=vm.backend_id)
140

    
141
    release_instance_nics(vm)
142

    
143
    for nic in ganeti_nics:
144
        ipv4 = nic.get('ipv4', '')
145
        net = nic['network']
146
        if ipv4:
147
            net.reserve_address(ipv4)
148

    
149
        nic['dirty'] = False
150
        vm.nics.create(**nic)
151
        # Dummy save the network, because UI uses changed-since for VMs
152
        # and Networks in order to show the VM NICs
153
        net.save()
154

    
155
    vm.backendtime = etime
156
    vm.save()
157

    
158

    
159
def process_ganeti_nics(ganeti_nics):
160
    """Process NIC dict from ganeti hooks."""
161
    new_nics = []
162
    for i, new_nic in enumerate(ganeti_nics):
163
        network = new_nic.get('network', '')
164
        n = str(network)
165
        pk = utils.id_from_network_name(n)
166

    
167
        net = Network.objects.get(pk=pk)
168

    
169
        # Get the new nic info
170
        mac = new_nic.get('mac', '')
171
        ipv4 = new_nic.get('ip', '')
172
        if net.subnet6:
173
            ipv6 = mac2eui64(mac, net.subnet6)
174
        else:
175
            ipv6 = ''
176

    
177
        firewall = new_nic.get('firewall', '')
178
        firewall_profile = _reverse_tags.get(firewall, '')
179
        if not firewall_profile and net.public:
180
            firewall_profile = settings.DEFAULT_FIREWALL_PROFILE
181

    
182
        nic = {
183
            'index': i,
184
            'network': net,
185
            'mac': mac,
186
            'ipv4': ipv4,
187
            'ipv6': ipv6,
188
            'firewall_profile': firewall_profile,
189
            'state': 'ACTIVE'}
190

    
191
        new_nics.append(nic)
192
    return new_nics
193

    
194

    
195
def nics_changed(old_nics, new_nics):
196
    """Return True if NICs have changed in any way."""
197
    if len(old_nics) != len(new_nics):
198
        return True
199
    fields = ["ipv4", "ipv6", "mac", "firewall_profile", "index", "network"]
200
    for old_nic, new_nic in zip(old_nics, new_nics):
201
        for field in fields:
202
            if getattr(old_nic, field) != new_nic[field]:
203
                return True
204
    return False
205

    
206

    
207
def release_instance_nics(vm):
208
    for nic in vm.nics.all():
209
        net = nic.network
210
        if nic.ipv4:
211
            net.release_address(nic.ipv4)
212
        nic.delete()
213
        net.save()
214

    
215

    
216
@transaction.commit_on_success
217
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
218
    if status not in [x[0] for x in BACKEND_STATUSES]:
219
        raise Network.InvalidBackendMsgError(opcode, status)
220

    
221
    back_network.backendjobid = jobid
222
    back_network.backendjobstatus = status
223
    back_network.backendopcode = opcode
224
    back_network.backendlogmsg = logmsg
225

    
226
    network = back_network.network
227

    
228
    # Notifications of success change the operating state
229
    state_for_success = BackendNetwork.OPER_STATE_FROM_OPCODE.get(opcode, None)
230
    if status == 'success' and state_for_success is not None:
231
        back_network.operstate = state_for_success
232

    
233
    if status in ('canceled', 'error') and opcode == 'OP_NETWORK_ADD':
234
        back_network.operstate = 'ERROR'
235
        back_network.backendtime = etime
236

    
237
    if opcode == 'OP_NETWORK_REMOVE':
238
        network_is_deleted = (status == "success")
239
        if network_is_deleted or (status == "error" and not
240
                                  network_exists_in_backend(back_network)):
241
            back_network.operstate = state_for_success
242
            back_network.deleted = True
243
            back_network.backendtime = etime
244

    
245
    if status == 'success':
246
        back_network.backendtime = etime
247
    back_network.save()
248
    # Also you must update the state of the Network!!
249
    update_network_state(network)
250

    
251

    
252
def update_network_state(network):
253
    """Update the state of a Network based on BackendNetwork states.
254

255
    Update the state of a Network based on the operstate of the networks in the
256
    backends that network exists.
257

258
    The state of the network is:
259
    * ACTIVE: If it is 'ACTIVE' in at least one backend.
260
    * DELETED: If it is is 'DELETED' in all backends that have been created.
261

262
    This function also releases the resources (MAC prefix or Bridge) and the
263
    quotas for the network.
264

265
    """
266
    if network.deleted:
267
        # Network has already been deleted. Just assert that state is also
268
        # DELETED
269
        if not network.state == "DELETED":
270
            network.state = "DELETED"
271
            network.save()
272
        return
273

    
274
    backend_states = [s.operstate for s in network.backend_networks.all()]
275
    if not backend_states and network.action != "DESTROY":
276
        if network.state != "ACTIVE":
277
            network.state = "ACTIVE"
278
            network.save()
279
            return
280

    
281
    # Network is deleted when all BackendNetworks go to "DELETED" operstate
282
    deleted = reduce(lambda x, y: x == y and "DELETED", backend_states,
283
                     "DELETED")
284

    
285
    # Release the resources on the deletion of the Network
286
    if deleted:
287
        log.info("Network %r deleted. Releasing link %r mac_prefix %r",
288
                 network.id, network.mac_prefix, network.link)
289
        network.deleted = True
290
        network.state = "DELETED"
291
        if network.mac_prefix:
292
            if network.FLAVORS[network.flavor]["mac_prefix"] == "pool":
293
                release_resource(res_type="mac_prefix",
294
                                 value=network.mac_prefix)
295
        if network.link:
296
            if network.FLAVORS[network.flavor]["link"] == "pool":
297
                release_resource(res_type="bridge", value=network.link)
298

    
299
        # Issue commission
300
        if network.userid:
301
            quotas.issue_and_accept_commission(network, delete=True)
302
            # the above has already saved the object and committed;
303
            # a second save would override others' changes, since the
304
            # object is now unlocked
305
            return
306
        elif not network.public:
307
            log.warning("Network %s does not have an owner!", network.id)
308
    network.save()
309

    
310

    
311
@transaction.commit_on_success
312
def process_network_modify(back_network, etime, jobid, opcode, status,
313
                           add_reserved_ips, remove_reserved_ips):
314
    assert (opcode == "OP_NETWORK_SET_PARAMS")
315
    if status not in [x[0] for x in BACKEND_STATUSES]:
316
        raise Network.InvalidBackendMsgError(opcode, status)
317

    
318
    back_network.backendjobid = jobid
319
    back_network.backendjobstatus = status
320
    back_network.opcode = opcode
321

    
322
    if add_reserved_ips or remove_reserved_ips:
323
        net = back_network.network
324
        pool = net.get_pool()
325
        if add_reserved_ips:
326
            for ip in add_reserved_ips:
327
                pool.reserve(ip, external=True)
328
        if remove_reserved_ips:
329
            for ip in remove_reserved_ips:
330
                pool.put(ip, external=True)
331
        pool.save()
332

    
333
    if status == 'success':
334
        back_network.backendtime = etime
335
    back_network.save()
336

    
337

    
338
@transaction.commit_on_success
339
def process_create_progress(vm, etime, progress):
340

    
341
    percentage = int(progress)
342

    
343
    # The percentage may exceed 100%, due to the way
344
    # snf-image:copy-progress tracks bytes read by image handling processes
345
    percentage = 100 if percentage > 100 else percentage
346
    if percentage < 0:
347
        raise ValueError("Percentage cannot be negative")
348

    
349
    # FIXME: log a warning here, see #1033
350
#   if last_update > percentage:
351
#       raise ValueError("Build percentage should increase monotonically " \
352
#                        "(old = %d, new = %d)" % (last_update, percentage))
353

    
354
    # This assumes that no message of type 'ganeti-create-progress' is going to
355
    # arrive once OP_INSTANCE_CREATE has succeeded for a Ganeti instance and
356
    # the instance is STARTED.  What if the two messages are processed by two
357
    # separate dispatcher threads, and the 'ganeti-op-status' message for
358
    # successful creation gets processed before the 'ganeti-create-progress'
359
    # message? [vkoukis]
360
    #
361
    #if not vm.operstate == 'BUILD':
362
    #    raise VirtualMachine.IllegalState("VM is not in building state")
363

    
364
    vm.buildpercentage = percentage
365
    vm.backendtime = etime
366
    vm.save()
367

    
368

    
369
@transaction.commit_on_success
370
def create_instance_diagnostic(vm, message, source, level="DEBUG", etime=None,
371
                               details=None):
372
    """
373
    Create virtual machine instance diagnostic entry.
374

375
    :param vm: VirtualMachine instance to create diagnostic for.
376
    :param message: Diagnostic message.
377
    :param source: Diagnostic source identifier (e.g. image-helper).
378
    :param level: Diagnostic level (`DEBUG`, `INFO`, `WARNING`, `ERROR`).
379
    :param etime: The time the message occured (if available).
380
    :param details: Additional details or debug information.
381
    """
382
    VirtualMachineDiagnostic.objects.create_for_vm(vm, level, source=source,
383
                                                   source_date=etime,
384
                                                   message=message,
385
                                                   details=details)
386

    
387

    
388
def create_instance(vm, public_nic, flavor, image):
389
    """`image` is a dictionary which should contain the keys:
390
            'backend_id', 'format' and 'metadata'
391

392
        metadata value should be a dictionary.
393
    """
394

    
395
    # Handle arguments to CreateInstance() as a dictionary,
396
    # initialize it based on a deployment-specific value.
397
    # This enables the administrator to override deployment-specific
398
    # arguments, such as the disk template to use, name of os provider
399
    # and hypervisor-specific parameters at will (see Synnefo #785, #835).
400
    #
401
    kw = vm.backend.get_create_params()
402
    kw['mode'] = 'create'
403
    kw['name'] = vm.backend_vm_id
404
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
405

    
406
    kw['disk_template'] = flavor.disk_template
407
    kw['disks'] = [{"size": flavor.disk * 1024}]
408
    provider = flavor.disk_provider
409
    if provider:
410
        kw['disks'][0]['provider'] = provider
411
        kw['disks'][0]['origin'] = flavor.disk_origin
412

    
413
    kw['nics'] = [public_nic]
414
    if vm.backend.use_hotplug():
415
        kw['hotplug'] = True
416
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
417
    # kw['os'] = settings.GANETI_OS_PROVIDER
418
    kw['ip_check'] = False
419
    kw['name_check'] = False
420

    
421
    # Do not specific a node explicitly, have
422
    # Ganeti use an iallocator instead
423
    #kw['pnode'] = rapi.GetNodes()[0]
424

    
425
    kw['dry_run'] = settings.TEST
426

    
427
    kw['beparams'] = {
428
        'auto_balance': True,
429
        'vcpus': flavor.cpu,
430
        'memory': flavor.ram}
431

    
432
    kw['osparams'] = {
433
        'config_url': vm.config_url,
434
        # Store image id and format to Ganeti
435
        'img_id': image['backend_id'],
436
        'img_format': image['format']}
437

    
438
    # Use opportunistic locking
439
    kw['opportunistic_locking'] = True
440

    
441
    # Defined in settings.GANETI_CREATEINSTANCE_KWARGS
442
    # kw['hvparams'] = dict(serial_console=False)
443

    
444
    log.debug("Creating instance %s", utils.hide_pass(kw))
445
    with pooled_rapi_client(vm) as client:
446
        return client.CreateInstance(**kw)
447

    
448

    
449
def delete_instance(vm):
450
    with pooled_rapi_client(vm) as client:
451
        return client.DeleteInstance(vm.backend_vm_id, dry_run=settings.TEST)
452

    
453

    
454
def reboot_instance(vm, reboot_type):
455
    assert reboot_type in ('soft', 'hard')
456
    with pooled_rapi_client(vm) as client:
457
        return client.RebootInstance(vm.backend_vm_id, reboot_type,
458
                                     dry_run=settings.TEST)
459

    
460

    
461
def startup_instance(vm):
462
    with pooled_rapi_client(vm) as client:
463
        return client.StartupInstance(vm.backend_vm_id, dry_run=settings.TEST)
464

    
465

    
466
def shutdown_instance(vm):
467
    with pooled_rapi_client(vm) as client:
468
        return client.ShutdownInstance(vm.backend_vm_id, dry_run=settings.TEST)
469

    
470

    
471
def get_instance_console(vm):
472
    # RAPI GetInstanceConsole() returns endpoints to the vnc_bind_address,
473
    # which is a cluster-wide setting, either 0.0.0.0 or 127.0.0.1, and pretty
474
    # useless (see #783).
475
    #
476
    # Until this is fixed on the Ganeti side, construct a console info reply
477
    # directly.
478
    #
479
    # WARNING: This assumes that VNC runs on port network_port on
480
    #          the instance's primary node, and is probably
481
    #          hypervisor-specific.
482
    #
483
    log.debug("Getting console for vm %s", vm)
484

    
485
    console = {}
486
    console['kind'] = 'vnc'
487

    
488
    with pooled_rapi_client(vm) as client:
489
        i = client.GetInstance(vm.backend_vm_id)
490

    
491
    if vm.backend.hypervisor == "kvm" and i['hvparams']['serial_console']:
492
        raise Exception("hv parameter serial_console cannot be true")
493
    console['host'] = i['pnode']
494
    console['port'] = i['network_port']
495

    
496
    return console
497

    
498

    
499
def get_instance_info(vm):
500
    with pooled_rapi_client(vm) as client:
501
        return client.GetInstance(vm.backend_vm_id)
502

    
503

    
504
def vm_exists_in_backend(vm):
505
    try:
506
        get_instance_info(vm)
507
        return True
508
    except GanetiApiError as e:
509
        if e.code == 404:
510
            return False
511
        raise e
512

    
513

    
514
def get_network_info(backend_network):
515
    with pooled_rapi_client(backend_network) as client:
516
        return client.GetNetwork(backend_network.network.backend_id)
517

    
518

    
519
def network_exists_in_backend(backend_network):
520
    try:
521
        get_network_info(backend_network)
522
        return True
523
    except GanetiApiError as e:
524
        if e.code == 404:
525
            return False
526

    
527

    
528
def job_is_still_running(vm):
529
    with pooled_rapi_client(vm) as c:
530
        try:
531
            job_info = c.GetJobStatus(vm.backendjobid)
532
            return not (job_info["status"] in JOB_STATUS_FINALIZED)
533
        except GanetiApiError:
534
            return False
535

    
536

    
537
def create_network(network, backend, connect=True):
538
    """Create a network in a Ganeti backend"""
539
    log.debug("Creating network %s in backend %s", network, backend)
540

    
541
    job_id = _create_network(network, backend)
542

    
543
    if connect:
544
        job_ids = connect_network(network, backend, depends=[job_id])
545
        return job_ids
546
    else:
547
        return [job_id]
548

    
549

    
550
def _create_network(network, backend):
551
    """Create a network."""
552

    
553
    tags = network.backend_tag
554
    if network.dhcp:
555
        tags.append('nfdhcpd')
556

    
557
    if network.public:
558
        conflicts_check = True
559
    else:
560
        conflicts_check = False
561

    
562
    try:
563
        bn = BackendNetwork.objects.get(network=network, backend=backend)
564
        mac_prefix = bn.mac_prefix
565
    except BackendNetwork.DoesNotExist:
566
        raise Exception("BackendNetwork for network '%s' in backend '%s'"
567
                        " does not exist" % (network.id, backend.id))
568

    
569
    with pooled_rapi_client(backend) as client:
570
        return client.CreateNetwork(network_name=network.backend_id,
571
                                    network=network.subnet,
572
                                    network6=network.subnet6,
573
                                    gateway=network.gateway,
574
                                    gateway6=network.gateway6,
575
                                    mac_prefix=mac_prefix,
576
                                    conflicts_check=conflicts_check,
577
                                    tags=tags)
578

    
579

    
580
def connect_network(network, backend, depends=[], group=None):
581
    """Connect a network to nodegroups."""
582
    log.debug("Connecting network %s to backend %s", network, backend)
583

    
584
    if network.public:
585
        conflicts_check = True
586
    else:
587
        conflicts_check = False
588

    
589
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
590
    with pooled_rapi_client(backend) as client:
591
        groups = [group] if group is not None else client.GetGroups()
592
        job_ids = []
593
        for group in groups:
594
            job_id = client.ConnectNetwork(network.backend_id, group,
595
                                           network.mode, network.link,
596
                                           conflicts_check,
597
                                           depends=depends)
598
            job_ids.append(job_id)
599
    return job_ids
600

    
601

    
602
def delete_network(network, backend, disconnect=True):
603
    log.debug("Deleting network %s from backend %s", network, backend)
604

    
605
    depends = []
606
    if disconnect:
607
        depends = disconnect_network(network, backend)
608
    _delete_network(network, backend, depends=depends)
609

    
610

    
611
def _delete_network(network, backend, depends=[]):
612
    depends = [[job, ["success", "error", "canceled"]] for job in depends]
613
    with pooled_rapi_client(backend) as client:
614
        return client.DeleteNetwork(network.backend_id, depends)
615

    
616

    
617
def disconnect_network(network, backend, group=None):
618
    log.debug("Disconnecting network %s to backend %s", network, backend)
619

    
620
    with pooled_rapi_client(backend) as client:
621
        groups = [group] if group is not None else client.GetGroups()
622
        job_ids = []
623
        for group in groups:
624
            job_id = client.DisconnectNetwork(network.backend_id, group)
625
            job_ids.append(job_id)
626
    return job_ids
627

    
628

    
629
def connect_to_network(vm, network, address=None):
630
    backend = vm.backend
631
    network = Network.objects.select_for_update().get(id=network.id)
632
    bnet, created = BackendNetwork.objects.get_or_create(backend=backend,
633
                                                         network=network)
634
    depend_jobs = []
635
    if bnet.operstate != "ACTIVE":
636
        depend_jobs = create_network(network, backend, connect=True)
637

    
638
    depends = [[job, ["success", "error", "canceled"]] for job in depend_jobs]
639

    
640
    nic = {'ip': address, 'network': network.backend_id}
641

    
642
    log.debug("Connecting vm %s to network %s(%s)", vm, network, address)
643

    
644
    with pooled_rapi_client(vm) as client:
645
        return client.ModifyInstance(vm.backend_vm_id, nics=[('add',  nic)],
646
                                     hotplug=vm.backend.use_hotplug(),
647
                                     depends=depends,
648
                                     dry_run=settings.TEST)
649

    
650

    
651
def disconnect_from_network(vm, nic):
652
    op = [('remove', nic.index, {})]
653

    
654
    log.debug("Removing nic of VM %s, with index %s", vm, str(nic.index))
655

    
656
    with pooled_rapi_client(vm) as client:
657
        return client.ModifyInstance(vm.backend_vm_id, nics=op,
658
                                     hotplug=vm.backend.use_hotplug(),
659
                                     dry_run=settings.TEST)
660

    
661

    
662
def set_firewall_profile(vm, profile):
663
    try:
664
        tag = _firewall_tags[profile]
665
    except KeyError:
666
        raise ValueError("Unsopported Firewall Profile: %s" % profile)
667

    
668
    log.debug("Setting tag of VM %s to %s", vm, profile)
669

    
670
    with pooled_rapi_client(vm) as client:
671
        # Delete all firewall tags
672
        for t in _firewall_tags.values():
673
            client.DeleteInstanceTags(vm.backend_vm_id, [t],
674
                                      dry_run=settings.TEST)
675

    
676
        client.AddInstanceTags(vm.backend_vm_id, [tag], dry_run=settings.TEST)
677

    
678
        # XXX NOP ModifyInstance call to force process_net_status to run
679
        # on the dispatcher
680
        os_name = settings.GANETI_CREATEINSTANCE_KWARGS['os']
681
        client.ModifyInstance(vm.backend_vm_id,
682
                              os_name=os_name)
683

    
684

    
685
def get_ganeti_instances(backend=None, bulk=False):
686
    instances = []
687
    for backend in get_backends(backend):
688
        with pooled_rapi_client(backend) as client:
689
            instances.append(client.GetInstances(bulk=bulk))
690

    
691
    return reduce(list.__add__, instances, [])
692

    
693

    
694
def get_ganeti_nodes(backend=None, bulk=False):
695
    nodes = []
696
    for backend in get_backends(backend):
697
        with pooled_rapi_client(backend) as client:
698
            nodes.append(client.GetNodes(bulk=bulk))
699

    
700
    return reduce(list.__add__, nodes, [])
701

    
702

    
703
def get_ganeti_jobs(backend=None, bulk=False):
704
    jobs = []
705
    for backend in get_backends(backend):
706
        with pooled_rapi_client(backend) as client:
707
            jobs.append(client.GetJobs(bulk=bulk))
708
    return reduce(list.__add__, jobs, [])
709

    
710
##
711
##
712
##
713

    
714

    
715
def get_backends(backend=None):
716
    if backend:
717
        if backend.offline:
718
            return []
719
        return [backend]
720
    return Backend.objects.filter(offline=False)
721

    
722

    
723
def get_physical_resources(backend):
724
    """ Get the physical resources of a backend.
725

726
    Get the resources of a backend as reported by the backend (not the db).
727

728
    """
729
    nodes = get_ganeti_nodes(backend, bulk=True)
730
    attr = ['mfree', 'mtotal', 'dfree', 'dtotal', 'pinst_cnt', 'ctotal']
731
    res = {}
732
    for a in attr:
733
        res[a] = 0
734
    for n in nodes:
735
        # Filter out drained, offline and not vm_capable nodes since they will
736
        # not take part in the vm allocation process
737
        can_host_vms = n['vm_capable'] and not (n['drained'] or n['offline'])
738
        if can_host_vms and n['cnodes']:
739
            for a in attr:
740
                res[a] += int(n[a])
741
    return res
742

    
743

    
744
def update_resources(backend, resources=None):
745
    """ Update the state of the backend resources in db.
746

747
    """
748

    
749
    if not resources:
750
        resources = get_physical_resources(backend)
751

    
752
    backend.mfree = resources['mfree']
753
    backend.mtotal = resources['mtotal']
754
    backend.dfree = resources['dfree']
755
    backend.dtotal = resources['dtotal']
756
    backend.pinst_cnt = resources['pinst_cnt']
757
    backend.ctotal = resources['ctotal']
758
    backend.updated = datetime.now()
759
    backend.save()
760

    
761

    
762
def get_memory_from_instances(backend):
763
    """ Get the memory that is used from instances.
764

765
    Get the used memory of a backend. Note: This is different for
766
    the real memory used, due to kvm's memory de-duplication.
767

768
    """
769
    with pooled_rapi_client(backend) as client:
770
        instances = client.GetInstances(bulk=True)
771
    mem = 0
772
    for i in instances:
773
        mem += i['oper_ram']
774
    return mem
775

    
776
##
777
## Synchronized operations for reconciliation
778
##
779

    
780

    
781
def create_network_synced(network, backend):
782
    result = _create_network_synced(network, backend)
783
    if result[0] != 'success':
784
        return result
785
    result = connect_network_synced(network, backend)
786
    return result
787

    
788

    
789
def _create_network_synced(network, backend):
790
    with pooled_rapi_client(backend) as client:
791
        job = _create_network(network, backend)
792
        result = wait_for_job(client, job)
793
    return result
794

    
795

    
796
def connect_network_synced(network, backend):
797
    with pooled_rapi_client(backend) as client:
798
        for group in client.GetGroups():
799
            job = client.ConnectNetwork(network.backend_id, group,
800
                                        network.mode, network.link)
801
            result = wait_for_job(client, job)
802
            if result[0] != 'success':
803
                return result
804

    
805
    return result
806

    
807

    
808
def wait_for_job(client, jobid):
809
    result = client.WaitForJobChange(jobid, ['status', 'opresult'], None, None)
810
    status = result['job_info'][0]
811
    while status not in ['success', 'error', 'cancel']:
812
        result = client.WaitForJobChange(jobid, ['status', 'opresult'],
813
                                         [result], None)
814
        status = result['job_info'][0]
815

    
816
    if status == 'success':
817
        return (status, None)
818
    else:
819
        error = result['job_info'][1]
820
        return (status, error)