Revision 2509ce17

b/snf-cyclades-app/synnefo/api/networks.py
147 147

  
148 148

  
149 149
@api.api_method(http_method='POST', user_required=True, logger=log)
150
@quotas.uses_commission
151 150
@transaction.commit_manually
152
def create_network(serials, request):
151
def create_network(request):
153 152
    # Normal Response Code: 202
154 153
    # Error Response Codes: computeFault (400, 500),
155 154
    #                       serviceUnavailable (503),
......
194 193
        # Check that user provided a valid subnet
195 194
        util.validate_network_params(subnet, gateway, subnet6, gateway6)
196 195

  
197
        # Issue commission
198
        serial = quotas.issue_network_commission(user_id)
199
        serials.append(serial)
200
        # Make the commission accepted, since in the end of this
201
        # transaction the Network will have been created in the DB.
202
        serial.accepted = True
203
        serial.save()
204

  
205 196
        try:
206 197
            mode, link, mac_prefix, tags = util.values_from_flavor(flavor)
207 198
            network = Network.objects.create(
......
218 209
                mac_prefix=mac_prefix,
219 210
                tags=tags,
220 211
                action='CREATE',
221
                state='PENDING',
222
                serial=serial)
212
                state='PENDING')
223 213
        except EmptyPool:
224 214
            log.error("Failed to allocate resources for network of type: %s",
225 215
                      flavor)
......
227 217

  
228 218
        # Create BackendNetwork entries for each Backend
229 219
        network.create_backend_network()
220
        # Issue commission to Quotaholder and accept it since at the end of
221
        # this transaction the Network object will be created in the DB.
222
        # Note: the following call does a commit!
223
        quotas.issue_and_accept_commission(network)
230 224
    except:
231 225
        transaction.rollback()
232 226
        raise
b/snf-cyclades-app/synnefo/api/servers.py
249 249
# access (SELECT..FOR UPDATE). Running create_server with commit_on_success
250 250
# would result in backends and public networks to be locked until the job is
251 251
# sent to the Ganeti backend.
252
@quotas.uses_commission
253 252
@transaction.commit_manually
254
def create_server(serials, request):
253
def create_server(request):
255 254
    # Normal Response Code: 202
256 255
    # Error Response Codes: computeFault (400, 500),
257 256
    #                       serviceUnavailable (503),
......
311 310
        flavor.disk_provider = None
312 311

  
313 312
    try:
314
        # Issue commission
315
        serial = quotas.issue_vm_commission(user_id, flavor)
316
        serials.append(serial)
317
        # Make the commission accepted, since in the end of this
318
        # transaction the VM will have been created in the DB.
319
        serial.accepted = True
320
        serial.save()
321

  
322 313
        # Allocate IP from public network
323 314
        (network, address) = util.get_public_ip(backend)
324 315
        nic = {'ip': address, 'network': network.backend_id}
......
331 322
            userid=user_id,
332 323
            imageid=image_id,
333 324
            flavor=flavor,
334
            action="CREATE",
335
            serial=serial)
325
            action="CREATE")
336 326

  
337 327
        # Create VM's public NIC. Do not wait notification form ganeti hooks to
338 328
        # create this NIC, because if the hooks never run (e.g. building error)
......
357 347
                meta_key=key,
358 348
                meta_value=val,
359 349
                vm=vm)
350
        # Issue commission to Quotaholder and accept it since at the end of
351
        # this transaction the Network object will be created in the DB.
352
        # Note: the following call does a commit!
353
        quotas.issue_and_accept_commission(vm)
360 354
    except:
361 355
        transaction.rollback()
362 356
        raise
