Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / quotas / __init__.py @ 19b2c29d

History | View | Annotate | Download (13.5 kB)

1
# Copyright 2012, 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or without
4
# modification, are permitted provided that the following conditions
5
# are met:
6
#
7
#   1. Redistributions of source code must retain the above copyright
8
#      notice, this list of conditions and the following disclaimer.
9
#
10
#  2. Redistributions in binary form must reproduce the above copyright
11
#     notice, this list of conditions and the following disclaimer in the
12
#     documentation and/or other materials provided with the distribution.
13
#
14
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24
# SUCH DAMAGE.
25
#
26
# The views and conclusions contained in the software and documentation are
27
# those of the authors and should not be interpreted as representing official
28
# policies, either expressed or implied, of GRNET S.A.
29

    
30
from django.utils import simplejson as json
31
from django.db import transaction
32

    
33
from snf_django.lib.api import faults
34
from synnefo.db.models import (QuotaHolderSerial, VirtualMachine, Network,
35
                               IPAddress)
36

    
37
from synnefo.settings import (CYCLADES_SERVICE_TOKEN as ASTAKOS_TOKEN,
38
                              ASTAKOS_AUTH_URL)
39
from astakosclient import AstakosClient
40
from astakosclient import errors
41

    
42
import logging
43
log = logging.getLogger(__name__)
44

    
45

    
46
DEFAULT_SOURCE = 'system'
47
RESOURCES = [
48
    "cyclades.vm",
49
    "cyclades.total_cpu",
50
    "cyclades.cpu",
51
    "cyclades.disk",
52
    "cyclades.total_ram",
53
    "cyclades.ram",
54
    "cyclades.network.private",
55
    "cyclades.floating_ip",
56
]
57

    
58

    
59
class Quotaholder(object):
60
    _object = None
61

    
62
    @classmethod
63
    def get(cls):
64
        if cls._object is None:
65
            cls._object = AstakosClient(ASTAKOS_TOKEN,
66
                                        ASTAKOS_AUTH_URL,
67
                                        use_pool=True,
68
                                        retry=3,
69
                                        logger=log)
70
        return cls._object
71

    
72

    
73
class AstakosClientExceptionHandler(object):
74
    def __init__(self, *args, **kwargs):
75
        pass
76

    
77
    def __enter__(self):
78
        pass
79

    
80
    def __exit__(self, exc_type, value, traceback):
81
        if value is not None:  # exception
82
            if not isinstance(value, errors.AstakosClientException):
83
                return False  # reraise
84
            if exc_type is errors.QuotaLimit:
85
                msg, details = render_overlimit_exception(value)
86
                raise faults.OverLimit(msg, details=details)
87

    
88
            log.exception("Unexpected error %s" % value.message)
89
            raise faults.InternalServerError("Unexpected error")
90

    
91

    
92
def issue_commission(resource, action, name="", force=False, auto_accept=False,
93
                     action_fields=None):
94
    """Issue a new commission to the quotaholder.
95

96
    Issue a new commission to the quotaholder, and create the
97
    corresponing QuotaHolderSerial object in DB.
98

99
    """
100

    
101
    provisions = get_commission_info(resource=resource, action=action,
102
                                     action_fields=action_fields)
103

    
104
    if provisions is None:
105
        return None
106

    
107
    user = resource.userid
108
    source = DEFAULT_SOURCE
109

    
110
    qh = Quotaholder.get()
111
    if True:  # placeholder
112
        with AstakosClientExceptionHandler():
113
            serial = qh.issue_one_commission(user, source,
114
                                             provisions, name=name,
115
                                             force=force,
116
                                             auto_accept=auto_accept)
117

    
118
    if serial:
119
        serial_info = {"serial": serial}
120
        if auto_accept:
121
            serial_info["pending"] = False
122
            serial_info["accept"] = True
123
            serial_info["resolved"] = True
124
        return QuotaHolderSerial.objects.create(**serial_info)
125
    else:
126
        raise Exception("No serial")
127

    
128

    
129
def accept_serial(serial, strict=True):
130
    assert serial.pending or serial.accept
131
    response = resolve_commissions(accept=[serial.serial], strict=strict)
132
    serial.pending = False
133
    serial.accept = True
134
    serial.resolved = True
135
    serial.save()
136
    return response
137

    
138

    
139
def reject_serial(serial, strict=True):
140
    assert serial.pending or not serial.accept
141
    response = resolve_commissions(reject=[serial.serial], strict=strict)
