Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / quotas / __init__.py @ ba777b02

History | View | Annotate | Download (13.8 kB)

1
# Copyright 2012, 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or without
4
# modification, are permitted provided that the following conditions
5
# are met:
6
#
7
#   1. Redistributions of source code must retain the above copyright
8
#      notice, this list of conditions and the following disclaimer.
9
#
10
#  2. Redistributions in binary form must reproduce the above copyright
11
#     notice, this list of conditions and the following disclaimer in the
12
#     documentation and/or other materials provided with the distribution.
13
#
14
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24
# SUCH DAMAGE.
25
#
26
# The views and conclusions contained in the software and documentation are
27
# those of the authors and should not be interpreted as representing official
28
# policies, either expressed or implied, of GRNET S.A.
29

    
30
from django.utils import simplejson as json
31
from django.db import transaction
32

    
33
from snf_django.lib.api import faults
34
from synnefo.db.models import (QuotaHolderSerial, VirtualMachine, Network,
35
                               IPAddress)
36

    
37
from synnefo.settings import (CYCLADES_SERVICE_TOKEN as ASTAKOS_TOKEN,
38
                              ASTAKOS_AUTH_URL)
39
from astakosclient import AstakosClient
40
from astakosclient import errors
41

    
42
import logging
43
log = logging.getLogger(__name__)
44

    
45

    
46
QUOTABLE_RESOURCES = [VirtualMachine, Network, IPAddress]
47

    
48

    
49
DEFAULT_SOURCE = 'system'
50
RESOURCES = [
51
    "cyclades.vm",
52
    "cyclades.total_cpu",
53
    "cyclades.cpu",
54
    "cyclades.disk",
55
    "cyclades.total_ram",
56
    "cyclades.ram",
57
    "cyclades.network.private",
58
    "cyclades.floating_ip",
59
]
60

    
61

    
62
class Quotaholder(object):
63
    _object = None
64

    
65
    @classmethod
66
    def get(cls):
67
        if cls._object is None:
68
            cls._object = AstakosClient(ASTAKOS_TOKEN,
69
                                        ASTAKOS_AUTH_URL,
70
                                        use_pool=True,
71
                                        retry=3,
72
                                        logger=log)
73
        return cls._object
74

    
75

    
76
class AstakosClientExceptionHandler(object):
77
    def __init__(self, *args, **kwargs):
78
        pass
79

    
80
    def __enter__(self):
81
        pass
82

    
83
    def __exit__(self, exc_type, value, traceback):
84
        if value is not None:  # exception
85
            if not isinstance(value, errors.AstakosClientException):
86
                return False  # reraise
87
            if exc_type is errors.QuotaLimit:
88
                msg, details = render_overlimit_exception(value)
89
                raise faults.OverLimit(msg, details=details)
90

    
91
            log.exception("Unexpected error %s" % value.message)
92
            raise faults.InternalServerError("Unexpected error")
93

    
94

    
95
def issue_commission(resource, action, name="", force=False, auto_accept=False,
96
                     action_fields=None):
97
    """Issue a new commission to the quotaholder.
98

99
    Issue a new commission to the quotaholder, and create the
100
    corresponing QuotaHolderSerial object in DB.
101

102
    """
103

    
104
    provisions = get_commission_info(resource=resource, action=action,
105
                                     action_fields=action_fields)
106

    
107
    if provisions is None:
108
        return None
109

    
110
    user = resource.userid
111
    source = DEFAULT_SOURCE
112

    
113
    qh = Quotaholder.get()
114
    if True:  # placeholder
115
        with AstakosClientExceptionHandler():
116
            serial = qh.issue_one_commission(user, source,
117
                                             provisions, name=name,
118
                                             force=force,
119
                                             auto_accept=auto_accept)
120

    
121
    if not serial:
122
        raise Exception("No serial")
123

    
124
    serial_info = {"serial": serial}
125
    if auto_accept:
126
        serial_info["pending"] = False
127
        serial_info["accept"] = True
128
        serial_info["resolved"] = True
129

    
130
    serial = QuotaHolderSerial.objects.create(**serial_info)
131

    
132
    # Correlate the serial with the resource. Resolved serials are not
133
    # attached to resources
134
    if not auto_accept:
135
        resource.serial = serial
136
        resource.save()
137

    
138
    return serial
139

    
140

    
141
def accept_resource_serial(resource, strict=True):
142
    serial = resource.serial
143
    assert serial.pending or serial.accept, "%s can't be accepted" % serial
144
    log.debug("Accepting serial %s of resource %s", serial, resource)
145
    _resolve_commissions(accept=[serial.serial], strict=strict)
146
    resource.serial = None