b/snf-cyclades-app/synnefo/logic/backend.py
57 57
_reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
58 58

  
59 59

  
60
@quotas.uses_commission
61 60
@transaction.commit_on_success
62
def process_op_status(serials, vm, etime, jobid, opcode, status, logmsg):
61
def process_op_status(vm, etime, jobid, opcode, status, logmsg):
63 62
    """Process a job progress notification from the backend
64 63

  
65 64
    Process an incoming message from the backend (currently Ganeti).
......
94 93
        #
95 94
        if status == 'success' or (status == 'error' and
96 95
                                   vm.operstate == 'ERROR'):
97
            # Issue commission
98
            serial = quotas.issue_vm_commission(vm.userid, vm.flavor,
99
                                                delete=True)
100
            serials.append(serial)
101
            vm.serial = serial
102
            serial.accepted = True
103
            serial.save()
104 96
            release_instance_nics(vm)
105 97
            vm.nics.all().delete()
106 98
            vm.deleted = True
107 99
            vm.operstate = state_for_success
108 100
            vm.backendtime = etime
101
            # Issue and accept commission to Quotaholder
102
            quotas.issue_and_accept_commission(vm, delete=True)
109 103

  
110 104
    # Update backendtime only for jobs that have been successfully completed,
111 105
    # since only these jobs update the state of the VM. Else a "race condition"
......
239 233
    update_network_state(back_network.network)
240 234

  
241 235

  
242
@quotas.uses_commission
243
def update_network_state(serials, network):
236
def update_network_state(network):
244 237
    old_state = network.state
245 238

  
246 239
    backend_states = [s.operstate for s in
......
267 260

  
268 261
        # Issue commission
269 262
        if network.userid:
270
            serial = quotas.issue_network_commission(network.userid,
271
                                                     delete=True)
272
            serials.append(serial)
273
            network.serial = serial
274
            serial.accepted = True
275
            serial.save()
263
            quotas.issue_and_accept_commission(network, delete=True)
276 264
        elif not network.public:
277 265
            log.warning("Network %s does not have an owner!", network.id)
278 266
    network.save()
b/snf-cyclades-app/synnefo/quotas/__init__.py
27 27
# those of the authors and should not be interpreted as representing official
28 28
# policies, either expressed or implied, of GRNET S.A.
29 29

  
30
from functools import wraps
31 30
from django.utils import simplejson as json
31
from django.db import transaction
32 32

  
33 33
from snf_django.lib.api import faults
34
from synnefo.db.models import QuotaHolderSerial
34
from synnefo.db.models import QuotaHolderSerial, VirtualMachine, Network
35 35

  
36 36
from synnefo.settings import (CYCLADES_ASTAKOS_SERVICE_TOKEN as ASTAKOS_TOKEN,
37 37
                              ASTAKOS_URL)
......
64 64
        return cls._object
65 65

  
66 66

  
67
def uses_commission(func):
68
    """Decorator for wrapping functions that needs commission.
69

  
70
    All decorated functions must take as first argument the `serials` list in
71
    order to extend them with the needed serial numbers, as return by the
72
    Quotaholder
73

  
74
    On successful competition of the decorated function, all serials are
75
    accepted to the quotaholder, otherwise they are rejected.
76

  
77
    """
78

  
79
    @wraps(func)
80
    def wrapper(*args, **kwargs):
81
        try:
82
            serials = []
83
            ret = func(serials, *args, **kwargs)
84
        except:
85
            log.exception("Unexpected error")
86
            try:
87
                if serials:
88
                    reject_commission(serials=serials)
89
            except:
90
                log.exception("Exception while rejecting serials %s", serials)
91
                raise
92
            raise
93

  
94
        # func has completed successfully. accept serials
95
        try:
96
            if serials:
97
                accept_commission(serials)
98
            return ret
99
        except:
100
            log.exception("Exception while accepting serials %s", serials)
101
            raise
102
    return wrapper
103

  
104

  
105
## FIXME: Wrap the following two functions inside transaction ?
106
def accept_commission(serials, update_db=True):
107
    """Accept a list of pending commissions.
108

  
109
    @param serials: List of QuotaHolderSerial objects
110

  
111
    """
112
    if update_db:
113
        for s in serials:
114
            if s.pending:
115
                s.accepted = True
116
                s.save()
117

  
118
    accept_serials = [s.serial for s in serials]
119
    qh_resolve_commissions(accept=accept_serials)
120

  
121

  
122
def reject_commission(serials, update_db=True):
123
    """Reject a list of pending commissions.
124

  
125
    @param serials: List of QuotaHolderSerial objects
126

  
127
    """
128
    if update_db:
129
        for s in serials:
130
            if s.pending:
131
                s.rejected = True
132
                s.save()
133

  
134
    reject_serials = [s.serial for s in serials]
135
    qh_resolve_commissions(reject=reject_serials)
136

  
137

  
138 67
def issue_commission(user, source, provisions,
139 68
                     force=False, auto_accept=False):
140 69
    """Issue a new commission to the quotaholder.
......
162 91
        raise Exception("No serial")
163 92

  
164 93

  
165
# Wrapper functions for issuing commissions for each resource type.  Each
166
# functions creates the `commission_info` dictionary as expected by the
167
# `issue_commision` function. Commissions for deleting a resource, are the same
168
# as for creating the same resource, but with negative resource sizes.
169

  
170

  
171
def issue_vm_commission(user, flavor, delete=False):
172
    resources = get_server_resources(flavor)
173
    if delete:
174
        resources = reverse_quantities(resources)
175
    return issue_commission(user, DEFAULT_SOURCE, resources)
176

  
177

  
178
def get_server_resources(flavor):
179
    return {'cyclades.vm': 1,
180
            'cyclades.cpu': flavor.cpu,
181
            'cyclades.disk': 1073741824 * flavor.disk,  # flavor.disk is in GB
182
            # 'public_ip': 1,
183
            #'disk_template': flavor.disk_template,
184
            'cyclades.ram': 1048576 * flavor.ram}  # flavor.ram is in MB
185

  
186

  
187
def issue_network_commission(user, delete=False):
188
    resources = get_network_resources()
189
    if delete:
190
        resources = reverse_quantities(resources)
