Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / quotas / __init__.py @ 47c27955

History | View | Annotate | Download (13.8 kB)

1
# Copyright 2012, 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or without
4
# modification, are permitted provided that the following conditions
5
# are met:
6
#
7
#   1. Redistributions of source code must retain the above copyright
8
#      notice, this list of conditions and the following disclaimer.
9
#
10
#  2. Redistributions in binary form must reproduce the above copyright
11
#     notice, this list of conditions and the following disclaimer in the
12
#     documentation and/or other materials provided with the distribution.
13
#
14
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24
# SUCH DAMAGE.
25
#
26
# The views and conclusions contained in the software and documentation are
27
# those of the authors and should not be interpreted as representing official
28
# policies, either expressed or implied, of GRNET S.A.
29

    
30
from django.utils import simplejson as json
31
from django.db import transaction
32

    
33
from snf_django.lib.api import faults
34
from synnefo.db.models import (QuotaHolderSerial, VirtualMachine, Network,
35
                               IPAddress)
36

    
37
from synnefo.settings import (CYCLADES_SERVICE_TOKEN as ASTAKOS_TOKEN,
38
                              ASTAKOS_AUTH_URL)
39
from astakosclient import AstakosClient
40
from astakosclient import errors
41

    
42
import logging
43
log = logging.getLogger(__name__)
44

    
45

    
46
QUOTABLE_RESOURCES = [VirtualMachine, Network, IPAddress]
47

    
48

    
49
RESOURCES = [
50
    "cyclades.vm",
51
    "cyclades.total_cpu",
52
    "cyclades.cpu",
53
    "cyclades.disk",
54
    "cyclades.total_ram",
55
    "cyclades.ram",
56
    "cyclades.network.private",
57
    "cyclades.floating_ip",
58
]
59

    
60

    
61
class Quotaholder(object):
62
    _object = None
63

    
64
    @classmethod
65
    def get(cls):
66
        if cls._object is None:
67
            cls._object = AstakosClient(ASTAKOS_TOKEN,
68
                                        ASTAKOS_AUTH_URL,
69
                                        use_pool=True,
70
                                        retry=3,
71
                                        logger=log)
72
        return cls._object
73

    
74

    
75
class AstakosClientExceptionHandler(object):
76
    def __init__(self, *args, **kwargs):
77
        pass
78

    
79
    def __enter__(self):
80
        pass
81

    
82
    def __exit__(self, exc_type, value, traceback):
83
        if value is not None:  # exception
84
            if not isinstance(value, errors.AstakosClientException):
85
                return False  # reraise
86
            if exc_type is errors.QuotaLimit:
87
                msg, details = render_overlimit_exception(value)
88
                raise faults.OverLimit(msg, details=details)
89

    
90
            log.exception("Unexpected error %s" % value.message)
91
            raise faults.InternalServerError("Unexpected error")
92

    
93

    
94
def issue_commission(resource, action, name="", force=False, auto_accept=False,
95
                     action_fields=None):
96
    """Issue a new commission to the quotaholder.
97

98
    Issue a new commission to the quotaholder, and create the
99
    corresponing QuotaHolderSerial object in DB.
100

101
    """
102

    
103
    provisions = get_commission_info(resource=resource, action=action,
104
                                     action_fields=action_fields)
105

    
106
    if provisions is None:
107
        return None
108

    
109
    user = resource.userid
110
    source = resource.project
111

    
112
    qh = Quotaholder.get()
113
    if True:  # placeholder
114
        with AstakosClientExceptionHandler():
115
            serial = qh.issue_one_commission(user, source,
116
                                             provisions, name=name,
117
                                             force=force,
118
                                             auto_accept=auto_accept)
119

    
120
    if not serial:
121
        raise Exception("No serial")
122

    
123
    serial_info = {"serial": serial}
124
    if auto_accept:
125
        serial_info["pending"] = False
126
        serial_info["accept"] = True
127
        serial_info["resolved"] = True
128

    
129
    serial = QuotaHolderSerial.objects.create(**serial_info)
130

    
131
    # Correlate the serial with the resource. Resolved serials are not
132
    # attached to resources
133
    if not auto_accept:
134
        resource.serial = serial
135
        resource.save()
136

    
137
    return serial
138

    
139

    
140
def accept_resource_serial(resource, strict=True):
141
    serial = resource.serial
142
    assert serial.pending or serial.accept, "%s can't be accepted" % serial
143
    log.debug("Accepting serial %s of resource %s", serial, resource)
144
    _resolve_commissions(accept=[serial.serial], strict=strict)
145
    resource.serial = None
146
    resource.save()
147
    return resource
