Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / quotas / __init__.py @ 22f54174

History | View | Annotate | Download (13.7 kB)

1
# Copyright 2012, 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or without
4
# modification, are permitted provided that the following conditions
5
# are met:
6
#
7
#   1. Redistributions of source code must retain the above copyright
8
#      notice, this list of conditions and the following disclaimer.
9
#
10
#  2. Redistributions in binary form must reproduce the above copyright
11
#     notice, this list of conditions and the following disclaimer in the
12
#     documentation and/or other materials provided with the distribution.
13
#
14
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24
# SUCH DAMAGE.
25
#
26
# The views and conclusions contained in the software and documentation are
27
# those of the authors and should not be interpreted as representing official
28
# policies, either expressed or implied, of GRNET S.A.
29

    
30
from django.utils import simplejson as json
31
from django.db import transaction
32

    
33
from snf_django.lib.api import faults
34
from synnefo.db.models import (QuotaHolderSerial, VirtualMachine, Network,
35
                               IPAddress)
36

    
37
from synnefo.settings import (CYCLADES_SERVICE_TOKEN as ASTAKOS_TOKEN,
38
                              ASTAKOS_AUTH_URL)
39
from astakosclient import AstakosClient
40
from astakosclient import errors
41

    
42
import logging
43
log = logging.getLogger(__name__)
44

    
45

    
46
QUOTABLE_RESOURCES = [VirtualMachine, Network, IPAddress]
47

    
48

    
49
DEFAULT_SOURCE = 'system'
50
RESOURCES = [
51
    "cyclades.vm",
52
    "cyclades.total_cpu",
53
    "cyclades.cpu",
54
    "cyclades.disk",
55
    "cyclades.total_ram",
56
    "cyclades.ram",
57
    "cyclades.network.private",
58
    "cyclades.floating_ip",
59
]
60

    
61

    
62
class Quotaholder(object):
63
    _object = None
64

    
65
    @classmethod
66
    def get(cls):
67
        if cls._object is None:
68
            cls._object = AstakosClient(ASTAKOS_TOKEN,
69
                                        ASTAKOS_AUTH_URL,
70
                                        use_pool=True,
71
                                        retry=3,
72
                                        logger=log)
73
        return cls._object
74

    
75

    
76
class AstakosClientExceptionHandler(object):
77
    def __init__(self, *args, **kwargs):
78
        pass
79

    
80
    def __enter__(self):
81
        pass
82

    
83
    def __exit__(self, exc_type, value, traceback):
84
        if value is not None:  # exception
85
            if not isinstance(value, errors.AstakosClientException):
86
                return False  # reraise
87
            if exc_type is errors.QuotaLimit:
88
                msg, details = render_overlimit_exception(value)
89
                raise faults.OverLimit(msg, details=details)
90

    
91
            log.exception("Unexpected error %s" % value.message)
92
            raise faults.InternalServerError("Unexpected error")
93

    
94

    
95
def issue_commission(resource, action, name="", force=False, auto_accept=False,
96
                     action_fields=None):
97
    """Issue a new commission to the quotaholder.
98

99
    Issue a new commission to the quotaholder, and create the
100
    corresponing QuotaHolderSerial object in DB.
101

102
    """
103

    
104
    provisions = get_commission_info(resource=resource, action=action,
105
                                     action_fields=action_fields)
106

    
107
    if provisions is None:
108
        return None
109

    
110
    user = resource.userid
111
    source = DEFAULT_SOURCE
112

    
113
    qh = Quotaholder.get()
114
    if True:  # placeholder
115
        with AstakosClientExceptionHandler():
116
            serial = qh.issue_one_commission(user, source,
117
                                             provisions, name=name,
118
                                             force=force,
119
                                             auto_accept=auto_accept)
120

    
121
    if not serial:
122
        raise Exception("No serial")
123

    
124
    serial = QuotaHolderSerial.objects.create(serial=serial)
125
    if auto_accept:
126
        serial.pending = False
127
        serial.accept = True
128
        serial.resolved = True
129
        serial.save()
130
    else:
131
        resource.serial = serial
132
        resource.save()
133
    return serial
134

    
135

    
136
def mark_accept_serial(serial):
137
    serial.pending = False
138
    serial.accept = True
139
    serial.save()
140

    
141

    
142
def accept_resource_serial(resource, strict=True):
143
    serial = resource.serial
144
    assert serial.pending or serial.accept, "%s can't be accepted" % serial
145
    response = _resolve_commissions(accept=[serial.serial], strict=strict)
146
    if serial.serial in response.get("accepted", []):
147
        resource.serial = None