147
    resource.save()
148
    return resource
149

    
150

    
151
def reject_resource_serial(resource, strict=True):
152
    serial = resource.serial
153
    assert serial.pending or not serial.accept, "%s can't be rejected" % serial
154
    log.debug("Rejecting serial %s of resource %s", serial, resource)
155
    _resolve_commissions(reject=[serial.serial], strict=strict)
156
    resource.serial = None
157
    resource.save()
158
    return resource
159

    
160

    
161
def _resolve_commissions(accept=None, reject=None, strict=True):
162
    if accept is None:
163
        accept = []
164
    if reject is None:
165
        reject = []
166

    
167
    qh = Quotaholder.get()
168
    with AstakosClientExceptionHandler():
169
        response = qh.resolve_commissions(accept, reject)
170

    
171
    accepted = response.get("accepted", [])
172
    rejected = response.get("rejected", [])
173

    
174
    if accepted:
175
        QuotaHolderSerial.objects.filter(serial__in=accepted).update(
176
            accept=True, pending=False, resolved=True)
177
    if rejected:
178
        QuotaHolderSerial.objects.filter(serial__in=rejected).update(
179
            accept=False, pending=False, resolved=True)
180

    
181
    if strict:
182
        failed = response["failed"]
183
        if failed:
184
            log.error("Unexpected error while resolving commissions: %s",
185
                      failed)
186

    
187
    return response
188

    
189

    
190
def reconcile_resolve_commissions(accept=None, reject=None, strict=True):
191
    response = _resolve_commissions(accept=accept,
192
                                    reject=reject,
193
                                    strict=strict)
194
    affected = response.get("accepted", []) + response.get("rejected", [])
195
    for resource in QUOTABLE_RESOURCES:
196
        resource.objects.filter(serial__in=affected).update(serial=None)
197

    
198

    
199
def resolve_pending_commissions():
200
    """Resolve quotaholder pending commissions.
201

202
    Get pending commissions from the quotaholder and resolve them
203
    to accepted and rejected, according to the state of the
204
    QuotaHolderSerial DB table. A pending commission in the quotaholder
205
    can exist in the QuotaHolderSerial table and be either accepted or
206
    rejected, or cannot exist in this table, so it is rejected.
207

208
    """
209

    
210
    qh_pending = get_quotaholder_pending()
211
    if not qh_pending:
212
        return ([], [])
213

    
214
    qh_pending.sort()
215
    min_ = qh_pending[0]
216

    
217
    serials = QuotaHolderSerial.objects.filter(serial__gte=min_, pending=False)
218
    accepted = serials.filter(accept=True).values_list('serial', flat=True)
219
    accepted = filter(lambda x: x in qh_pending, accepted)
220

    
221
    rejected = list(set(qh_pending) - set(accepted))
222

    
223
    return (accepted, rejected)
224

    
225

    
226
def get_quotaholder_pending():
227
    qh = Quotaholder.get()
228
    pending_serials = qh.get_pending_commissions()
229
    return pending_serials
230

    
231

    
232
def render_overlimit_exception(e):
233
    resource_name = {"vm": "Virtual Machine",
234
                     "cpu": "CPU",
235
                     "ram": "RAM",
236
                     "network.private": "Private Network",
237
                     "floating_ip": "Floating IP address"}
238
    details = json.loads(e.details)
239
    data = details['overLimit']['data']
240
    usage = data["usage"]
241
    limit = data["limit"]
242
    available = limit - usage
243
    provision = data['provision']
244
    requested = provision['quantity']
245
    resource = provision['resource']
246
    res = resource.replace("cyclades.", "", 1)
247
    try:
248
        resource = resource_name[res]
249
    except KeyError:
250
        resource = res
251

    
252
    msg = "Resource Limit Exceeded for your account."
253
    details = "Limit for resource '%s' exceeded for your account."\
254
              " Available: %s, Requested: %s"\
255
              % (resource, available, requested)
256
    return msg, details
257

    
258

    
259
@transaction.commit_on_success
260
def issue_and_accept_commission(resource, action="BUILD", action_fields=None):
261
    """Issue and accept a commission to Quotaholder.
262

263
    This function implements the Commission workflow, and must be called
264
    exactly after and in the same transaction that created/updated the
265
    resource. The workflow that implements is the following:
266
    0) Resolve previous unresolved commission if exists
267
    1) Issue commission, get a serial and correlate it with the resource
268
    2) Store the serial in DB as a serial to accept
269
    3) COMMIT!
270
    4) Accept commission to QH
271

272
    """
273
    commission_reason = ("client: api, resource: %s, action: %s"
274
                         % (resource, action))
