Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / quotas / __init__.py @ d05e5324

History | View | Annotate | Download (15.5 kB)

1
# Copyright 2012, 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or without
4
# modification, are permitted provided that the following conditions
5
# are met:
6
#
7
#   1. Redistributions of source code must retain the above copyright
8
#      notice, this list of conditions and the following disclaimer.
9
#
10
#  2. Redistributions in binary form must reproduce the above copyright
11
#     notice, this list of conditions and the following disclaimer in the
12
#     documentation and/or other materials provided with the distribution.
13
#
14
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24
# SUCH DAMAGE.
25
#
26
# The views and conclusions contained in the software and documentation are
27
# those of the authors and should not be interpreted as representing official
28
# policies, either expressed or implied, of GRNET S.A.
29

    
30
from django.utils import simplejson as json
31
from django.db import transaction
32
from django.db.models import Sum
33

    
34
from snf_django.lib.api import faults
35
from synnefo.db.models import (QuotaHolderSerial, VirtualMachine, Network,
36
                               IPAddress, Volume)
37

    
38
from synnefo.settings import (CYCLADES_SERVICE_TOKEN as ASTAKOS_TOKEN,
39
                              ASTAKOS_AUTH_URL)
40
from astakosclient import AstakosClient
41
from astakosclient import errors
42
from synnefo.logic.utils import id_from_disk_name
43

    
44
import logging
45
log = logging.getLogger(__name__)
46

    
47

    
48
QUOTABLE_RESOURCES = [VirtualMachine, Network, IPAddress]
49

    
50

    
51
DEFAULT_SOURCE = 'system'
52
RESOURCES = [
53
    "cyclades.vm",
54
    "cyclades.total_cpu",
55
    "cyclades.cpu",
56
    "cyclades.disk",
57
    "cyclades.total_ram",
58
    "cyclades.ram",
59
    "cyclades.network.private",
60
    "cyclades.floating_ip",
61
]
62

    
63

    
64
class Quotaholder(object):
65
    _object = None
66

    
67
    @classmethod
68
    def get(cls):
69
        if cls._object is None:
70
            cls._object = AstakosClient(ASTAKOS_TOKEN,
71
                                        ASTAKOS_AUTH_URL,
72
                                        use_pool=True,
73
                                        retry=3,
74
                                        logger=log)
75
        return cls._object
76

    
77

    
78
class AstakosClientExceptionHandler(object):
79
    def __init__(self, *args, **kwargs):
80
        pass
81

    
82
    def __enter__(self):
83
        pass
84

    
85
    def __exit__(self, exc_type, value, traceback):
86
        if value is not None:  # exception
87
            if not isinstance(value, errors.AstakosClientException):
88
                return False  # reraise
89
            if exc_type is errors.QuotaLimit:
90
                msg, details = render_overlimit_exception(value)
91
                raise faults.OverLimit(msg, details=details)
92

    
93
            log.exception("Unexpected error %s" % value.message)
94
            raise faults.InternalServerError("Unexpected error")
95

    
96

    
97
def issue_commission(resource, action, name="", force=False, auto_accept=False,
98
                     action_fields=None):
99
    """Issue a new commission to the quotaholder.
100

101
    Issue a new commission to the quotaholder, and create the
102
    corresponing QuotaHolderSerial object in DB.
103

104
    """
105

    
106
    provisions = get_commission_info(resource=resource, action=action,
107
                                     action_fields=action_fields)
108

    
109
    if provisions is None:
110
        return None
111

    
112
    user = resource.userid
113
    source = DEFAULT_SOURCE
114

    
115
    qh = Quotaholder.get()
116
    if True:  # placeholder
117
        with AstakosClientExceptionHandler():
118
            serial = qh.issue_one_commission(user, source,
119
                                             provisions, name=name,
120
                                             force=force,
121
                                             auto_accept=auto_accept)
122

    
123
    if not serial:
124
        raise Exception("No serial")