148
        resource.save()
149

    
150

    
151
def reject_resource_serial(resource, strict=True):
152
    serial = resource.serial
153
    assert serial.pending or not serial.accept, "%s can't be rejected" % serial
154
    response = _resolve_commissions(reject=[serial.serial], strict=strict)
155
    if serial.serial in response.get("rejected", []):
156
        resource.serial = None
157
        resource.save()
158

    
159

    
160
def _resolve_commissions(accept=None, reject=None, strict=True):
161
    if accept is None:
162
        accept = []
163
    if reject is None:
164
        reject = []
165

    
166
    qh = Quotaholder.get()
167
    with AstakosClientExceptionHandler():
168
        response = qh.resolve_commissions(accept, reject)
169

    
170
    accepted = response.get("accepted", [])
171
    rejected = response.get("rejected", [])
172

    
173
    if accepted:
174
        QuotaHolderSerial.objects.filter(serial__in=accepted).update(
175
            accept=True, pending=False, resolved=True)
176
    if rejected:
177
        QuotaHolderSerial.objects.filter(serial__in=rejected).update(
178
            accept=False, pending=False, resolved=True)
179

    
180
    if strict:
181
        failed = response["failed"]
182
        if failed:
183
            log.error("Unexpected error while resolving commissions: %s",
184
                      failed)
185

    
186
    return response
187

    
188

    
189
def reconcile_resolve_commissions(accept=None, reject=None, strict=True):
190
    response = _resolve_commissions(accept=accept,
191
                                    reject=reject,
192
                                    strict=strict)
193
    affected = response.get("accepted", []) + response.get("rejected", [])
194
    for resource in QUOTABLE_RESOURCES:
195
        resource.objects.filter(serial__in=affected).update(serial=None)
196

    
197

    
198
def resolve_pending_commissions():
199
    """Resolve quotaholder pending commissions.
200

201
    Get pending commissions from the quotaholder and resolve them
202
    to accepted and rejected, according to the state of the
203
    QuotaHolderSerial DB table. A pending commission in the quotaholder
204
    can exist in the QuotaHolderSerial table and be either accepted or
205
    rejected, or cannot exist in this table, so it is rejected.
206

207
    """
208

    
209
    qh_pending = get_quotaholder_pending()
210
    if not qh_pending:
211
        return ([], [])
212

    
213
    qh_pending.sort()
214
    min_ = qh_pending[0]
215

    
216
    serials = QuotaHolderSerial.objects.filter(serial__gte=min_, pending=False)
217
    accepted = serials.filter(accept=True).values_list('serial', flat=True)
218
    accepted = filter(lambda x: x in qh_pending, accepted)
219

    
220
    rejected = list(set(qh_pending) - set(accepted))
221

    
222
    return (accepted, rejected)
223

    
224

    
225
def get_quotaholder_pending():
226
    qh = Quotaholder.get()
227
    pending_serials = qh.get_pending_commissions()
228
    return pending_serials
229

    
230

    
231
def render_overlimit_exception(e):
232
    resource_name = {"vm": "Virtual Machine",
233
                     "cpu": "CPU",
234
                     "ram": "RAM",
235
                     "network.private": "Private Network",
236
                     "floating_ip": "Floating IP address"}
237
    details = json.loads(e.details)
238
    data = details['overLimit']['data']
239
    usage = data["usage"]
240
    limit = data["limit"]
241
    available = limit - usage
242
    provision = data['provision']
243
    requested = provision['quantity']
244
    resource = provision['resource']
245
    res = resource.replace("cyclades.", "", 1)
246
    try:
247
        resource = resource_name[res]
248
    except KeyError:
249
        resource = res
250

    
251
    msg = "Resource Limit Exceeded for your account."
252
    details = "Limit for resource '%s' exceeded for your account."\
253
              " Available: %s, Requested: %s"\
254
              % (resource, available, requested)
255
    return msg, details
256

    
257

    
258
@transaction.commit_on_success
259
def issue_and_accept_commission(resource, action="BUILD", action_fields=None):
260
    """Issue and accept a commission to Quotaholder.
261

262
    This function implements the Commission workflow, and must be called
263
    exactly after and in the same transaction that created/updated the
264
    resource. The workflow that implements is the following:
265
    0) Resolve previous unresolved commission if exists
266
    1) Issue commission, get a serial and correlate it with the resource
267
    2) Store the serial in DB as a serial to accept
268
    3) COMMIT!
269
    4) Accept commission to QH
270

271
    """
272
    commission_reason = ("client: api, resource: %s, action: %s"
273
                         % (resource, action))