191
    return issue_commission(user, DEFAULT_SOURCE, resources)
192

  
193

  
194
def get_network_resources():
195
    return {"cyclades.network.private": 1}
196

  
197

  
198
def reverse_quantities(resources):
199
    return dict((r, -s) for r, s in resources.items())
200

  
201

  
202
##
203
## Reconcile pending commissions
204
##
205

  
206

  
207 94
def accept_commissions(accepted):
208 95
    qh_resolve_commissions(accept=accepted)
209 96

  
......
212 99
    qh_resolve_commissions(reject=rejected)
213 100

  
214 101

  
215
def fix_pending_commissions():
216
    (accepted, rejected) = resolve_pending_commissions()
217
    qh_resolve_commissions(accepted, rejected)
218

  
219

  
220 102
def qh_resolve_commissions(accept=None, reject=None):
221 103
    if accept is None:
222 104
        accept = []
......
227 109
    qh.resolve_commissions(ASTAKOS_TOKEN, accept, reject)
228 110

  
229 111

  
112
def fix_pending_commissions():
113
    (accepted, rejected) = resolve_pending_commissions()
114
    qh_resolve_commissions(accepted, rejected)
115

  
116

  
230 117
def resolve_pending_commissions():
231 118
    """Resolve quotaholder pending commissions.
232 119

  
......
246 133
    min_ = qh_pending[0]
247 134

  
248 135
    serials = QuotaHolderSerial.objects.filter(serial__gte=min_, pending=False)
249
    accepted = serials.filter(accepted=True).values_list('serial', flat=True)
136
    accepted = serials.filter(accept=True).values_list('serial', flat=True)
250 137
    accepted = filter(lambda x: x in qh_pending, accepted)
251 138

  
252 139
    rejected = list(set(qh_pending) - set(accepted))
......
284 171
              " Available: %s, Requested: %s"\
285 172
              % (resource, available, requested)
286 173
    return msg, details
174

  
175

  
176
@transaction.commit_manually
177
def issue_and_accept_commission(resource, delete=False):
178
    """Issue and accept a commission to Quotaholder.
179

  
180
    This function implements the Commission workflow, and must be called
181
    exactly after and in the same transaction that created/updated the
182
    resource. The workflow that implements is the following:
183
    1) Issue commission and get a serial
184
    2) Store the serial in DB and mark is as one to accept
185
    3) Correlate the serial with the resource
186
    4) COMMIT!
187
    5) Accept commission to QH (reject if failed until 5)
188
    6) Mark serial as resolved
189
    7) COMMIT!
190

  
191
    """
192
    try:
193
        # Convert resources in the format expected by Quotaholder
194
        qh_resources = prepare_qh_resources(resource)
195
        if delete:
196
            qh_resources = reverse_quantities(qh_resources)
197

  
198
        # Issue commission and get the assigned serial
199
        serial = issue_commission(resource.userid, DEFAULT_SOURCE,
200
                                  qh_resources)
201
    except:
202
        transaction.rollback()
203
        raise
204

  
205
    try:
206
        # Mark the serial as one to accept. This step is necessary for
207
        # reconciliation
208
        serial.pending = False
209
        serial.accept = True
210
        serial.save()
211

  
212
        # Associate serial with the resource
213
        resource.serial = serial
214
        resource.save()
215

  
216
        # Commit transaction in the DB! If this commit succeeds, then the
217
        # serial is created in the DB with all the necessary information to
218
        # reconcile commission
219
        transaction.commit()
220
    except:
221
        transaction.rollback()
222
        serial.pending = False
223
        serial.accept = False
224
        serial.save()
225
        transaction.commit()
226
        raise
227

  
228
    if serial.accept:
229
        # Accept commission to Quotaholder
230
        accept_commissions(accepted=[serial.serial])
231
    else:
232
        reject_commissions(rejected=[serial.serial])
233

  
234
    # Mark the serial as resolved, indicating that no further actions are
235
    # needed for this serial
236
    serial.resolved = True
237
    serial.save()
238
    transaction.commit()
239

  
240
    return serial
241

  
242

  
243
def prepare_qh_resources(resource):
244
    if isinstance(resource, VirtualMachine):
245
        flavor = resource.flavor
246
        return {'cyclades.vm': 1,
247
                'cyclades.cpu': flavor.cpu,
248
                'cyclades.disk': 1073741824 * flavor.disk,  # flavor.disk in GB
249
                # 'public_ip': 1,
250
                #'disk_template': flavor.disk_template,
251
                'cyclades.ram': 1048576 * flavor.ram}  # flavor.ram is in MB
252
    elif isinstance(resource, Network):
253
        return {"cyclades.network.private": 1}
254
    else:
255
        raise ValueError("Unknown Resource '%s'" % resource)
256

  
257

  
258
def reverse_quantities(resources):
259
    return dict((r, -s) for r, s in resources.items())

Also available in: Unified diff