125

    
126
    serial_info = {"serial": serial}
127
    if auto_accept:
128
        serial_info["pending"] = False
129
        serial_info["accept"] = True
130
        serial_info["resolved"] = True
131

    
132
    serial = QuotaHolderSerial.objects.create(**serial_info)
133

    
134
    # Correlate the serial with the resource. Resolved serials are not
135
    # attached to resources
136
    if not auto_accept:
137
        resource.serial = serial
138
        resource.save()
139

    
140
    return serial
141

    
142

    
143
def accept_resource_serial(resource, strict=True):
144
    serial = resource.serial
145
    assert serial.pending or serial.accept, "%s can't be accepted" % serial
146
    log.debug("Accepting serial %s of resource %s", serial, resource)
147
    _resolve_commissions(accept=[serial.serial], strict=strict)
148
    resource.serial = None
149
    resource.save()
150
    return resource
151

    
152

    
153
def reject_resource_serial(resource, strict=True):
154
    serial = resource.serial
155
    assert serial.pending or not serial.accept, "%s can't be rejected" % serial
156
    log.debug("Rejecting serial %s of resource %s", serial, resource)
157
    _resolve_commissions(reject=[serial.serial], strict=strict)
158
    resource.serial = None
159
    resource.save()
160
    return resource
161

    
162

    
163
def _resolve_commissions(accept=None, reject=None, strict=True):
164
    if accept is None:
165
        accept = []
166
    if reject is None:
167
        reject = []
168

    
169
    qh = Quotaholder.get()
170
    with AstakosClientExceptionHandler():
171
        response = qh.resolve_commissions(accept, reject)
172

    
173
    accepted = response.get("accepted", [])
174
    rejected = response.get("rejected", [])
175

    
176
    if accepted:
177
        QuotaHolderSerial.objects.filter(serial__in=accepted).update(
178
            accept=True, pending=False, resolved=True)
179
    if rejected:
180
        QuotaHolderSerial.objects.filter(serial__in=rejected).update(
181
            accept=False, pending=False, resolved=True)
182

    
183
    if strict:
184
        failed = response["failed"]
185
        if failed:
186
            log.error("Unexpected error while resolving commissions: %s",
187
                      failed)
188

    
189
    return response
190

    
191

    
192
def reconcile_resolve_commissions(accept=None, reject=None, strict=True):
193
    response = _resolve_commissions(accept=accept,
194
                                    reject=reject,
195
                                    strict=strict)
196
    affected = response.get("accepted", []) + response.get("rejected", [])
197
    for resource in QUOTABLE_RESOURCES:
198
        resource.objects.filter(serial__in=affected).update(serial=None)
199

    
200

    
201
def resolve_pending_commissions():
202
    """Resolve quotaholder pending commissions.
203

204
    Get pending commissions from the quotaholder and resolve them
205
    to accepted and rejected, according to the state of the
206
    QuotaHolderSerial DB table. A pending commission in the quotaholder
207
    can exist in the QuotaHolderSerial table and be either accepted or
208
    rejected, or cannot exist in this table, so it is rejected.
209

210
    """
211

    
212
    qh_pending = get_quotaholder_pending()
213
    if not qh_pending:
214
        return ([], [])
215

    
216
    qh_pending.sort()
217
    min_ = qh_pending[0]
218

    
219
    serials = QuotaHolderSerial.objects.filter(serial__gte=min_, pending=False)
220
    accepted = serials.filter(accept=True).values_list('serial', flat=True)
221
    accepted = filter(lambda x: x in qh_pending, accepted)
222

    
223
    rejected = list(set(qh_pending) - set(accepted))
224

    
225
    return (accepted, rejected)
226

    
227

    
228
def get_quotaholder_pending():
229
    qh = Quotaholder.get()
230
    pending_serials = qh.get_pending_commissions()
231
    return pending_serials