148

    
149

    
150
def reject_resource_serial(resource, strict=True):
151
    serial = resource.serial
152
    assert serial.pending or not serial.accept, "%s can't be rejected" % serial
153
    log.debug("Rejecting serial %s of resource %s", serial, resource)
154
    _resolve_commissions(reject=[serial.serial], strict=strict)
155
    resource.serial = None
156
    resource.save()
157
    return resource
158

    
159

    
160
def _resolve_commissions(accept=None, reject=None, strict=True):
161
    if accept is None:
162
        accept = []
163
    if reject is None:
164
        reject = []
165

    
166
    qh = Quotaholder.get()
167
    with AstakosClientExceptionHandler():
168
        response = qh.resolve_commissions(accept, reject)
169

    
170
    accepted = response.get("accepted", [])
171
    rejected = response.get("rejected", [])
172

    
173
    if accepted:
174
        QuotaHolderSerial.objects.filter(serial__in=accepted).update(
175
            accept=True, pending=False, resolved=True)
176
    if rejected:
177
        QuotaHolderSerial.objects.filter(serial__in=rejected).update(
178
            accept=False, pending=False, resolved=True)
179

    
180
    if strict:
181
        failed = response["failed"]
182
        if failed:
183
            log.error("Unexpected error while resolving commissions: %s",
184
                      failed)
185

    
186
    return response
187

    
188

    
189
def reconcile_resolve_commissions(accept=None, reject=None, strict=True):
190
    response = _resolve_commissions(accept=accept,
191
                                    reject=reject,
192
                                    strict=strict)
193
    affected = response.get("accepted", []) + response.get("rejected", [])
194
    for resource in QUOTABLE_RESOURCES:
195
        resource.objects.filter(serial__in=affected).update(serial=None)
196

    
197

    
198
def resolve_pending_commissions():
199
    """Resolve quotaholder pending commissions.
200

201
    Get pending commissions from the quotaholder and resolve them
202
    to accepted and rejected, according to the state of the
203
    QuotaHolderSerial DB table. A pending commission in the quotaholder
204
    can exist in the QuotaHolderSerial table and be either accepted or
205
    rejected, or cannot exist in this table, so it is rejected.
206

207
    """
208

    
209
    qh_pending = get_quotaholder_pending()
210
    if not qh_pending:
211
        return ([], [])
212

    
213
    qh_pending.sort()
214
    min_ = qh_pending[0]
215

    
216
    serials = QuotaHolderSerial.objects.filter(serial__gte=min_, pending=False)
217
    accepted = serials.filter(accept=True).values_list('serial', flat=True)
218
    accepted = filter(lambda x: x in qh_pending, accepted)
219

    
220
    rejected = list(set(qh_pending) - set(accepted))
221

    
222
    return (accepted, rejected)
223

    
224

    
225
def get_quotaholder_pending():
226
    qh = Quotaholder.get()
227
    pending_serials = qh.get_pending_commissions()
228
    return pending_serials
229

    
230

    
231
def render_overlimit_exception(e):
232
    resource_name = {"vm": "Virtual Machine",
233
                     "cpu": "CPU",
234
                     "ram": "RAM",
235
                     "network.private": "Private Network",
236
                     "floating_ip": "Floating IP address"}
237
    details = json.loads(e.details)
238
    data = details['overLimit']['data']
239
    usage = data["usage"]
240
    limit = data["limit"]
241
    available = limit - usage
242
    provision = data['provision']
243
    requested = provision['quantity']
244
    resource = provision['resource']
245
    res = resource.replace("cyclades.", "", 1)
246
    try:
247
        resource = resource_name[res]
248
    except KeyError:
249
        resource = res
250

    
251
    msg = "Resource Limit Exceeded for your account."
252
    details = "Limit for resource '%s' exceeded for your account."\
253
              " Available: %s, Requested: %s"\
254
              % (resource, available, requested)
255
    return msg, details
256

    
257

    
258
@transaction.commit_on_success
259
def issue_and_accept_commission(resource, action="BUILD", action_fields=None):
260
    """Issue and accept a commission to Quotaholder.
261

262
    This function implements the Commission workflow, and must be called
263
    exactly after and in the same transaction that created/updated the
264
    resource. The workflow that implements is the following:
265
    0) Resolve previous unresolved commission if exists
266
    1) Issue commission, get a serial and correlate it with the resource
267
    2) Store the serial in DB as a serial to accept
268
    3) COMMIT!
269
    4) Accept commission to QH
270

271
    """
272
    commission_reason = ("client: api, resource: %s, action: %s"
273
                         % (resource, action))