274
    handle_resource_commission(resource=resource, action=action,
275
                               action_fields=action_fields,
276
                               commission_name=commission_reason)
277

    
278
    # Mark the serial as one to accept and associate it with the resource
279
    mark_accept_serial(resource.serial)
280
    transaction.commit()
281

    
282
    try:
283
        # Accept the commission to quotaholder
284
        accept_resource_serial(resource)
285
    except:
286
        # Do not crash if we can not accept commission to Quotaholder. Quotas
287
        # have already been reserved and the resource already exists in DB.
288
        # Just log the error
289
        log.exception("Failed to accept commission: %s", resource.serial)
290

    
291

    
292
def get_commission_info(resource, action, action_fields=None):
293
    if isinstance(resource, VirtualMachine):
294
        flavor = resource.flavor
295
        resources = {"cyclades.vm": 1,
296
                     "cyclades.total_cpu": flavor.cpu,
297
                     "cyclades.disk": 1073741824 * flavor.disk,
298
                     "cyclades.total_ram": 1048576 * flavor.ram}
299
        online_resources = {"cyclades.cpu": flavor.cpu,
300
                            "cyclades.ram": 1048576 * flavor.ram}
301
        if action == "BUILD":
302
            resources.update(online_resources)
303
            return resources
304
        if action == "START":
305
            if resource.operstate == "STOPPED":
306
                return online_resources
307
            else:
308
                return None
309
        elif action == "STOP":
310
            if resource.operstate in ["STARTED", "BUILD", "ERROR"]:
311
                return reverse_quantities(online_resources)
312
            else:
313
                return None
314
        elif action == "REBOOT":
315
            if resource.operstate == "STOPPED":
316
                return online_resources
317
            else:
318
                return None
319
        elif action == "DESTROY":
320
            if resource.operstate in ["STARTED", "BUILD", "ERROR"]:
321
                resources.update(online_resources)
322
            return reverse_quantities(resources)
323
        elif action == "RESIZE" and action_fields:
324
            beparams = action_fields.get("beparams")
325
            cpu = beparams.get("vcpus", flavor.cpu)
326
            ram = beparams.get("maxmem", flavor.ram)
327
            return {"cyclades.total_cpu": cpu - flavor.cpu,
328
                    "cyclades.total_ram": 1048576 * (ram - flavor.ram)}
329
        else:
330
            #["CONNECT", "DISCONNECT", "SET_FIREWALL_PROFILE"]:
331
            return None
332
    elif isinstance(resource, Network):
333
        resources = {"cyclades.network.private": 1}
334
        if action == "BUILD":
335
            return resources
336
        elif action == "DESTROY":
337
            return reverse_quantities(resources)
338
    elif isinstance(resource, IPAddress):
339
        if resource.floating_ip:
340
            resources = {"cyclades.floating_ip": 1}
341
            if action == "BUILD":
342
                return resources
343
            elif action == "DESTROY":
344
                return reverse_quantities(resources)
345
        else:
346
            return None
347

    
348

    
349
def reverse_quantities(resources):
350
    return dict((r, -s) for r, s in resources.items())
351

    
352

    
353
def handle_resource_commission(resource, action, commission_name,
354
                               force=False, auto_accept=False,
355
                               action_fields=None):
356
    """Handle a issuing of a commission for a resource.
357

358
    Create a new commission for a resource based on the action that
359
    is performed. If the resource has a previous pending commission,
360
    resolved it before issuing the new one.
361

362
    """
363
    # Try to resolve previous serial:
364
    # If action is DESTROY, we must always reject the previous commission,
365
    # since multiple DESTROY actions are allowed in the same resource (e.g. VM)
366
    # The one who succeeds will be finally accepted, and all other will be
367
    # rejected
368
    force = force or (action == "DESTROY")
369
    resolve_resource_commission(resource, force=force)
370

    
371
    serial = issue_commission(resource, action, name=commission_name,
372
                              force=force, auto_accept=auto_accept,
373
                              action_fields=action_fields)
374
    return serial
375

    
376

    
377
class ResolveError(Exception):
378
    pass
379

    
380

    
381
def resolve_resource_commission(resource, force=False):
382
    serial = resource.serial
383
    if serial is None or serial.resolved:
384
        return
385
    if serial.pending and not force:
386
        m = "Could not resolve commission: serial %s is undecided" % serial
387
        raise ResolveError(m)
388
    log.warning("Resolving pending commission: %s", serial)
389
    if not serial.pending and serial.accept:
390
        accept_resource_serial(resource)
391
    else:
392
        reject_resource_serial(resource)