Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / plankton / backend.py @ f4366b6c

History | View | Annotate | Download (18.2 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
"""
35
The Plankton attributes are the following:
36
  - checksum: the 'hash' meta
37
  - container_format: stored as a user meta
38
  - created_at: the 'modified' meta of the first version
39
  - deleted_at: the timestamp of the last version
40
  - disk_format: stored as a user meta
41
  - id: the 'uuid' meta
42
  - is_public: True if there is a * entry for the read permission
43
  - location: generated based on the file's path
44
  - name: stored as a user meta
45
  - owner: the file's account
46
  - properties: stored as user meta prefixed with PROPERTY_PREFIX
47
  - size: the 'bytes' meta
48
  - status: stored as a system meta
49
  - store: is always 'pithos'
50
  - updated_at: the 'modified' meta
51
"""
52

    
53
import json
54
import warnings
55

    
56
from operator import itemgetter
57
from time import gmtime, strftime
58
from functools import wraps, partial
59

    
60
from django.conf import settings
61

    
62
from pithos.backends.base import NotAllowedError as PithosNotAllowedError
63
import synnefo.lib.astakos as lib_astakos
64
import logging
65

    
66
from synnefo.settings import (CYCLADES_USE_QUOTAHOLDER,
67
                              CYCLADES_QUOTAHOLDER_URL,
68
                              CYCLADES_QUOTAHOLDER_TOKEN)
69

    
70
logger = logging.getLogger(__name__)
71

    
72

    
73
PLANKTON_DOMAIN = 'plankton'
74
PLANKTON_PREFIX = 'plankton:'
75
PROPERTY_PREFIX = 'property:'
76

    
77
PLANKTON_META = ('container_format', 'disk_format', 'name', 'properties',
78
                 'status')
79

    
80
TRANSLATE_UUIDS = getattr(settings, 'TRANSLATE_UUIDS', False)
81

    
82
def get_displaynames(names):
83
    try:
84
        auth_url = settings.ASTAKOS_URL
85
        url = auth_url.replace('im/authenticate', 'service/api/user_catalogs')
86
        token = settings.CYCLADES_ASTAKOS_SERVICE_TOKEN
87
        uuids = lib_astakos.get_displaynames(token, names, url=url)
88
    except Exception, e:
89
        logger.exception(e)
90
        return {}
91

    
92
    return uuids
93

    
94

    
95
def get_location(account, container, object):
96
    assert '/' not in account, "Invalid account"
97
    assert '/' not in container, "Invalid container"
98
    return 'pithos://%s/%s/%s' % (account, container, object)
99

    
100

    
101
def split_location(location):
102
    """Returns (accout, container, object) from a location string"""
103
    t = location.split('/', 4)
104
    assert len(t) == 5, "Invalid location"
105
    return t[2:5]
106

    
107

    
108
class BackendException(Exception):
109
    pass
110

    
111

    
112
class NotAllowedError(BackendException):
113
    pass
114

    
115

    
116
from pithos.backends.util import PithosBackendPool
117
POOL_SIZE = 8
118
_pithos_backend_pool = \
119
    PithosBackendPool(POOL_SIZE,
120
                      quotaholder_enabled=CYCLADES_USE_QUOTAHOLDER,
121
                      quotaholder_url=CYCLADES_QUOTAHOLDER_URL,
122
                      quotaholder_token=CYCLADES_QUOTAHOLDER_TOKEN,
123
                      db_connection=settings.BACKEND_DB_CONNECTION,
124
                      block_path=settings.BACKEND_BLOCK_PATH)
125

    
126

    
127
def get_pithos_backend():
128
    return _pithos_backend_pool.pool_get()
129

    
130

    
131
def handle_backend_exceptions(func):
132
    @wraps(func)
133
    def wrapper(*args, **kwargs):
134
        try:
135
            return func(*args, **kwargs)
136
        except PithosNotAllowedError:
137
            raise NotAllowedError()
138
    return wrapper
139

    
140

    
141
class ImageBackend(object):
142
    """A wrapper arround the pithos backend to simplify image handling."""
143

    
144
    def __init__(self, user):
145
        self.user = user
146

    
147
        original_filters = warnings.filters
148
        warnings.simplefilter('ignore')         # Suppress SQLAlchemy warnings
149
        self.backend = get_pithos_backend()
150
        warnings.filters = original_filters     # Restore warnings
151

    
152
    @handle_backend_exceptions
153
    def _get_image(self, location):
154
        def format_timestamp(t):
155
            return strftime('%Y-%m-%d %H:%M:%S', gmtime(t))
156

    
157
        account, container, object = split_location(location)
158

    
159
        try:
160
            versions = self.backend.list_versions(self.user, account,
161
                                                  container, object)
162
        except NameError:
163
            return None
164

    
165
        image = {}
166

    
167
        meta = self._get_meta(location)
168
        if meta:
169
            image['deleted_at'] = ''
170
        else:
171
            # Object was deleted, use the latest version
172
            version, timestamp = versions[-1]
173
            meta = self._get_meta(location, version)
174
            image['deleted_at'] = format_timestamp(timestamp)
175

    
176
        if PLANKTON_PREFIX + 'name' not in meta:
177
            return None     # Not a Plankton image
178

    
179
        permissions = self._get_permissions(location)
180

    
181
        image['checksum'] = meta['hash']
182
        image['created_at'] = format_timestamp(versions[0][1])
183
        image['id'] = meta['uuid']
184
        image['is_public'] = '*' in permissions.get('read', [])
185
        image['location'] = location
186
        if TRANSLATE_UUIDS:
187
            displaynames = get_displaynames([account])
188
            if account in displaynames:
189
                display_account = displaynames[account]
190
            else:
191
                display_account = 'unknown'
192
            image['owner'] = display_account
193
        else:
194
            image['owner'] = account
195
        image['size'] = meta['bytes']
196
        image['store'] = 'pithos'
197
        image['updated_at'] = format_timestamp(meta['modified'])
198
        image['properties'] = {}
199

    
200
        for key, val in meta.items():
201
            if not key.startswith(PLANKTON_PREFIX):
202
                continue
203
            key = key[len(PLANKTON_PREFIX):]
204
            if key == 'properties':
205
                val = json.loads(val)
206
            if key in PLANKTON_META:
207
                image[key] = val
208

    
209
        return image
210

    
211
    @handle_backend_exceptions
212
    def _get_meta(self, location, version=None):
213
        account, container, object = split_location(location)
214
        try:
215
            return self.backend.get_object_meta(self.user, account, container,
216
                                                object, PLANKTON_DOMAIN,
217
                                                version)
218
        except NameError:
219
            return None
220

    
221
    @handle_backend_exceptions
222
    def _get_permissions(self, location):
223
        account, container, object = split_location(location)
224
        _a, _p, permissions = self.backend.get_object_permissions(self.user,
225
                                                                  account,
226
                                                                  container,
227
                                                                  object)
228
        return permissions
229

    
230
    @handle_backend_exceptions
231
    def _store(self, f, size=None):
232
        """Breaks data into blocks and stores them in the backend"""
233

    
234
        bytes = 0
235
        hashmap = []
236
        backend = self.backend
237
        blocksize = backend.block_size
238

    
239
        data = f.read(blocksize)
240
        while data:
241
            hash = backend.put_block(data)
242
            hashmap.append(hash)
243
            bytes += len(data)
244
            data = f.read(blocksize)
245

    
246
        if size and size != bytes:
247
            raise BackendException("Invalid size")
248

    
249
        return hashmap, bytes
250

    
251
    @handle_backend_exceptions
252
    def _update(self, location, size, hashmap, meta, permissions):
253
        account, container, object = split_location(location)
254
        self.backend.update_object_hashmap(self.user, account, container,
255
                                           object, size, hashmap, '',
256
                                           PLANKTON_DOMAIN,
257
                                           permissions=permissions)
258
        self._update_meta(location, meta, replace=True)
259

    
260
    @handle_backend_exceptions
261
    def _update_meta(self, location, meta, replace=False):
262
        account, container, object = split_location(location)
263

    
264
        prefixed = {}
265
        for key, val in meta.items():
266
            if key == 'properties':
267
                val = json.dumps(val)
268
            if key in PLANKTON_META:
269
                prefixed[PLANKTON_PREFIX + key] = val
270

    
271
        self.backend.update_object_meta(self.user, account, container, object,
272
                                        PLANKTON_DOMAIN, prefixed, replace)
273

    
274
    @handle_backend_exceptions
275
    def _update_permissions(self, location, permissions):
276
        account, container, object = split_location(location)
277
        self.backend.update_object_permissions(self.user, account, container,
278
                                               object, permissions)
279

    
280
    @handle_backend_exceptions
281
    def add_user(self, image_id, user):
282
        image = self.get_image(image_id)
283
        assert image, "Image not found"
284

    
285
        location = image['location']
286
        permissions = self._get_permissions(location)
287
        read = set(permissions.get('read', []))
288
        read.add(user)
289
        permissions['read'] = list(read)
290
        self._update_permissions(location, permissions)
291

    
292
    def close(self):
293
        self.backend.close()
294

    
295
    @handle_backend_exceptions
296
    def delete(self, image_id):
297
        image = self.get_image(image_id)
298
        account, container, object = split_location(image['location'])
299
        self.backend.delete_object(self.user, account, container, object)
300

    
301
    @handle_backend_exceptions
302
    def get_data(self, location):
303
        account, container, object = split_location(location)
304
        size, hashmap = self.backend.get_object_hashmap(self.user, account,
305
                                                        container, object)
306
        data = ''.join(self.backend.get_block(hash) for hash in hashmap)
307
        assert len(data) == size
308
        return data
309

    
310
    @handle_backend_exceptions
311
    def get_image(self, image_id):
312
        try:
313
            account, container, object = self.backend.get_uuid(self.user,
314
                                                               image_id)
315
        except NameError:
316
            return None
317

    
318
        location = get_location(account, container, object)
319
        return self._get_image(location)
320

    
321
    @handle_backend_exceptions
322
    def _iter(self, public=False, filters=None, shared_from=None):
323
        filters = filters or {}
324

    
325
        # Fix keys
326
        keys = [PLANKTON_PREFIX + 'name']
327
        size_range = (None, None)
328
        for key, val in filters.items():
329
            if key == 'size_min':
330
                size_range = (val, size_range[1])
331
            elif key == 'size_max':
332
                size_range = (size_range[0], val)
333
            else:
334
                keys.append('%s = %s' % (PLANKTON_PREFIX + key, val))
335

    
336
        backend = self.backend
337
        if shared_from:
338
            # To get shared images, we connect as shared_from member and
339
            # get the list shared by us
340
            user = shared_from
341
        else:
342
            user = None if public else self.user
343
            accounts = backend.list_accounts(user)
344

    
345
        for account in accounts:
346
            for container in backend.list_containers(user, account,
347
                                                     shared=True):
348
                for path, _ in backend.list_objects(user, account, container,
349
                                                    domain=PLANKTON_DOMAIN,
350
                                                    keys=keys, shared=True,
351
                                                    size_range=size_range):
352
                    location = get_location(account, container, path)
353
                    image = self._get_image(location)
354
                    if image:
355
                        yield image
356

    
357
    def iter(self, filters=None):
358
        """Iter over all images available to the user"""
359
        return self._iter(filters=filters)
360

    
361
    def iter_public(self, filters=None):
362
        """Iter over public images"""
363
        return self._iter(public=True, filters=filters)
364

    
365
    def iter_shared(self, filters=None, member=None):
366
        """Iter over images shared to member"""
367
        return self._iter(filters=filters, shared_from=member)
368

    
369
    def list(self, filters=None, params={}):
370
        """Return all images available to the user"""
371
        images = list(self.iter(filters))
372
        key = itemgetter(params.get('sort_key', 'created_at'))
373
        reverse = params.get('sort_dir', 'desc') == 'desc'
374
        images.sort(key=key, reverse=reverse)
375
        return images
376

    
377
    def list_public(self, filters, params={}):
378
        """Return public images"""
379
        images = list(self.iter_public(filters))
380
        key = itemgetter(params.get('sort_key', 'created_at'))
381
        reverse = params.get('sort_dir', 'desc') == 'desc'
382
        images.sort(key=key, reverse=reverse)
383
        return images
384

    
385
    def list_users(self, image_id):
386
        image = self.get_image(image_id)
387
        assert image, "Image not found"
388

    
389
        permissions = self._get_permissions(image['location'])
390
        return [user for user in permissions.get('read', []) if user != '*']
391

    
392
    @handle_backend_exceptions
393
    def put(self, name, f, params):
394
        assert 'checksum' not in params, "Passing a checksum is not supported"
395
        assert 'id' not in params, "Passing an ID is not supported"
396
        assert params.pop('store', 'pithos') == 'pithos', "Invalid store"
397
        disk_format = params.setdefault('disk_format',
398
                                        settings.DEFAULT_DISK_FORMAT)
399
        assert disk_format in settings.ALLOWED_DISK_FORMATS,\
400
            "Invalid disk_format"
401
        assert params.setdefault('container_format',
402
                settings.DEFAULT_CONTAINER_FORMAT) in \
403
                settings.ALLOWED_CONTAINER_FORMATS, "Invalid container_format"
404

    
405
        container = settings.DEFAULT_PLANKTON_CONTAINER
406
        filename = params.pop('filename', name)
407
        location = 'pithos://%s/%s/%s' % (self.user, container, filename)
408
        is_public = params.pop('is_public', False)
409
        permissions = {'read': ['*']} if is_public else {}
410
        size = params.pop('size', None)
411

    
412
        hashmap, size = self._store(f, size)
413

    
414
        meta = {}
415
        meta['properties'] = params.pop('properties', {})
416
        meta.update(name=name, status='available', **params)
417

    
418
        self._update(location, size, hashmap, meta, permissions)
419
        return self._get_image(location)
420

    
421
    @handle_backend_exceptions
422
    def register(self, name, location, params):
423
        assert 'id' not in params, "Passing an ID is not supported"
424
        assert location.startswith('pithos://'), "Invalid location"
425
        assert params.pop('store', 'pithos') == 'pithos', "Invalid store"
426
        assert params.setdefault('disk_format',
427
                settings.DEFAULT_DISK_FORMAT) in \
428
                settings.ALLOWED_DISK_FORMATS, "Invalid disk_format"
429
        assert params.setdefault('container_format',
430
                settings.DEFAULT_CONTAINER_FORMAT) in \
431
                settings.ALLOWED_CONTAINER_FORMATS, "Invalid container_format"
432

    
433
        # user = self.user
434
        account, container, object = split_location(location)
435

    
436
        meta = self._get_meta(location)
437
        assert meta, "File not found"
438

    
439
        size = int(params.pop('size', meta['bytes']))
440
        if size != meta['bytes']:
441
            raise BackendException("Invalid size")
442

    
443
        checksum = params.pop('checksum', meta['hash'])
444
        if checksum != meta['hash']:
445
            raise BackendException("Invalid checksum")
446

    
447
        is_public = params.pop('is_public', False)
448
        if is_public:
449
            permissions = {'read': ['*']}
450
        else:
451
            permissions = {'read': [self.user]}
452

    
453
        meta = {}
454
        meta['properties'] = params.pop('properties', {})
455
        meta.update(name=name, status='available', **params)
456

    
457
        self._update_meta(location, meta)
458
        self._update_permissions(location, permissions)
459
        return self._get_image(location)
460

    
461
    @handle_backend_exceptions
462
    def remove_user(self, image_id, user):
463
        image = self.get_image(image_id)
464
        assert image, "Image not found"
465

    
466
        location = image['location']
467
        permissions = self._get_permissions(location)
468
        try:
469
            permissions.get('read', []).remove(user)
470
        except ValueError:
471
            return      # User did not have access anyway
472
        self._update_permissions(location, permissions)
473

    
474
    @handle_backend_exceptions
475
    def replace_users(self, image_id, users):
476
        image = self.get_image(image_id)
477
        assert image, "Image not found"
478

    
479
        location = image['location']
480
        permissions = self._get_permissions(location)
481
        permissions['read'] = users
482
        if image.get('is_public', False):
483
            permissions['read'].append('*')
484
        self._update_permissions(location, permissions)
485

    
486
    @handle_backend_exceptions
487
    def update(self, image_id, params):
488
        image = self.get_image(image_id)
489
        assert image, "Image not found"
490

    
491
        location = image['location']
492
        is_public = params.pop('is_public', None)
493
        if is_public is not None:
494
            permissions = self._get_permissions(location)
495
            read = set(permissions.get('read', []))
496
            if is_public:
497
                read.add('*')
498
            else:
499
                read.discard('*')
500
            permissions['read'] = list(read)
501
            self.backend._update_permissions(location, permissions)
502

    
503
        meta = {}
504
        meta['properties'] = params.pop('properties', {})
505
        meta.update(**params)
506

    
507
        self._update_meta(location, meta)
508
        return self.get_image(image_id)