Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / plankton / backend.py @ 7856a37a

History | View | Annotate | Download (18.2 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
"""
35
The Plankton attributes are the following:
36
  - checksum: the 'hash' meta
37
  - container_format: stored as a user meta
38
  - created_at: the 'modified' meta of the first version
39
  - deleted_at: the timestamp of the last version
40
  - disk_format: stored as a user meta
41
  - id: the 'uuid' meta
42
  - is_public: True if there is a * entry for the read permission
43
  - location: generated based on the file's path
44
  - name: stored as a user meta
45
  - owner: the file's account
46
  - properties: stored as user meta prefixed with PROPERTY_PREFIX
47
  - size: the 'bytes' meta
48
  - status: stored as a system meta
49
  - store: is always 'pithos'
50
  - updated_at: the 'modified' meta
51
"""
52

    
53
import json
54
import warnings
55

    
56
from operator import itemgetter
57
from time import gmtime, strftime
58
from functools import wraps, partial
59

    
60
from django.conf import settings
61

    
62
from pithos.backends.base import NotAllowedError as PithosNotAllowedError
63
import synnefo.lib.astakos as lib_astakos
64
import logging
65

    
66
from synnefo.settings import (CYCLADES_USE_QUOTAHOLDER,
67
                              CYCLADES_QUOTAHOLDER_URL,
68
                              CYCLADES_QUOTAHOLDER_TOKEN)
69

    
70
logger = logging.getLogger(__name__)
71

    
72

    
73
PLANKTON_DOMAIN = 'plankton'
74
PLANKTON_PREFIX = 'plankton:'
75
PROPERTY_PREFIX = 'property:'
76

    
77
PLANKTON_META = ('container_format', 'disk_format', 'name', 'properties',
78
                 'status')
79

    
80
TRANSLATE_UUIDS = getattr(settings, 'TRANSLATE_UUIDS', False)
81

    
82

    
83
def get_displaynames(names):
84
    try:
85
        auth_url = settings.ASTAKOS_URL
86
        url = auth_url.replace('im/authenticate', 'service/api/user_catalogs')
87
        token = settings.CYCLADES_ASTAKOS_SERVICE_TOKEN
88
        uuids = lib_astakos.get_displaynames(token, names, url=url)
89
    except Exception, e:
90
        logger.exception(e)
91
        return {}
92

    
93
    return uuids
94

    
95

    
96
def get_location(account, container, object):
97
    assert '/' not in account, "Invalid account"
98
    assert '/' not in container, "Invalid container"
99
    return 'pithos://%s/%s/%s' % (account, container, object)
100

    
101

    
102
def split_location(location):
103
    """Returns (accout, container, object) from a location string"""
104
    t = location.split('/', 4)
105
    assert len(t) == 5, "Invalid location"
106
    return t[2:5]
107

    
108

    
109
class BackendException(Exception):
110
    pass
111

    
112

    
113
class NotAllowedError(BackendException):
114
    pass
115

    
116

    
117
from pithos.backends.util import PithosBackendPool
118
POOL_SIZE = 8
119
_pithos_backend_pool = \
120
    PithosBackendPool(POOL_SIZE,
121
                      quotaholder_enabled=CYCLADES_USE_QUOTAHOLDER,
122
                      quotaholder_url=CYCLADES_QUOTAHOLDER_URL,
123
                      quotaholder_token=CYCLADES_QUOTAHOLDER_TOKEN,
124
                      db_connection=settings.BACKEND_DB_CONNECTION,
125
                      block_path=settings.BACKEND_BLOCK_PATH)
126

    
127

    
128
def get_pithos_backend():
129
    return _pithos_backend_pool.pool_get()
130

    
131

    
132
def handle_backend_exceptions(func):
133
    @wraps(func)
134
    def wrapper(*args, **kwargs):
135
        try:
136
            return func(*args, **kwargs)
137
        except PithosNotAllowedError:
138
            raise NotAllowedError()
139
    return wrapper
140

    
141

    
142
class ImageBackend(object):
143
    """A wrapper arround the pithos backend to simplify image handling."""
144

    
145
    def __init__(self, user):
146
        self.user = user
147

    
148
        original_filters = warnings.filters
149
        warnings.simplefilter('ignore')         # Suppress SQLAlchemy warnings
150
        self.backend = get_pithos_backend()
151
        warnings.filters = original_filters     # Restore warnings
152

    
153
    @handle_backend_exceptions
154
    def _get_image(self, location):
155
        def format_timestamp(t):
156
            return strftime('%Y-%m-%d %H:%M:%S', gmtime(t))
157

    
158
        account, container, object = split_location(location)
159

    
160
        try:
161
            versions = self.backend.list_versions(self.user, account,
162
                                                  container, object)
163
        except NameError:
164
            return None
165

    
166
        image = {}
167

    
168
        meta = self._get_meta(location)
169
        if meta:
170
            image['deleted_at'] = ''
171
        else:
172
            # Object was deleted, use the latest version
173
            version, timestamp = versions[-1]
174
            meta = self._get_meta(location, version)
175
            image['deleted_at'] = format_timestamp(timestamp)
176

    
177
        if PLANKTON_PREFIX + 'name' not in meta:
178
            return None     # Not a Plankton image
179

    
180
        permissions = self._get_permissions(location)
181

    
182
        image['checksum'] = meta['hash']
183
        image['created_at'] = format_timestamp(versions[0][1])
184
        image['id'] = meta['uuid']
185
        image['is_public'] = '*' in permissions.get('read', [])
186
        image['location'] = location
187
        if TRANSLATE_UUIDS:
188
            displaynames = get_displaynames([account])
189
            if account in displaynames:
190
                display_account = displaynames[account]
191
            else:
192
                display_account = 'unknown'
193
            image['owner'] = display_account
194
        else:
195
            image['owner'] = account
196
        image['size'] = meta['bytes']
197
        image['store'] = 'pithos'
198
        image['updated_at'] = format_timestamp(meta['modified'])
199
        image['properties'] = {}
200

    
201
        for key, val in meta.items():
202
            if not key.startswith(PLANKTON_PREFIX):
203
                continue
204
            key = key[len(PLANKTON_PREFIX):]
205
            if key == 'properties':
206
                val = json.loads(val)
207
            if key in PLANKTON_META:
208
                image[key] = val
209

    
210
        return image
211

    
212
    @handle_backend_exceptions
213
    def _get_meta(self, location, version=None):
214
        account, container, object = split_location(location)
215
        try:
216
            return self.backend.get_object_meta(self.user, account, container,
217
                                                object, PLANKTON_DOMAIN,
218
                                                version)
219
        except NameError:
220
            return None
221

    
222
    @handle_backend_exceptions
223
    def _get_permissions(self, location):
224
        account, container, object = split_location(location)
225
        _a, _p, permissions = self.backend.get_object_permissions(self.user,
226
                                                                  account,
227
                                                                  container,
228
                                                                  object)
229
        return permissions
230

    
231
    @handle_backend_exceptions
232
    def _store(self, f, size=None):
233
        """Breaks data into blocks and stores them in the backend"""
234

    
235
        bytes = 0
236
        hashmap = []
237
        backend = self.backend
238
        blocksize = backend.block_size
239

    
240
        data = f.read(blocksize)
241
        while data:
242
            hash = backend.put_block(data)
243
            hashmap.append(hash)
244
            bytes += len(data)
245
            data = f.read(blocksize)
246

    
247
        if size and size != bytes:
248
            raise BackendException("Invalid size")
249

    
250
        return hashmap, bytes
251

    
252
    @handle_backend_exceptions
253
    def _update(self, location, size, hashmap, meta, permissions):
254
        account, container, object = split_location(location)
255
        self.backend.update_object_hashmap(self.user, account, container,
256
                                           object, size, hashmap, '',
257
                                           PLANKTON_DOMAIN,
258
                                           permissions=permissions)
259
        self._update_meta(location, meta, replace=True)
260

    
261
    @handle_backend_exceptions
262
    def _update_meta(self, location, meta, replace=False):
263
        account, container, object = split_location(location)
264

    
265
        prefixed = {}
266
        for key, val in meta.items():
267
            if key == 'properties':
268
                val = json.dumps(val)
269
            if key in PLANKTON_META:
270
                prefixed[PLANKTON_PREFIX + key] = val
271

    
272
        self.backend.update_object_meta(self.user, account, container, object,
273
                                        PLANKTON_DOMAIN, prefixed, replace)
274

    
275
    @handle_backend_exceptions
276
    def _update_permissions(self, location, permissions):
277
        account, container, object = split_location(location)
278
        self.backend.update_object_permissions(self.user, account, container,
279
                                               object, permissions)
280

    
281
    @handle_backend_exceptions
282
    def add_user(self, image_id, user):
283
        image = self.get_image(image_id)
284
        assert image, "Image not found"
285

    
286
        location = image['location']
287
        permissions = self._get_permissions(location)
288
        read = set(permissions.get('read', []))
289
        read.add(user)
290
        permissions['read'] = list(read)
291
        self._update_permissions(location, permissions)
292

    
293
    def close(self):
294
        self.backend.close()
295

    
296
    @handle_backend_exceptions
297
    def delete(self, image_id):
298
        image = self.get_image(image_id)
299
        account, container, object = split_location(image['location'])
300
        self.backend.delete_object(self.user, account, container, object)
301

    
302
    @handle_backend_exceptions
303
    def get_data(self, location):
304
        account, container, object = split_location(location)
305
        size, hashmap = self.backend.get_object_hashmap(self.user, account,
306
                                                        container, object)
307
        data = ''.join(self.backend.get_block(hash) for hash in hashmap)
308
        assert len(data) == size
309
        return data
310

    
311
    @handle_backend_exceptions
312
    def get_image(self, image_id):
313
        try:
314
            account, container, object = self.backend.get_uuid(self.user,
315
                                                               image_id)
316
        except NameError:
317
            return None
318

    
319
        location = get_location(account, container, object)
320
        return self._get_image(location)
321

    
322
    @handle_backend_exceptions
323
    def _iter(self, public=False, filters=None, shared_from=None):
324
        filters = filters or {}
325

    
326
        # Fix keys
327
        keys = [PLANKTON_PREFIX + 'name']
328
        size_range = (None, None)
329
        for key, val in filters.items():
330
            if key == 'size_min':
331
                size_range = (val, size_range[1])
332
            elif key == 'size_max':
333
                size_range = (size_range[0], val)
334
            else:
335
                keys.append('%s = %s' % (PLANKTON_PREFIX + key, val))
336

    
337
        backend = self.backend
338
        if shared_from:
339
            # To get shared images, we connect as shared_from member and
340
            # get the list shared by us
341
            user = shared_from
342
        else:
343
            user = None if public else self.user
344
            accounts = backend.list_accounts(user)
345

    
346
        for account in accounts:
347
            for container in backend.list_containers(user, account,
348
                                                     shared=True):
349
                for path, _ in backend.list_objects(user, account, container,
350
                                                    domain=PLANKTON_DOMAIN,
351
                                                    keys=keys, shared=True,
352
                                                    size_range=size_range):
353
                    location = get_location(account, container, path)
354
                    image = self._get_image(location)
355
                    if image:
356
                        yield image
357

    
358
    def iter(self, filters=None):
359
        """Iter over all images available to the user"""
360
        return self._iter(filters=filters)
361

    
362
    def iter_public(self, filters=None):
363
        """Iter over public images"""
364
        return self._iter(public=True, filters=filters)
365

    
366
    def iter_shared(self, filters=None, member=None):
367
        """Iter over images shared to member"""
368
        return self._iter(filters=filters, shared_from=member)
369

    
370
    def list(self, filters=None, params={}):
371
        """Return all images available to the user"""
372
        images = list(self.iter(filters))
373
        key = itemgetter(params.get('sort_key', 'created_at'))
374
        reverse = params.get('sort_dir', 'desc') == 'desc'
375
        images.sort(key=key, reverse=reverse)
376
        return images
377

    
378
    def list_public(self, filters, params={}):
379
        """Return public images"""
380
        images = list(self.iter_public(filters))
381
        key = itemgetter(params.get('sort_key', 'created_at'))
382
        reverse = params.get('sort_dir', 'desc') == 'desc'
383
        images.sort(key=key, reverse=reverse)
384
        return images
385

    
386
    def list_users(self, image_id):
387
        image = self.get_image(image_id)
388
        assert image, "Image not found"
389

    
390
        permissions = self._get_permissions(image['location'])
391
        return [user for user in permissions.get('read', []) if user != '*']
392

    
393
    @handle_backend_exceptions
394
    def put(self, name, f, params):
395
        assert 'checksum' not in params, "Passing a checksum is not supported"
396
        assert 'id' not in params, "Passing an ID is not supported"
397
        assert params.pop('store', 'pithos') == 'pithos', "Invalid store"
398
        disk_format = params.setdefault('disk_format',
399
                                        settings.DEFAULT_DISK_FORMAT)
400
        assert disk_format in settings.ALLOWED_DISK_FORMATS,\
401
            "Invalid disk_format"
402
        assert params.setdefault('container_format',
403
                settings.DEFAULT_CONTAINER_FORMAT) in \
404
                settings.ALLOWED_CONTAINER_FORMATS, "Invalid container_format"
405

    
406
        container = settings.DEFAULT_PLANKTON_CONTAINER
407
        filename = params.pop('filename', name)
408
        location = 'pithos://%s/%s/%s' % (self.user, container, filename)
409
        is_public = params.pop('is_public', False)
410
        permissions = {'read': ['*']} if is_public else {}
411
        size = params.pop('size', None)
412

    
413
        hashmap, size = self._store(f, size)
414

    
415
        meta = {}
416
        meta['properties'] = params.pop('properties', {})
417
        meta.update(name=name, status='available', **params)
418

    
419
        self._update(location, size, hashmap, meta, permissions)
420
        return self._get_image(location)
421

    
422
    @handle_backend_exceptions
423
    def register(self, name, location, params):
424
        assert 'id' not in params, "Passing an ID is not supported"
425
        assert location.startswith('pithos://'), "Invalid location"
426
        assert params.pop('store', 'pithos') == 'pithos', "Invalid store"
427
        assert params.setdefault('disk_format',
428
                settings.DEFAULT_DISK_FORMAT) in \
429
                settings.ALLOWED_DISK_FORMATS, "Invalid disk_format"
430
        assert params.setdefault('container_format',
431
                settings.DEFAULT_CONTAINER_FORMAT) in \
432
                settings.ALLOWED_CONTAINER_FORMATS, "Invalid container_format"
433

    
434
        # user = self.user
435
        account, container, object = split_location(location)
436

    
437
        meta = self._get_meta(location)
438
        assert meta, "File not found"
439

    
440
        size = int(params.pop('size', meta['bytes']))
441
        if size != meta['bytes']:
442
            raise BackendException("Invalid size")
443

    
444
        checksum = params.pop('checksum', meta['hash'])
445
        if checksum != meta['hash']:
446
            raise BackendException("Invalid checksum")
447

    
448
        is_public = params.pop('is_public', False)
449
        if is_public:
450
            permissions = {'read': ['*']}
451
        else:
452
            permissions = {'read': [self.user]}
453

    
454
        meta = {}
455
        meta['properties'] = params.pop('properties', {})
456
        meta.update(name=name, status='available', **params)
457

    
458
        self._update_meta(location, meta)
459
        self._update_permissions(location, permissions)
460
        return self._get_image(location)
461

    
462
    @handle_backend_exceptions
463
    def remove_user(self, image_id, user):
464
        image = self.get_image(image_id)
465
        assert image, "Image not found"
466

    
467
        location = image['location']
468
        permissions = self._get_permissions(location)
469
        try:
470
            permissions.get('read', []).remove(user)
471
        except ValueError:
472
            return      # User did not have access anyway
473
        self._update_permissions(location, permissions)
474

    
475
    @handle_backend_exceptions
476
    def replace_users(self, image_id, users):
477
        image = self.get_image(image_id)
478
        assert image, "Image not found"
479

    
480
        location = image['location']
481
        permissions = self._get_permissions(location)
482
        permissions['read'] = users
483
        if image.get('is_public', False):
484
            permissions['read'].append('*')
485
        self._update_permissions(location, permissions)
486

    
487
    @handle_backend_exceptions
488
    def update(self, image_id, params):
489
        image = self.get_image(image_id)
490
        assert image, "Image not found"
491

    
492
        location = image['location']
493
        is_public = params.pop('is_public', None)
494
        if is_public is not None:
495
            permissions = self._get_permissions(location)
496
            read = set(permissions.get('read', []))
497
            if is_public:
498
                read.add('*')
499
            else:
500
                read.discard('*')
501
            permissions['read'] = list(read)
502
            self.backend._update_permissions(location, permissions)
503

    
504
        meta = {}
505
        meta['properties'] = params.pop('properties', {})
506
        meta.update(**params)
507

    
508
        self._update_meta(location, meta)
509
        return self.get_image(image_id)