142
    serial.pending = False
143
    serial.accept = False
144
    serial.resolved = True
145
    serial.save()
146
    return response
147

    
148

    
149
def accept_commissions(accepted, strict=True):
150
    return resolve_commissions(accept=accepted, strict=strict)
151

    
152

    
153
def reject_commissions(rejected, strict=True):
154
    return resolve_commissions(reject=rejected, strict=strict)
155

    
156

    
157
def resolve_commissions(accept=None, reject=None, strict=True):
158
    if accept is None:
159
        accept = []
160
    if reject is None:
161
        reject = []
162

    
163
    qh = Quotaholder.get()
164
    with AstakosClientExceptionHandler():
165
        response = qh.resolve_commissions(accept, reject)
166

    
167
    # Update correspodning entries in DB
168
    QuotaHolderSerial.objects.filter(serial__in=accept).update(accept=True,
169
                                                               pending=False,
170
                                                               resolved=True)
171
    QuotaHolderSerial.objects.filter(serial__in=reject).update(accept=False,
172
                                                               pending=False,
173
                                                               resolved=True)
174

    
175
    if strict:
176
        failed = response["failed"]
177
        if failed:
178
            log.error("Unexpected error while resolving commissions: %s",
179
                      failed)
180

    
181
    return response
182

    
183

    
184
def fix_pending_commissions():
185
    (accepted, rejected) = resolve_pending_commissions()
186
    resolve_commissions(accept=accepted, reject=rejected)
187

    
188

    
189
def resolve_pending_commissions():
190
    """Resolve quotaholder pending commissions.
191

192
    Get pending commissions from the quotaholder and resolve them
193
    to accepted and rejected, according to the state of the
194
    QuotaHolderSerial DB table. A pending commission in the quotaholder
195
    can exist in the QuotaHolderSerial table and be either accepted or
196
    rejected, or cannot exist in this table, so it is rejected.
197

198
    """
199

    
200
    qh_pending = get_quotaholder_pending()
201
    if not qh_pending:
202
        return ([], [])
203

    
204
    qh_pending.sort()
205
    min_ = qh_pending[0]
206

    
207
    serials = QuotaHolderSerial.objects.filter(serial__gte=min_, pending=False)
208
    accepted = serials.filter(accept=True).values_list('serial', flat=True)
209
    accepted = filter(lambda x: x in qh_pending, accepted)
210

    
211
    rejected = list(set(qh_pending) - set(accepted))
212

    
213
    return (accepted, rejected)
214

    
215

    
216
def get_quotaholder_pending():
217
    qh = Quotaholder.get()
218
    pending_serials = qh.get_pending_commissions()
219
    return pending_serials
220

    
221

    
222
def render_overlimit_exception(e):
223
    resource_name = {"vm": "Virtual Machine",
224
                     "cpu": "CPU",
225
                     "ram": "RAM",
226
                     "network.private": "Private Network",
227
                     "floating_ip": "Floating IP address"}
228
    details = json.loads(e.details)
229
    data = details['overLimit']['data']
230
    usage = data["usage"]
231
    limit = data["limit"]
232
    available = limit - usage
233
    provision = data['provision']
234
    requested = provision['quantity']
235
    resource = provision['resource']
236
    res = resource.replace("cyclades.", "", 1)
237
    try:
238
        resource = resource_name[res]
239
    except KeyError:
240
        resource = res
241

    
242
    msg = "Resource Limit Exceeded for your account."
243
    details = "Limit for resource '%s' exceeded for your account."\
244
              " Available: %s, Requested: %s"\
245
              % (resource, available, requested)
246
    return msg, details
247

    
248

    
249
@transaction.commit_on_success
250
def issue_and_accept_commission(resource, action="BUILD", action_fields=None):
251
    """Issue and accept a commission to Quotaholder.
252

253
    This function implements the Commission workflow, and must be called
254
    exactly after and in the same transaction that created/updated the
255
    resource. The workflow that implements is the following:
256
    0) Resolve previous unresolved commission if exists
257
    1) Issue commission, get a serial and correlate it with the resource
258
    2) Store the serial in DB as a serial to accept
259
    3) COMMIT!
260
    4) Accept commission to QH
261

262
    """
263
    commission_reason = ("client: api, resource: %s, action: %s"
264
                         % (resource, action))
265
    serial = handle_resource_commission(resource=resource, action=action,
266
                                        action_fields=action_fields,
267
                                        commission_name=commission_reason)