232

    
233

    
234
def render_overlimit_exception(e):
235
    resource_name = {"vm": "Virtual Machine",
236
                     "cpu": "CPU",
237
                     "ram": "RAM",
238
                     "network.private": "Private Network",
239
                     "floating_ip": "Floating IP address"}
240
    details = json.loads(e.details)
241
    data = details['overLimit']['data']
242
    usage = data["usage"]
243
    limit = data["limit"]
244
    available = limit - usage
245
    provision = data['provision']
246
    requested = provision['quantity']
247
    resource = provision['resource']
248
    res = resource.replace("cyclades.", "", 1)
249
    try:
250
        resource = resource_name[res]
251
    except KeyError:
252
        resource = res
253

    
254
    msg = "Resource Limit Exceeded for your account."
255
    details = "Limit for resource '%s' exceeded for your account."\
256
              " Available: %s, Requested: %s"\
257
              % (resource, available, requested)
258
    return msg, details
259

    
260

    
261
@transaction.commit_on_success
262
def issue_and_accept_commission(resource, action="BUILD", action_fields=None):
263
    """Issue and accept a commission to Quotaholder.
264

265
    This function implements the Commission workflow, and must be called
266
    exactly after and in the same transaction that created/updated the
267
    resource. The workflow that implements is the following:
268
    0) Resolve previous unresolved commission if exists
269
    1) Issue commission, get a serial and correlate it with the resource
270
    2) Store the serial in DB as a serial to accept
271
    3) COMMIT!
272
    4) Accept commission to QH
273

274
    """
275
    commission_reason = ("client: api, resource: %s, action: %s"
276
                         % (resource, action))
277
    serial = handle_resource_commission(resource=resource, action=action,
278
                                        action_fields=action_fields,
279
                                        commission_name=commission_reason)
280

    
281
    if serial is None:
282
        return
283

    
284
    # Mark the serial as one to accept and associate it with the resource
285
    serial.pending = False
286
    serial.accept = True
287
    serial.save()
288
    transaction.commit()
289

    
290
    try:
291
        # Accept the commission to quotaholder
292
        accept_resource_serial(resource)
293
    except:
294
        # Do not crash if we can not accept commission to Quotaholder. Quotas
295
        # have already been reserved and the resource already exists in DB.
296
        # Just log the error
297
        log.exception("Failed to accept commission: %s", resource.serial)
298

    
299

    
300
def get_commission_info(resource, action, action_fields=None):
301
    if isinstance(resource, VirtualMachine):
302
        flavor = resource.flavor
303
        resources = {"cyclades.vm": 1,
304
                     "cyclades.total_cpu": flavor.cpu,
305
                     "cyclades.total_ram": flavor.ram << 20}
306
        online_resources = {"cyclades.cpu": flavor.cpu,
307
                            "cyclades.ram": flavor.ram << 20}
308
        if action == "BUILD":
309
            new_volumes = resource.volumes.filter(status="CREATING")
310
            new_volumes_size = new_volumes.aggregate(Sum("size"))["size__sum"]
311
            resources["cyclades.disk"] = new_volumes_size << 30
312
            resources.update(online_resources)
313
            return resources
314
        if action == "START":
315
            if resource.operstate == "STOPPED":
316
                return online_resources
317
            else:
318
                return None
319
        elif action == "STOP":
320
            if resource.operstate in ["STARTED", "BUILD", "ERROR"]:
321
                return reverse_quantities(online_resources)
322
            else:
323
                return None
324
        elif action == "REBOOT":
325
            if resource.operstate == "STOPPED":
326
                return online_resources
327
            else:
328
                return None
329
        elif action == "DESTROY":
330
            volumes = resource.volumes.filter(deleted=False)
331
            volumes_size = volumes.aggregate(Sum("size"))["size__sum"]
332
            resources["cyclades.disk"] = volumes_size << 30
333
            resources.update(online_resources)
334
            if resource.operstate in ["STARTED", "BUILD", "ERROR"]:
335
                resources.update(online_resources)