275
    serial = handle_resource_commission(resource=resource, action=action,
276
                                        action_fields=action_fields,
277
                                        commission_name=commission_reason)
278

    
279
    if serial is None:
280
        return
281

    
282
    # Mark the serial as one to accept and associate it with the resource
283
    serial.pending = False
284
    serial.accept = True
285
    serial.save()
286
    transaction.commit()
287

    
288
    try:
289
        # Accept the commission to quotaholder
290
        accept_resource_serial(resource)
291
    except:
292
        # Do not crash if we can not accept commission to Quotaholder. Quotas
293
        # have already been reserved and the resource already exists in DB.
294
        # Just log the error
295
        log.exception("Failed to accept commission: %s", resource.serial)
296

    
297

    
298
def get_commission_info(resource, action, action_fields=None):
299
    if isinstance(resource, VirtualMachine):
300
        flavor = resource.flavor
301
        resources = {"cyclades.vm": 1,
302
                     "cyclades.total_cpu": flavor.cpu,
303
                     "cyclades.disk": 1073741824 * flavor.disk,
304
                     "cyclades.total_ram": 1048576 * flavor.ram}
305
        online_resources = {"cyclades.cpu": flavor.cpu,
306
                            "cyclades.ram": 1048576 * flavor.ram}
307
        if action == "BUILD":
308
            resources.update(online_resources)
309
            return resources
310
        if action == "START":
311
            if resource.operstate == "STOPPED":
312
                return online_resources
313
            else:
314
                return None
315
        elif action == "STOP":
316
            if resource.operstate in ["STARTED", "BUILD", "ERROR"]:
317
                return reverse_quantities(online_resources)
318
            else:
319
                return None
320
        elif action == "REBOOT":
321
            if resource.operstate == "STOPPED":
322
                return online_resources
323
            else:
324
                return None
325
        elif action == "DESTROY":
326
            if resource.operstate in ["STARTED", "BUILD", "ERROR"]:
327
                resources.update(online_resources)
328
            return reverse_quantities(resources)
329
        elif action == "RESIZE" and action_fields:
330
            beparams = action_fields.get("beparams")
331
            cpu = beparams.get("vcpus", flavor.cpu)
332
            ram = beparams.get("maxmem", flavor.ram)
333
            return {"cyclades.total_cpu": cpu - flavor.cpu,
334
                    "cyclades.total_ram": 1048576 * (ram - flavor.ram)}
335
        else:
336
            #["CONNECT", "DISCONNECT", "SET_FIREWALL_PROFILE"]:
337
            return None
338
    elif isinstance(resource, Network):
339
        resources = {"cyclades.network.private": 1}
340
        if action == "BUILD":
341
            return resources
342
        elif action == "DESTROY":
343
            return reverse_quantities(resources)
344
    elif isinstance(resource, IPAddress):
345
        if resource.floating_ip:
346
            resources = {"cyclades.floating_ip": 1}
347
            if action == "BUILD":
348
                return resources
349
            elif action == "DESTROY":
350
                return reverse_quantities(resources)
351
        else:
352
            return None
353

    
354

    
355
def reverse_quantities(resources):
356
    return dict((r, -s) for r, s in resources.items())
357

    
358

    
359
def handle_resource_commission(resource, action, commission_name,
360
                               force=False, auto_accept=False,
361
                               action_fields=None):
362
    """Handle a issuing of a commission for a resource.
363

364
    Create a new commission for a resource based on the action that
365
    is performed. If the resource has a previous pending commission,
366
    resolved it before issuing the new one.
367

368
    """
369
    # Try to resolve previous serial:
370
    # If action is DESTROY, we must always reject the previous commission,
371
    # since multiple DESTROY actions are allowed in the same resource (e.g. VM)
372
    # The one who succeeds will be finally accepted, and all other will be
373
    # rejected
374
    force = force or (action == "DESTROY")
375
    resolve_resource_commission(resource, force=force)
376

    
377
    serial = issue_commission(resource, action, name=commission_name,
378
                              force=force, auto_accept=auto_accept,
379
                              action_fields=action_fields)
380
    return serial
381

    
382

    
383
class ResolveError(Exception):
384
    pass
385

    
386

    
387
def resolve_resource_commission(resource, force=False):
388
    serial = resource.serial
389
    if serial is None or serial.resolved:
390
        return
391
    if serial.pending and not force:
392
        m = "Could not resolve commission: serial %s is undecided" % serial
393
        raise ResolveError(m)
394
    log.warning("Resolving pending commission: %s", serial)
395
    if not serial.pending and serial.accept:
396
        accept_resource_serial(resource)
397
    else:
398
        reject_resource_serial(resource)