Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / plankton / backend.py @ b336e6fa

History | View | Annotate | Download (18.3 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
"""
35
The Plankton attributes are the following:
36
  - checksum: the 'hash' meta
37
  - container_format: stored as a user meta
38
  - created_at: the 'modified' meta of the first version
39
  - deleted_at: the timestamp of the last version
40
  - disk_format: stored as a user meta
41
  - id: the 'uuid' meta
42
  - is_public: True if there is a * entry for the read permission
43
  - location: generated based on the file's path
44
  - name: stored as a user meta
45
  - owner: the file's account
46
  - properties: stored as user meta prefixed with PROPERTY_PREFIX
47
  - size: the 'bytes' meta
48
  - status: stored as a system meta
49
  - store: is always 'pithos'
50
  - updated_at: the 'modified' meta
51
"""
52

    
53
import json
54
import warnings
55

    
56
from operator import itemgetter
57
from time import gmtime, strftime
58
from functools import wraps, partial
59

    
60
from django.conf import settings
61

    
62
from pithos.backends.base import NotAllowedError as PithosNotAllowedError
63
import synnefo.lib.astakos as lib_astakos
64
import logging
65

    
66
from synnefo.settings import (CYCLADES_USE_QUOTAHOLDER,
67
                              CYCLADES_QUOTAHOLDER_URL,
68
                              CYCLADES_QUOTAHOLDER_TOKEN,
69
                              CYCLADES_QUOTAHOLDER_POOLSIZE)
70

    
71
logger = logging.getLogger(__name__)
72

    
73

    
74
PLANKTON_DOMAIN = 'plankton'
75
PLANKTON_PREFIX = 'plankton:'
76
PROPERTY_PREFIX = 'property:'
77

    
78
PLANKTON_META = ('container_format', 'disk_format', 'name', 'properties',
79
                 'status')
80

    
81
TRANSLATE_UUIDS = getattr(settings, 'TRANSLATE_UUIDS', False)
82

    
83

    
84
def get_displaynames(names):
85
    try:
86
        auth_url = settings.ASTAKOS_URL
87
        url = auth_url.replace('im/authenticate', 'service/api/user_catalogs')
88
        token = settings.CYCLADES_ASTAKOS_SERVICE_TOKEN
89
        uuids = lib_astakos.get_displaynames(token, names, url=url)
90
    except Exception, e:
91
        logger.exception(e)
92
        return {}
93

    
94
    return uuids
95

    
96

    
97
def get_location(account, container, object):
98
    assert '/' not in account, "Invalid account"
99
    assert '/' not in container, "Invalid container"
100
    return 'pithos://%s/%s/%s' % (account, container, object)
101

    
102

    
103
def split_location(location):
104
    """Returns (accout, container, object) from a location string"""
105
    t = location.split('/', 4)
106
    assert len(t) == 5, "Invalid location"
107
    return t[2:5]
108

    
109

    
110
class BackendException(Exception):
111
    pass
112

    
113

    
114
class NotAllowedError(BackendException):
115
    pass
116

    
117

    
118
from pithos.backends.util import PithosBackendPool
119
POOL_SIZE = 8
120
_pithos_backend_pool = \
121
    PithosBackendPool(
122
        POOL_SIZE,
123
        quotaholder_enabled=CYCLADES_USE_QUOTAHOLDER,
124
        quotaholder_url=CYCLADES_QUOTAHOLDER_URL,
125
        quotaholder_token=CYCLADES_QUOTAHOLDER_TOKEN,
126
        quotaholder_client_poolsize=CYCLADES_QUOTAHOLDER_POOLSIZE,
127
        db_connection=settings.BACKEND_DB_CONNECTION,
128
        block_path=settings.BACKEND_BLOCK_PATH)
129

    
130

    
131
def get_pithos_backend():
132
    return _pithos_backend_pool.pool_get()
133

    
134

    
135
def handle_backend_exceptions(func):
136
    @wraps(func)
137
    def wrapper(*args, **kwargs):
138
        try:
139
            return func(*args, **kwargs)
140
        except PithosNotAllowedError:
141
            raise NotAllowedError()
142
    return wrapper
143

    
144

    
145
class ImageBackend(object):
146
    """A wrapper arround the pithos backend to simplify image handling."""
147

    
148
    def __init__(self, user):
149
        self.user = user
150

    
151
        original_filters = warnings.filters
152
        warnings.simplefilter('ignore')         # Suppress SQLAlchemy warnings
153
        self.backend = get_pithos_backend()
154
        warnings.filters = original_filters     # Restore warnings
155

    
156
    @handle_backend_exceptions
157
    def _get_image(self, location):
158
        def format_timestamp(t):
159
            return strftime('%Y-%m-%d %H:%M:%S', gmtime(t))
160

    
161
        account, container, object = split_location(location)
162

    
163
        try:
164
            versions = self.backend.list_versions(self.user, account,
165
                                                  container, object)
166
        except NameError:
167
            return None
168

    
169
        image = {}
170

    
171
        meta = self._get_meta(location)
172
        if meta:
173
            image['deleted_at'] = ''
174
        else:
175
            # Object was deleted, use the latest version
176
            version, timestamp = versions[-1]
177
            meta = self._get_meta(location, version)
178
            image['deleted_at'] = format_timestamp(timestamp)
179

    
180
        if PLANKTON_PREFIX + 'name' not in meta:
181
            return None     # Not a Plankton image
182

    
183
        permissions = self._get_permissions(location)
184

    
185
        image['checksum'] = meta['hash']
186
        image['created_at'] = format_timestamp(versions[0][1])
187
        image['id'] = meta['uuid']
188
        image['is_public'] = '*' in permissions.get('read', [])
189
        image['location'] = location
190
        if TRANSLATE_UUIDS:
191
            displaynames = get_displaynames([account])
192
            if account in displaynames:
193
                display_account = displaynames[account]
194
            else:
195
                display_account = 'unknown'
196
            image['owner'] = display_account
197
        else:
198
            image['owner'] = account
199
        image['size'] = meta['bytes']
200
        image['store'] = 'pithos'
201
        image['updated_at'] = format_timestamp(meta['modified'])
202
        image['properties'] = {}
203

    
204
        for key, val in meta.items():
205
            if not key.startswith(PLANKTON_PREFIX):
206
                continue
207
            key = key[len(PLANKTON_PREFIX):]
208
            if key == 'properties':
209
                val = json.loads(val)
210
            if key in PLANKTON_META:
211
                image[key] = val
212

    
213
        return image
214

    
215
    @handle_backend_exceptions
216
    def _get_meta(self, location, version=None):
217
        account, container, object = split_location(location)
218
        try:
219
            return self.backend.get_object_meta(self.user, account, container,
220
                                                object, PLANKTON_DOMAIN,
221
                                                version)
222
        except NameError:
223
            return None
224

    
225
    @handle_backend_exceptions
226
    def _get_permissions(self, location):
227
        account, container, object = split_location(location)
228
        _a, _p, permissions = self.backend.get_object_permissions(self.user,
229
                                                                  account,
230
                                                                  container,
231
                                                                  object)
232
        return permissions
233

    
234
    @handle_backend_exceptions
235
    def _store(self, f, size=None):
236
        """Breaks data into blocks and stores them in the backend"""
237

    
238
        bytes = 0
239
        hashmap = []
240
        backend = self.backend
241
        blocksize = backend.block_size
242

    
243
        data = f.read(blocksize)
244
        while data:
245
            hash = backend.put_block(data)
246
            hashmap.append(hash)
247
            bytes += len(data)
248
            data = f.read(blocksize)
249

    
250
        if size and size != bytes:
251
            raise BackendException("Invalid size")
252

    
253
        return hashmap, bytes
254

    
255
    @handle_backend_exceptions
256
    def _update(self, location, size, hashmap, meta, permissions):
257
        account, container, object = split_location(location)
258
        self.backend.update_object_hashmap(self.user, account, container,
259
                                           object, size, hashmap, '',
260
                                           PLANKTON_DOMAIN,
261
                                           permissions=permissions)
262
        self._update_meta(location, meta, replace=True)
263

    
264
    @handle_backend_exceptions
265
    def _update_meta(self, location, meta, replace=False):
266
        account, container, object = split_location(location)
267

    
268
        prefixed = {}
269
        for key, val in meta.items():
270
            if key == 'properties':
271
                val = json.dumps(val)
272
            if key in PLANKTON_META:
273
                prefixed[PLANKTON_PREFIX + key] = val
274

    
275
        self.backend.update_object_meta(self.user, account, container, object,
276
                                        PLANKTON_DOMAIN, prefixed, replace)
277

    
278
    @handle_backend_exceptions
279
    def _update_permissions(self, location, permissions):
280
        account, container, object = split_location(location)
281
        self.backend.update_object_permissions(self.user, account, container,
282
                                               object, permissions)
283

    
284
    @handle_backend_exceptions
285
    def add_user(self, image_id, user):
286
        image = self.get_image(image_id)
287
        assert image, "Image not found"
288

    
289
        location = image['location']
290
        permissions = self._get_permissions(location)
291
        read = set(permissions.get('read', []))
292
        read.add(user)
293
        permissions['read'] = list(read)
294
        self._update_permissions(location, permissions)
295

    
296
    def close(self):
297
        self.backend.close()
298

    
299
    @handle_backend_exceptions
300
    def delete(self, image_id):
301
        image = self.get_image(image_id)
302
        account, container, object = split_location(image['location'])
303
        self.backend.delete_object(self.user, account, container, object)
304

    
305
    @handle_backend_exceptions
306
    def get_data(self, location):
307
        account, container, object = split_location(location)
308
        size, hashmap = self.backend.get_object_hashmap(self.user, account,
309
                                                        container, object)
310
        data = ''.join(self.backend.get_block(hash) for hash in hashmap)
311
        assert len(data) == size
312
        return data
313

    
314
    @handle_backend_exceptions
315
    def get_image(self, image_id):
316
        try:
317
            account, container, object = self.backend.get_uuid(self.user,
318
                                                               image_id)
319
        except NameError:
320
            return None
321

    
322
        location = get_location(account, container, object)
323
        return self._get_image(location)
324

    
325
    @handle_backend_exceptions
326
    def _iter(self, public=False, filters=None, shared_from=None):
327
        filters = filters or {}
328

    
329
        # Fix keys
330
        keys = [PLANKTON_PREFIX + 'name']
331
        size_range = (None, None)
332
        for key, val in filters.items():
333
            if key == 'size_min':
334
                size_range = (val, size_range[1])
335
            elif key == 'size_max':
336
                size_range = (size_range[0], val)
337
            else:
338
                keys.append('%s = %s' % (PLANKTON_PREFIX + key, val))
339

    
340
        backend = self.backend
341
        if shared_from:
342
            # To get shared images, we connect as shared_from member and
343
            # get the list shared by us
344
            user = shared_from
345
        else:
346
            user = None if public else self.user
347
            accounts = backend.list_accounts(user)
348

    
349
        for account in accounts:
350
            for container in backend.list_containers(user, account,
351
                                                     shared=True):
352
                for path, _ in backend.list_objects(user, account, container,
353
                                                    domain=PLANKTON_DOMAIN,
354
                                                    keys=keys, shared=True,
355
                                                    size_range=size_range):
356
                    location = get_location(account, container, path)
357
                    image = self._get_image(location)
358
                    if image:
359
                        yield image
360

    
361
    def iter(self, filters=None):
362
        """Iter over all images available to the user"""
363
        return self._iter(filters=filters)
364

    
365
    def iter_public(self, filters=None):
366
        """Iter over public images"""
367
        return self._iter(public=True, filters=filters)
368

    
369
    def iter_shared(self, filters=None, member=None):
370
        """Iter over images shared to member"""
371
        return self._iter(filters=filters, shared_from=member)
372

    
373
    def list(self, filters=None, params={}):
374
        """Return all images available to the user"""
375
        images = list(self.iter(filters))
376
        key = itemgetter(params.get('sort_key', 'created_at'))
377
        reverse = params.get('sort_dir', 'desc') == 'desc'
378
        images.sort(key=key, reverse=reverse)
379
        return images
380

    
381
    def list_public(self, filters, params={}):
382
        """Return public images"""
383
        images = list(self.iter_public(filters))
384
        key = itemgetter(params.get('sort_key', 'created_at'))
385
        reverse = params.get('sort_dir', 'desc') == 'desc'
386
        images.sort(key=key, reverse=reverse)
387
        return images
388

    
389
    def list_users(self, image_id):
390
        image = self.get_image(image_id)
391
        assert image, "Image not found"
392

    
393
        permissions = self._get_permissions(image['location'])
394
        return [user for user in permissions.get('read', []) if user != '*']
395

    
396
    @handle_backend_exceptions
397
    def put(self, name, f, params):
398
        assert 'checksum' not in params, "Passing a checksum is not supported"
399
        assert 'id' not in params, "Passing an ID is not supported"
400
        assert params.pop('store', 'pithos') == 'pithos', "Invalid store"
401
        disk_format = params.setdefault('disk_format',
402
                                        settings.DEFAULT_DISK_FORMAT)
403
        assert disk_format in settings.ALLOWED_DISK_FORMATS,\
404
            "Invalid disk_format"
405
        assert params.setdefault('container_format',
406
                settings.DEFAULT_CONTAINER_FORMAT) in \
407
                settings.ALLOWED_CONTAINER_FORMATS, "Invalid container_format"
408

    
409
        container = settings.DEFAULT_PLANKTON_CONTAINER
410
        filename = params.pop('filename', name)
411
        location = 'pithos://%s/%s/%s' % (self.user, container, filename)
412
        is_public = params.pop('is_public', False)
413
        permissions = {'read': ['*']} if is_public else {}
414
        size = params.pop('size', None)
415

    
416
        hashmap, size = self._store(f, size)
417

    
418
        meta = {}
419
        meta['properties'] = params.pop('properties', {})
420
        meta.update(name=name, status='available', **params)
421

    
422
        self._update(location, size, hashmap, meta, permissions)
423
        return self._get_image(location)
424

    
425
    @handle_backend_exceptions
426
    def register(self, name, location, params):
427
        assert 'id' not in params, "Passing an ID is not supported"
428
        assert location.startswith('pithos://'), "Invalid location"
429
        assert params.pop('store', 'pithos') == 'pithos', "Invalid store"
430
        assert params.setdefault('disk_format',
431
                settings.DEFAULT_DISK_FORMAT) in \
432
                settings.ALLOWED_DISK_FORMATS, "Invalid disk_format"
433
        assert params.setdefault('container_format',
434
                settings.DEFAULT_CONTAINER_FORMAT) in \
435
                settings.ALLOWED_CONTAINER_FORMATS, "Invalid container_format"
436

    
437
        # user = self.user
438
        account, container, object = split_location(location)
439

    
440
        meta = self._get_meta(location)
441
        assert meta, "File not found"
442

    
443
        size = int(params.pop('size', meta['bytes']))
444
        if size != meta['bytes']:
445
            raise BackendException("Invalid size")
446

    
447
        checksum = params.pop('checksum', meta['hash'])
448
        if checksum != meta['hash']:
449
            raise BackendException("Invalid checksum")
450

    
451
        is_public = params.pop('is_public', False)
452
        if is_public:
453
            permissions = {'read': ['*']}
454
        else:
455
            permissions = {'read': [self.user]}
456

    
457
        meta = {}
458
        meta['properties'] = params.pop('properties', {})
459
        meta.update(name=name, status='available', **params)
460

    
461
        self._update_meta(location, meta)
462
        self._update_permissions(location, permissions)
463
        return self._get_image(location)
464

    
465
    @handle_backend_exceptions
466
    def remove_user(self, image_id, user):
467
        image = self.get_image(image_id)
468
        assert image, "Image not found"
469

    
470
        location = image['location']
471
        permissions = self._get_permissions(location)
472
        try:
473
            permissions.get('read', []).remove(user)
474
        except ValueError:
475
            return      # User did not have access anyway
476
        self._update_permissions(location, permissions)
477

    
478
    @handle_backend_exceptions
479
    def replace_users(self, image_id, users):
480
        image = self.get_image(image_id)
481
        assert image, "Image not found"
482

    
483
        location = image['location']
484
        permissions = self._get_permissions(location)
485
        permissions['read'] = users
486
        if image.get('is_public', False):
487
            permissions['read'].append('*')
488
        self._update_permissions(location, permissions)
489

    
490
    @handle_backend_exceptions
491
    def update(self, image_id, params):
492
        image = self.get_image(image_id)
493
        assert image, "Image not found"
494

    
495
        location = image['location']
496
        is_public = params.pop('is_public', None)
497
        if is_public is not None:
498
            permissions = self._get_permissions(location)
499
            read = set(permissions.get('read', []))
500
            if is_public:
501
                read.add('*')
502
            else:
503
                read.discard('*')
504
            permissions['read'] = list(read)
505
            self.backend._update_permissions(location, permissions)
506

    
507
        meta = {}
508
        meta['properties'] = params.pop('properties', {})
509
        meta.update(**params)
510

    
511
        self._update_meta(location, meta)
512
        return self.get_image(image_id)