336
            return reverse_quantities(resources)
337
        elif action == "RESIZE" and action_fields:
338
            beparams = action_fields.get("beparams")
339
            cpu = beparams.get("vcpus", flavor.cpu)
340
            ram = beparams.get("maxmem", flavor.ram)
341
            return {"cyclades.total_cpu": cpu - flavor.cpu,
342
                    "cyclades.total_ram": (ram - flavor.ram) << 20}
343
        elif action in ["ATTACH_VOLUME", "DETACH_VOLUME"]:
344
            if action_fields is not None:
345
                volumes_changes = action_fields.get("disks")
346
                if volumes_changes is not None:
347
                    size_delta = get_volumes_size_delta(volumes_changes)
348
                    if size_delta:
349
                        return {"cyclades.disk": size_delta << 30}
350
        else:
351
            #["CONNECT", "DISCONNECT", "SET_FIREWALL_PROFILE"]:
352
            return None
353
    elif isinstance(resource, Network):
354
        resources = {"cyclades.network.private": 1}
355
        if action == "BUILD":
356
            return resources
357
        elif action == "DESTROY":
358
            return reverse_quantities(resources)
359
    elif isinstance(resource, IPAddress):
360
        if resource.floating_ip:
361
            resources = {"cyclades.floating_ip": 1}
362
            if action == "BUILD":
363
                return resources
364
            elif action == "DESTROY":
365
                return reverse_quantities(resources)
366
        else:
367
            return None
368
    elif isinstance(resource, Volume):
369
        size = resource.size
370
        resources = {"cyclades.disk": size << 30}
371
        if resource.status == "CREATING" and action == "BUILD":
372
            return resources
373
        elif action == "DESTROY":
374
            reverse_quantities(resources)
375
        else:
376
            return None
377

    
378

    
379
def get_volumes_size_delta(volumes_changes):
380
    """Compute the total change in the size of volumes"""
381
    size_delta = 0
382
    for vchange in volumes_changes:
383
        action, db_volume, info = vchange
384
        if action == "add":
385
            size_delta += int(db_volume.size)
386
        elif action == "remove":
387
            size_delta -= int(db_volume.size)
388
        elif action == "modify":
389
            size_delta += info.get("size_delta", 0)
390
        else:
391
            raise ValueError("Unknwon volume action '%s'" % action)
392
    return size_delta
393

    
394

    
395
def reverse_quantities(resources):
396
    return dict((r, -s) for r, s in resources.items())
397

    
398

    
399
def handle_resource_commission(resource, action, commission_name,
400
                               force=False, auto_accept=False,
401
                               action_fields=None):
402
    """Handle a issuing of a commission for a resource.
403

404
    Create a new commission for a resource based on the action that
405
    is performed. If the resource has a previous pending commission,
406
    resolved it before issuing the new one.
407

408
    """
409
    # Try to resolve previous serial:
410
    # If action is DESTROY, we must always reject the previous commission,
411
    # since multiple DESTROY actions are allowed in the same resource (e.g. VM)
412
    # The one who succeeds will be finally accepted, and all other will be
413
    # rejected
414
    force = force or (action == "DESTROY")
415
    resolve_resource_commission(resource, force=force)
416

    
417
    serial = issue_commission(resource, action, name=commission_name,
418
                              force=force, auto_accept=auto_accept,
419
                              action_fields=action_fields)
420
    return serial
421

    
422

    
423
class ResolveError(Exception):
424
    pass
425

    
426

    
427
def resolve_resource_commission(resource, force=False):
428
    serial = resource.serial
429
    if serial is None or serial.resolved:
430
        return
431
    if serial.pending and not force:
432
        m = "Could not resolve commission: serial %s is undecided" % serial
433
        raise ResolveError(m)
434
    log.warning("Resolving pending commission: %s", serial)
435
    if not serial.pending and serial.accept:
436
        accept_resource_serial(resource)
437
    else:
438
        reject_resource_serial(resource)