Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / quotas / __init__.py @ ff5edb80

History | View | Annotate | Download (15.6 kB)

1
# Copyright 2012-2014 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or without
4
# modification, are permitted provided that the following conditions
5
# are met:
6
#
7
#   1. Redistributions of source code must retain the above copyright
8
#      notice, this list of conditions and the following disclaimer.
9
#
10
#  2. Redistributions in binary form must reproduce the above copyright
11
#     notice, this list of conditions and the following disclaimer in the
12
#     documentation and/or other materials provided with the distribution.
13
#
14
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24
# SUCH DAMAGE.
25
#
26
# The views and conclusions contained in the software and documentation are
27
# those of the authors and should not be interpreted as representing official
28
# policies, either expressed or implied, of GRNET S.A.
29

    
30
from django.utils import simplejson as json
31
from django.db import transaction
32

    
33
from snf_django.lib.api import faults
34
from synnefo.db.models import (QuotaHolderSerial, VirtualMachine, Network,
35
                               IPAddress)
36

    
37
from synnefo.settings import (CYCLADES_SERVICE_TOKEN as ASTAKOS_TOKEN,
38
                              ASTAKOS_AUTH_URL)
39
from astakosclient import AstakosClient
40
from astakosclient import errors
41

    
42
import logging
43
log = logging.getLogger(__name__)
44

    
45

    
46
QUOTABLE_RESOURCES = [VirtualMachine, Network, IPAddress]
47

    
48

    
49
RESOURCES = [
50
    "cyclades.vm",
51
    "cyclades.total_cpu",
52
    "cyclades.cpu",
53
    "cyclades.disk",
54
    "cyclades.total_ram",
55
    "cyclades.ram",
56
    "cyclades.network.private",
57
    "cyclades.floating_ip",
58
]
59

    
60

    
61
class Quotaholder(object):
62
    _object = None
63

    
64
    @classmethod
65
    def get(cls):
66
        if cls._object is None:
67
            cls._object = AstakosClient(ASTAKOS_TOKEN,
68
                                        ASTAKOS_AUTH_URL,
69
                                        use_pool=True,
70
                                        retry=3,
71
                                        logger=log)
72
        return cls._object
73

    
74

    
75
class AstakosClientExceptionHandler(object):
76
    def __init__(self, *args, **kwargs):
77
        self.user = kwargs.get("user")
78
        self.projects = kwargs.get("projects")
79

    
80
    def __enter__(self):
81
        pass
82

    
83
    def check_notFound(self):
84
        if not self.user or not self.projects:
85
            return
86
        try:
87
            qh = Quotaholder.get()
88
            user_quota = qh.service_get_quotas(self.user)
89
        except errors.AstakosClientException as e:
90
            log.exception("Unexpected error %s" % e.message)
91
            raise faults.InternalServerError("Unexpected error")
92

    
93
        user_quota = user_quota[self.user]
94
        for project in self.projects:
95
            try:
96
                user_quota[project]
97
            except KeyError:
98
                m = "User %s not in project %s" % (self.user, project)
99
                raise faults.BadRequest(m)
100

    
101
    def __exit__(self, exc_type, value, traceback):
102
        if value is not None:  # exception
103
            if not isinstance(value, errors.AstakosClientException):
104
                return False  # reraise
105
            if exc_type is errors.QuotaLimit:
106
                msg, details = render_overlimit_exception(value)
107
                raise faults.OverLimit(msg, details=details)
108
            if exc_type is errors.NotFound:
109
                self.check_notFound()
110

    
111
            log.exception("Unexpected error %s" % value.message)
112
            raise faults.InternalServerError("Unexpected error")
113

    
114

    
115
def issue_commission(resource, action, name="", force=False, auto_accept=False,
116
                     action_fields=None):
117
    """Issue a new commission to the quotaholder.
118

119
    Issue a new commission to the quotaholder, and create the
120
    corresponing QuotaHolderSerial object in DB.
121

122
    """