274
    serial = handle_resource_commission(resource=resource, action=action,
275
                                        action_fields=action_fields,
276
                                        commission_name=commission_reason)
277

    
278
    if serial is None:
279
        return
280

    
281
    # Mark the serial as one to accept and associate it with the resource
282
    serial.pending = False
283
    serial.accept = True
284
    serial.save()
285
    transaction.commit()
286

    
287
    try:
288
        # Accept the commission to quotaholder
289
        accept_resource_serial(resource)
290
    except:
291
        # Do not crash if we can not accept commission to Quotaholder. Quotas
292
        # have already been reserved and the resource already exists in DB.
293
        # Just log the error
294
        log.exception("Failed to accept commission: %s", resource.serial)
295

    
296

    
297
def get_commission_info(resource, action, action_fields=None):
298
    if isinstance(resource, VirtualMachine):
299
        flavor = resource.flavor
300
        resources = {"cyclades.vm": 1,
301
                     "cyclades.total_cpu": flavor.cpu,
302
                     "cyclades.disk": 1073741824 * flavor.disk,
303
                     "cyclades.total_ram": 1048576 * flavor.ram}
304
        online_resources = {"cyclades.cpu": flavor.cpu,
305
                            "cyclades.ram": 1048576 * flavor.ram}
306
        if action == "BUILD":
307
            resources.update(online_resources)
308
            return resources
309
        if action == "START":
310
            if resource.operstate == "STOPPED":
311
                return online_resources
312
            else:
313
                return None
314
        elif action == "STOP":
315
            if resource.operstate in ["STARTED", "BUILD", "ERROR"]:
316
                return reverse_quantities(online_resources)
317
            else:
318
                return None
319
        elif action == "REBOOT":
320
            if resource.operstate == "STOPPED":
321
                return online_resources
322
            else:
323
                return None
324
        elif action == "DESTROY":
325
            if resource.operstate in ["STARTED", "BUILD", "ERROR"]:
326
                resources.update(online_resources)
327
            return reverse_quantities(resources)
328
        elif action == "RESIZE" and action_fields:
329
            beparams = action_fields.get("beparams")
330
            cpu = beparams.get("vcpus", flavor.cpu)
331
            ram = beparams.get("maxmem", flavor.ram)
332
            return {"cyclades.total_cpu": cpu - flavor.cpu,
333
                    "cyclades.total_ram": 1048576 * (ram - flavor.ram)}
334
        else:
335
            #["CONNECT", "DISCONNECT", "SET_FIREWALL_PROFILE"]:
336
            return None
337
    elif isinstance(resource, Network):
338
        resources = {"cyclades.network.private": 1}
339
        if action == "BUILD":
340
            return resources
341
        elif action == "DESTROY":
342
            return reverse_quantities(resources)
343
    elif isinstance(resource, IPAddress):
344
        if resource.floating_ip:
345
            resources = {"cyclades.floating_ip": 1}
346
            if action == "BUILD":
347
                return resources
348
            elif action == "DESTROY":
349
                return reverse_quantities(resources)
350
        else:
351
            return None
352

    
353

    
354
def reverse_quantities(resources):
355
    return dict((r, -s) for r, s in resources.items())
356

    
357

    
358
def handle_resource_commission(resource, action, commission_name,
359
                               force=False, auto_accept=False,
360
                               action_fields=None):
361
    """Handle a issuing of a commission for a resource.
362

363
    Create a new commission for a resource based on the action that
364
    is performed. If the resource has a previous pending commission,
365
    resolved it before issuing the new one.
366

367
    """
368
    # Try to resolve previous serial:
369
    # If action is DESTROY, we must always reject the previous commission,
370
    # since multiple DESTROY actions are allowed in the same resource (e.g. VM)
371
    # The one who succeeds will be finally accepted, and all other will be
372
    # rejected
373
    force = force or (action == "DESTROY")
374
    resolve_resource_commission(resource, force=force)
375

    
376
    serial = issue_commission(resource, action, name=commission_name,
377
                              force=force, auto_accept=auto_accept,
378
                              action_fields=action_fields)
379
    return serial
380

    
381

    
382
class ResolveError(Exception):
383
    pass
384

    
385

    
386
def resolve_resource_commission(resource, force=False):
387
    serial = resource.serial
388
    if serial is None or serial.resolved:
389
        return
390
    if serial.pending and not force:
391
        m = "Could not resolve commission: serial %s is undecided" % serial
392
        raise ResolveError(m)
393
    log.warning("Resolving pending commission: %s", serial)
394
    if not serial.pending and serial.accept:
395
        accept_resource_serial(resource)
396
    else:
397
        reject_resource_serial(resource)