268

    
269
    # Mark the serial as one to accept and associate it with the resource
270
    serial.pending = False
271
    serial.accept = True
272
    serial.save()
273
    transaction.commit()
274

    
275
    try:
276
        # Accept the commission to quotaholder
277
        accept_serial(serial)
278
    except:
279
        # Do not crash if we can not accept commission to Quotaholder. Quotas
280
        # have already been reserved and the resource already exists in DB.
281
        # Just log the error
282
        log.exception("Failed to accept commission: %s", serial)
283

    
284
    return serial
285

    
286

    
287
def get_commission_info(resource, action, action_fields=None):
288
    if isinstance(resource, VirtualMachine):
289
        flavor = resource.flavor
290
        resources = {"cyclades.vm": 1,
291
                     "cyclades.total_cpu": flavor.cpu,
292
                     "cyclades.disk": 1073741824 * flavor.disk,
293
                     "cyclades.total_ram": 1048576 * flavor.ram}
294
        online_resources = {"cyclades.cpu": flavor.cpu,
295
                            "cyclades.ram": 1048576 * flavor.ram}
296
        if action == "BUILD":
297
            resources.update(online_resources)
298
            return resources
299
        if action == "START":
300
            if resource.operstate == "STOPPED":
301
                return online_resources
302
            else:
303
                return None
304
        elif action == "STOP":
305
            if resource.operstate in ["STARTED", "BUILD", "ERROR"]:
306
                return reverse_quantities(online_resources)
307
            else:
308
                return None
309
        elif action == "REBOOT":
310
            if resource.operstate == "STOPPED":
311
                return online_resources
312
            else:
313
                return None
314
        elif action == "DESTROY":
315
            if resource.operstate in ["STARTED", "BUILD", "ERROR"]:
316
                resources.update(online_resources)
317
            return reverse_quantities(resources)
318
        elif action == "RESIZE" and action_fields:
319
            beparams = action_fields.get("beparams")
320
            cpu = beparams.get("vcpus", flavor.cpu)
321
            ram = beparams.get("maxmem", flavor.ram)
322
            return {"cyclades.total_cpu": cpu - flavor.cpu,
323
                    "cyclades.total_ram": 1048576 * (ram - flavor.ram)}
324
        else:
325
            #["CONNECT", "DISCONNECT", "SET_FIREWALL_PROFILE"]:
326
            return None
327
    elif isinstance(resource, Network):
328
        resources = {"cyclades.network.private": 1}
329
        if action == "BUILD":
330
            return resources
331
        elif action == "DESTROY":
332
            return reverse_quantities(resources)
333
    elif isinstance(resource, IPAddress):
334
        if resource.floating_ip:
335
            resources = {"cyclades.floating_ip": 1}
336
            if action == "BUILD":
337
                return resources
338
            elif action == "DESTROY":
339
                return reverse_quantities(resources)
340
        else:
341
            return None
342

    
343

    
344
def reverse_quantities(resources):
345
    return dict((r, -s) for r, s in resources.items())
346

    
347

    
348
def handle_resource_commission(resource, action, commission_name,
349
                               force=False, auto_accept=False,
350
                               action_fields=None):
351
    """Handle a issuing of a commission for a resource.
352

353
    Create a new commission for a resource based on the action that
354
    is performed. If the resource has a previous pending commission,
355
    resolved it before issuing the new one.
356

357
    """
358
    # Try to resolve previous serial:
359
    # If action is DESTROY, we must always reject the previous commission,
360
    # since multiple DESTROY actions are allowed in the same resource (e.g. VM)
361
    # The one who succeeds will be finally accepted, and all other will be
362
    # rejected
363
    force = force or (action == "DESTROY")
364
    resolve_commission(resource.serial, force=force)
365

    
366
    serial = issue_commission(resource, action, name=commission_name,
367
                              force=force, auto_accept=auto_accept,
368
                              action_fields=action_fields)
369
    resource.serial = serial
370
    resource.save()
371
    return serial
372

    
373

    
374
class ResolveError(Exception):
375
    pass
376

    
377

    
378
def resolve_commission(serial, force=False):
379
    if serial is None or serial.resolved:
380
        return
381
    if serial.pending and not force:
382
        m = "Could not resolve commission: serial %s is undecided" % serial
383
        raise ResolveError(m)
384
    log.warning("Resolving pending commission: %s", serial)
385
    if not serial.pending and serial.accept:
386
        accept_serial(serial)
387
    else:
388
        reject_serial(serial)