123

    
124
    provisions = get_commission_info(resource=resource, action=action,
125
                                     action_fields=action_fields)
126

    
127
    if provisions is None:
128
        return None
129

    
130
    user = resource.userid
131
    source = resource.project
132

    
133
    qh = Quotaholder.get()
134
    if action == "REASSIGN":
135
        try:
136
            from_project = action_fields["from_project"]
137
            to_project = action_fields["to_project"]
138
        except KeyError:
139
            raise Exception("Missing project attribute.")
140

    
141
        projects = [from_project, to_project]
142
        with AstakosClientExceptionHandler(user=user, projects=projects):
143
            serial = qh.issue_resource_reassignment(user,
144
                                                    from_project, to_project,
145
                                                    provisions, name=name,
146
                                                    force=force,
147
                                                    auto_accept=auto_accept)
148
    else:
149
        with AstakosClientExceptionHandler(user=user, projects=[source]):
150
            serial = qh.issue_one_commission(user, source,
151
                                             provisions, name=name,
152
                                             force=force,
153
                                             auto_accept=auto_accept)
154

    
155
    if not serial:
156
        raise Exception("No serial")
157

    
158
    serial_info = {"serial": serial}
159
    if auto_accept:
160
        serial_info["pending"] = False
161
        serial_info["accept"] = True
162
        serial_info["resolved"] = True
163

    
164
    serial = QuotaHolderSerial.objects.create(**serial_info)
165

    
166
    # Correlate the serial with the resource. Resolved serials are not
167
    # attached to resources
168
    if not auto_accept:
169
        resource.serial = serial
170
        resource.save()
171

    
172
    return serial
173

    
174

    
175
def accept_resource_serial(resource, strict=True):
176
    serial = resource.serial
177
    assert serial.pending or serial.accept, "%s can't be accepted" % serial
178
    log.debug("Accepting serial %s of resource %s", serial, resource)
179
    _resolve_commissions(accept=[serial.serial], strict=strict)
180
    resource.serial = None
181
    resource.save()
182
    return resource
183

    
184

    
185
def reject_resource_serial(resource, strict=True):
186
    serial = resource.serial
187
    assert serial.pending or not serial.accept, "%s can't be rejected" % serial
188
    log.debug("Rejecting serial %s of resource %s", serial, resource)
189
    _resolve_commissions(reject=[serial.serial], strict=strict)
190
    resource.serial = None
191
    resource.save()
192
    return resource
193

    
194

    
195
def _resolve_commissions(accept=None, reject=None, strict=True):
196
    if accept is None:
197
        accept = []
198
    if reject is None:
199
        reject = []
200

    
201
    qh = Quotaholder.get()
202
    with AstakosClientExceptionHandler():
203
        response = qh.resolve_commissions(accept, reject)
204

    
205
    accepted = response.get("accepted", [])
206
    rejected = response.get("rejected", [])
207

    
208
    if accepted:
209
        QuotaHolderSerial.objects.filter(serial__in=accepted).update(
210
            accept=True, pending=False, resolved=True)
211
    if rejected:
212
        QuotaHolderSerial.objects.filter(serial__in=rejected).update(
213
            accept=False, pending=False, resolved=True)
214

    
215
    if strict:
216
        failed = response["failed"]
217
        if failed:
218
            log.error("Unexpected error while resolving commissions: %s",
219
                      failed)
220

    
221
    return response
222

    
223

    
224
def reconcile_resolve_commissions(accept=None, reject=None, strict=True):
225
    response = _resolve_commissions(accept=accept,
226
                                    reject=reject,
227
                                    strict=strict)
228
    affected = response.get("accepted", []) + response.get("rejected", [])
229
    for resource in QUOTABLE_RESOURCES:
230
        resource.objects.filter(serial__in=affected).update(serial=None)
231

    
232

    
233
def resolve_pending_commissions():
234
    """Resolve quotaholder pending commissions.
235

236
    Get pending commissions from the quotaholder and resolve them
237
    to accepted and rejected, according to the state of the
238
    QuotaHolderSerial DB table. A pending commission in the quotaholder
239
    can exist in the QuotaHolderSerial table and be either accepted or
240
    rejected, or cannot exist in this table, so it is rejected.
241

242
    """
243

    
244
    qh_pending = get_quotaholder_pending()
245
    if not qh_pending:
246
        return ([], [])
247

    
248
    qh_pending.sort()
249
    min_ = qh_pending[0]
250

    
251
    serials = QuotaHolderSerial.objects.filter(serial__gte=min_, pending=False)
252
    accepted = serials.filter(accept=True).values_list('serial', flat=True)
253
    accepted = filter(lambda x: x in qh_pending, accepted)
254

    
255
    rejected = list(set(qh_pending) - set(accepted))
256

    
257
    return (accepted, rejected)
258

    
259

    
260
def get_quotaholder_pending():
261
    qh = Quotaholder.get()
262
    pending_serials = qh.get_pending_commissions()
263
    return pending_serials
264

    
265

    
266
def render_overlimit_exception(e):
267
    resource_name = {"vm": "Virtual Machine",
268
                     "cpu": "CPU",
269
                     "ram": "RAM",
270
                     "network.private": "Private Network",
271
                     "floating_ip": "Floating IP address"}
272
    details = json.loads(e.details)
273
    data = details['overLimit']['data']
274
    usage = data["usage"]
275
    limit = data["limit"]
276
    available = limit - usage
277
    provision = data['provision']
278
    requested = provision['quantity']
279
    resource = provision['resource']
280
    res = resource.replace("cyclades.", "", 1)
281
    try:
282
        resource = resource_name[res]
283
    except KeyError:
284
        resource = res
285

    
286
    msg = "Resource Limit Exceeded for your account."
287
    details = "Limit for resource '%s' exceeded for your account."\
288
              " Available: %s, Requested: %s"\
289
              % (resource, available, requested)
290
    return msg, details
291

    
292

    
293
@transaction.commit_on_success
294
def issue_and_accept_commission(resource, action="BUILD", action_fields=None):
295
    """Issue and accept a commission to Quotaholder.
296

297
    This function implements the Commission workflow, and must be called
298
    exactly after and in the same transaction that created/updated the
299
    resource. The workflow that implements is the following:
300
    0) Resolve previous unresolved commission if exists
301
    1) Issue commission, get a serial and correlate it with the resource
302
    2) Store the serial in DB as a serial to accept
303
    3) COMMIT!
304
    4) Accept commission to QH
305

306
    """
307
    commission_reason = ("client: api, resource: %s, action: %s"
308
                         % (resource, action))
309
    serial = handle_resource_commission(resource=resource, action=action,
310
                                        action_fields=action_fields,
311
                                        commission_name=commission_reason)
312

    
313
    if serial is None:
314
        return
315

    
316
    # Mark the serial as one to accept and associate it with the resource
317
    serial.pending = False
318
    serial.accept = True
319
    serial.save()
320
    transaction.commit()
321

    
322
    try:
323
        # Accept the commission to quotaholder
324
        accept_resource_serial(resource)
325
    except:
326
        # Do not crash if we can not accept commission to Quotaholder. Quotas
327
        # have already been reserved and the resource already exists in DB.
328
        # Just log the error
329
        log.exception("Failed to accept commission: %s", resource.serial)
330

    
331

    
332
def get_commission_info(resource, action, action_fields=None):
333
    if isinstance(resource, VirtualMachine):
334
        flavor = resource.flavor
335
        resources = {"cyclades.vm": 1,
336
                     "cyclades.total_cpu": flavor.cpu,
337
                     "cyclades.disk": 1073741824 * flavor.disk,
338
                     "cyclades.total_ram": 1048576 * flavor.ram}
339
        online_resources = {"cyclades.cpu": flavor.cpu,
340
                            "cyclades.ram": 1048576 * flavor.ram}
341
        if action == "BUILD":
342
            resources.update(online_resources)
343
            return resources
344
        if action == "START":
345
            if resource.operstate == "STOPPED":
346
                return online_resources
347
            else:
348
                return None
349
        elif action == "STOP":
350
            if resource.operstate in ["STARTED", "BUILD", "ERROR"]:
351
                return reverse_quantities(online_resources)
352
            else:
353
                return None
354
        elif action == "REBOOT":
355
            if resource.operstate == "STOPPED":
356
                return online_resources
357
            else:
358
                return None
359
        elif action == "DESTROY":
360
            if resource.operstate in ["STARTED", "BUILD", "ERROR"]:
361
                resources.update(online_resources)
362
            return reverse_quantities(resources)
363
        elif action == "RESIZE" and action_fields:
364
            beparams = action_fields.get("beparams")
365
            cpu = beparams.get("vcpus", flavor.cpu)
366
            ram = beparams.get("maxmem", flavor.ram)
367
            return {"cyclades.total_cpu": cpu - flavor.cpu,
368
                    "cyclades.total_ram": 1048576 * (ram - flavor.ram)}
369
        elif action == "REASSIGN":
370
            if resource.operstate in ["STARTED", "BUILD", "ERROR"]:
371
                resources.update(online_resources)
372
            return resources
373
        else:
374
            #["CONNECT", "DISCONNECT", "SET_FIREWALL_PROFILE"]:
375
            return None
376
    elif isinstance(resource, Network):
377
        resources = {"cyclades.network.private": 1}
378
        if action == "BUILD":
379
            return resources
380
        elif action == "DESTROY":
381
            return reverse_quantities(resources)
382
        elif action == "REASSIGN":
383
            return resources
384
    elif isinstance(resource, IPAddress):
385
        if resource.floating_ip:
386
            resources = {"cyclades.floating_ip": 1}
387
            if action == "BUILD":
388
                return resources
389
            elif action == "DESTROY":
390
                return reverse_quantities(resources)
391
            elif action == "REASSIGN":
392
                return resources
393
        else:
394
            return None
395

    
396

    
397
def reverse_quantities(resources):
398
    return dict((r, -s) for r, s in resources.items())
399

    
400

    
401
def handle_resource_commission(resource, action, commission_name,
402
                               force=False, auto_accept=False,
403
                               action_fields=None):
404
    """Handle a issuing of a commission for a resource.
405

406
    Create a new commission for a resource based on the action that
407
    is performed. If the resource has a previous pending commission,
408
    resolved it before issuing the new one.
409

410
    """
411
    # Try to resolve previous serial:
412
    # If action is DESTROY, we must always reject the previous commission,
413
    # since multiple DESTROY actions are allowed in the same resource (e.g. VM)
414
    # The one who succeeds will be finally accepted, and all other will be
415
    # rejected
416
    force = force or (action == "DESTROY")
417
    resolve_resource_commission(resource, force=force)
418

    
419
    serial = issue_commission(resource, action, name=commission_name,
420
                              force=force, auto_accept=auto_accept,
421
                              action_fields=action_fields)
422
    return serial
423

    
424

    
425
class ResolveError(Exception):
426
    pass
427

    
428

    
429
def resolve_resource_commission(resource, force=False):
430
    serial = resource.serial
431
    if serial is None or serial.resolved:
432
        return
433
    if serial.pending and not force:
434
        m = "Could not resolve commission: serial %s is undecided" % serial
435
        raise ResolveError(m)
436
    log.warning("Resolving pending commission: %s", serial)
437
    if not serial.pending and serial.accept:
438
        accept_resource_serial(resource)
439
    else:
440
        reject_resource_serial(resource)