Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / plankton / backend.py @ bd40abfa

History | View | Annotate | Download (19.1 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
"""
35
The Plankton attributes are the following:
36
  - checksum: the 'hash' meta
37
  - container_format: stored as a user meta
38
  - created_at: the 'modified' meta of the first version
39
  - deleted_at: the timestamp of the last version
40
  - disk_format: stored as a user meta
41
  - id: the 'uuid' meta
42
  - is_public: True if there is a * entry for the read permission
43
  - location: generated based on the file's path
44
  - name: stored as a user meta
45
  - owner: the file's account
46
  - properties: stored as user meta prefixed with PROPERTY_PREFIX
47
  - size: the 'bytes' meta
48
  - status: stored as a system meta
49
  - store: is always 'pithos'
50
  - updated_at: the 'modified' meta
51
"""
52

    
53
import json
54
import warnings
55

    
56
from operator import itemgetter
57
from time import gmtime, strftime
58
from functools import wraps, partial
59
from snf_django.lib.api import faults
60

    
61
from django.conf import settings
62

    
63
from pithos.backends.base import NotAllowedError as PithosNotAllowedError
64

    
65
import synnefo.lib.astakos as lib_astakos
66
import logging
67

    
68
from synnefo.settings import (CYCLADES_USE_QUOTAHOLDER,
69
                              CYCLADES_QUOTAHOLDER_URL,
70
                              CYCLADES_QUOTAHOLDER_TOKEN,
71
                              CYCLADES_QUOTAHOLDER_POOLSIZE)
72

    
73
logger = logging.getLogger(__name__)
74

    
75

    
76
PLANKTON_DOMAIN = 'plankton'
77
PLANKTON_PREFIX = 'plankton:'
78
PROPERTY_PREFIX = 'property:'
79

    
80
PLANKTON_META = ('container_format', 'disk_format', 'name', 'properties',
81
                 'status')
82

    
83
TRANSLATE_UUIDS = getattr(settings, 'TRANSLATE_UUIDS', False)
84

    
85

    
86
def get_displaynames(names):
87
    try:
88
        auth_url = settings.ASTAKOS_URL
89
        url = auth_url.replace('im/authenticate', 'service/api/user_catalogs')
90
        token = settings.CYCLADES_ASTAKOS_SERVICE_TOKEN
91
        uuids = lib_astakos.get_displaynames(token, names, url=url)
92
    except Exception, e:
93
        logger.exception(e)
94
        return {}
95

    
96
    return uuids
97

    
98

    
99
def get_location(account, container, object):
100
    assert '/' not in account, "Invalid account"
101
    assert '/' not in container, "Invalid container"
102
    return 'pithos://%s/%s/%s' % (account, container, object)
103

    
104

    
105
def split_location(location):
106
    """Returns (accout, container, object) from a location string"""
107
    t = location.split('/', 4)
108
    assert len(t) == 5, "Invalid location"
109
    return t[2:5]
110

    
111

    
112
class BackendException(Exception):
113
    pass
114

    
115

    
116
class NotAllowedError(BackendException):
117
    pass
118

    
119

    
120
from pithos.backends.util import PithosBackendPool
121
POOL_SIZE = 8
122
_pithos_backend_pool = \
123
    PithosBackendPool(
124
        POOL_SIZE,
125
        quotaholder_enabled=CYCLADES_USE_QUOTAHOLDER,
126
        quotaholder_url=CYCLADES_QUOTAHOLDER_URL,
127
        quotaholder_token=CYCLADES_QUOTAHOLDER_TOKEN,
128
        quotaholder_client_poolsize=CYCLADES_QUOTAHOLDER_POOLSIZE,
129
        db_connection=settings.BACKEND_DB_CONNECTION,
130
        block_path=settings.BACKEND_BLOCK_PATH)
131

    
132

    
133
def get_pithos_backend():
134
    return _pithos_backend_pool.pool_get()
135

    
136

    
137
def handle_backend_exceptions(func):
138
    @wraps(func)
139
    def wrapper(*args, **kwargs):
140
        try:
141
            return func(*args, **kwargs)
142
        except PithosNotAllowedError:
143
            raise NotAllowedError()
144
    return wrapper
145

    
146

    
147
class ImageBackend(object):
148
    """A wrapper arround the pithos backend to simplify image handling."""
149

    
150
    def __init__(self, user):
151
        self.user = user
152

    
153
        original_filters = warnings.filters
154
        warnings.simplefilter('ignore')         # Suppress SQLAlchemy warnings
155
        self.backend = get_pithos_backend()
156
        warnings.filters = original_filters     # Restore warnings
157

    
158
    @handle_backend_exceptions
159
    def _get_image(self, location):
160
        def format_timestamp(t):
161
            return strftime('%Y-%m-%d %H:%M:%S', gmtime(t))
162

    
163
        account, container, object = split_location(location)
164

    
165
        try:
166
            versions = self.backend.list_versions(self.user, account,
167
                                                  container, object)
168
        except NameError:
169
            return None
170

    
171
        image = {}
172

    
173
        meta = self._get_meta(location)
174
        if meta:
175
            image['deleted_at'] = ''
176
        else:
177
            # Object was deleted, use the latest version
178
            version, timestamp = versions[-1]
179
            meta = self._get_meta(location, version)
180
            image['deleted_at'] = format_timestamp(timestamp)
181

    
182
        if PLANKTON_PREFIX + 'name' not in meta:
183
            return None     # Not a Plankton image
184

    
185
        permissions = self._get_permissions(location)
186

    
187
        image['checksum'] = meta['hash']
188
        image['created_at'] = format_timestamp(versions[0][1])
189
        image['id'] = meta['uuid']
190
        image['is_public'] = '*' in permissions.get('read', [])
191
        image['location'] = location
192
        if TRANSLATE_UUIDS:
193
            displaynames = get_displaynames([account])
194
            if account in displaynames:
195
                display_account = displaynames[account]
196
            else:
197
                display_account = 'unknown'
198
            image['owner'] = display_account
199
        else:
200
            image['owner'] = account
201
        image['size'] = meta['bytes']
202
        image['store'] = 'pithos'
203
        image['updated_at'] = format_timestamp(meta['modified'])
204
        image['properties'] = {}
205

    
206
        for key, val in meta.items():
207
            if not key.startswith(PLANKTON_PREFIX):
208
                continue
209
            key = key[len(PLANKTON_PREFIX):]
210
            if key == 'properties':
211
                val = json.loads(val)
212
            if key in PLANKTON_META:
213
                image[key] = val
214

    
215
        return image
216

    
217
    @handle_backend_exceptions
218
    def _get_meta(self, location, version=None):
219
        account, container, object = split_location(location)
220
        try:
221
            return self.backend.get_object_meta(self.user, account, container,
222
                                                object, PLANKTON_DOMAIN,
223
                                                version)
224
        except NameError:
225
            return None
226

    
227
    @handle_backend_exceptions
228
    def _get_permissions(self, location):
229
        account, container, object = split_location(location)
230
        _a, _p, permissions = self.backend.get_object_permissions(self.user,
231
                                                                  account,
232
                                                                  container,
233
                                                                  object)
234
        return permissions
235

    
236
    @handle_backend_exceptions
237
    def _store(self, f, size=None):
238
        """Breaks data into blocks and stores them in the backend"""
239

    
240
        bytes = 0
241
        hashmap = []
242
        backend = self.backend
243
        blocksize = backend.block_size
244

    
245
        data = f.read(blocksize)
246
        while data:
247
            hash = backend.put_block(data)
248
            hashmap.append(hash)
249
            bytes += len(data)
250
            data = f.read(blocksize)
251

    
252
        if size and size != bytes:
253
            raise BackendException("Invalid size")
254

    
255
        return hashmap, bytes
256

    
257
    @handle_backend_exceptions
258
    def _update(self, location, size, hashmap, meta, permissions):
259
        account, container, object = split_location(location)
260
        self.backend.update_object_hashmap(self.user, account, container,
261
                                           object, size, hashmap, '',
262
                                           PLANKTON_DOMAIN,
263
                                           permissions=permissions)
264
        self._update_meta(location, meta, replace=True)
265

    
266
    @handle_backend_exceptions
267
    def _update_meta(self, location, meta, replace=False):
268
        account, container, object = split_location(location)
269

    
270
        prefixed = {}
271
        for key, val in meta.items():
272
            if key == 'properties':
273
                val = json.dumps(val)
274
            if key in PLANKTON_META:
275
                prefixed[PLANKTON_PREFIX + key] = val
276

    
277
        self.backend.update_object_meta(self.user, account, container, object,
278
                                        PLANKTON_DOMAIN, prefixed, replace)
279

    
280
    @handle_backend_exceptions
281
    def _update_permissions(self, location, permissions):
282
        account, container, object = split_location(location)
283
        self.backend.update_object_permissions(self.user, account, container,
284
                                               object, permissions)
285

    
286
    @handle_backend_exceptions
287
    def add_user(self, image_id, user):
288
        image = self.get_image(image_id)
289
        if not image:
290
            raise faults.ItemNotFound
291

    
292
        location = image['location']
293
        permissions = self._get_permissions(location)
294
        read = set(permissions.get('read', []))
295
        read.add(user)
296
        permissions['read'] = list(read)
297
        self._update_permissions(location, permissions)
298

    
299
    def close(self):
300
        self.backend.close()
301

    
302
    @handle_backend_exceptions
303
    def _delete(self, image_id):
304
        """Delete an Image.
305

306
        This method will delete the Image from the Storage backend.
307

308
        """
309
        image = self.get_image(image_id)
310
        account, container, object = split_location(image['location'])
311
        self.backend.delete_object(self.user, account, container, object)
312

    
313
    @handle_backend_exceptions
314
    def get_data(self, location):
315
        account, container, object = split_location(location)
316
        size, hashmap = self.backend.get_object_hashmap(self.user, account,
317
                                                        container, object)
318
        data = ''.join(self.backend.get_block(hash) for hash in hashmap)
319
        assert len(data) == size
320
        return data
321

    
322
    @handle_backend_exceptions
323
    def get_image(self, image_id):
324
        try:
325
            account, container, object = self.backend.get_uuid(self.user,
326
                                                               image_id)
327
        except NameError:
328
            return None
329

    
330
        location = get_location(account, container, object)
331
        return self._get_image(location)
332

    
333
    @handle_backend_exceptions
334
    def _iter(self, public=False, filters=None, shared_from=None):
335
        filters = filters or {}
336

    
337
        # Fix keys
338
        keys = [PLANKTON_PREFIX + 'name']
339
        size_range = (None, None)
340
        for key, val in filters.items():
341
            if key == 'size_min':
342
                size_range = (val, size_range[1])
343
            elif key == 'size_max':
344
                size_range = (size_range[0], val)
345
            else:
346
                keys.append('%s = %s' % (PLANKTON_PREFIX + key, val))
347

    
348
        backend = self.backend
349
        if shared_from:
350
            # To get shared images, we connect as shared_from member and
351
            # get the list shared by us
352
            user = shared_from
353
            accounts = [self.user]
354
        else:
355
            user = None if public else self.user
356
            accounts = backend.list_accounts(user)
357

    
358
        for account in accounts:
359
            for container in backend.list_containers(user, account,
360
                                                     shared=True):
361
                for path, _ in backend.list_objects(user, account, container,
362
                                                    domain=PLANKTON_DOMAIN,
363
                                                    keys=keys, shared=True,
364
                                                    size_range=size_range):
365
                    location = get_location(account, container, path)
366
                    image = self._get_image(location)
367
                    if image:
368
                        yield image
369

    
370
    def iter(self, filters=None):
371
        """Iter over all images available to the user"""
372
        return self._iter(filters=filters)
373

    
374
    def iter_public(self, filters=None):
375
        """Iter over public images"""
376
        return self._iter(public=True, filters=filters)
377

    
378
    def iter_shared(self, filters=None, member=None):
379
        """Iter over images shared to member"""
380
        return self._iter(filters=filters, shared_from=member)
381

    
382
    def list(self, filters=None, params={}):
383
        """Return all images available to the user"""
384
        images = list(self.iter(filters))
385
        key = itemgetter(params.get('sort_key', 'created_at'))
386
        reverse = params.get('sort_dir', 'desc') == 'desc'
387
        images.sort(key=key, reverse=reverse)
388
        return images
389

    
390
    def list_public(self, filters, params={}):
391
        """Return public images"""
392
        images = list(self.iter_public(filters))
393
        key = itemgetter(params.get('sort_key', 'created_at'))
394
        reverse = params.get('sort_dir', 'desc') == 'desc'
395
        images.sort(key=key, reverse=reverse)
396
        return images
397

    
398
    def list_users(self, image_id):
399
        image = self.get_image(image_id)
400
        if not image:
401
            raise faults.ItemNotFound
402

    
403
        permissions = self._get_permissions(image['location'])
404
        return [user for user in permissions.get('read', []) if user != '*']
405

    
406
    @handle_backend_exceptions
407
    def put(self, name, f, params):
408
        assert 'checksum' not in params, "Passing a checksum is not supported"
409
        assert 'id' not in params, "Passing an ID is not supported"
410
        assert params.pop('store', 'pithos') == 'pithos', "Invalid store"
411
        disk_format = params.setdefault('disk_format',
412
                                        settings.DEFAULT_DISK_FORMAT)
413
        assert disk_format in settings.ALLOWED_DISK_FORMATS,\
414
            "Invalid disk_format"
415
        assert params.setdefault('container_format',
416
                settings.DEFAULT_CONTAINER_FORMAT) in \
417
                settings.ALLOWED_CONTAINER_FORMATS, "Invalid container_format"
418

    
419
        container = settings.DEFAULT_PLANKTON_CONTAINER
420
        filename = params.pop('filename', name)
421
        location = 'pithos://%s/%s/%s' % (self.user, container, filename)
422
        is_public = params.pop('is_public', False)
423
        permissions = {'read': ['*']} if is_public else {}
424
        size = params.pop('size', None)
425

    
426
        hashmap, size = self._store(f, size)
427

    
428
        meta = {}
429
        meta['properties'] = params.pop('properties', {})
430
        meta.update(name=name, status='available', **params)
431

    
432
        self._update(location, size, hashmap, meta, permissions)
433
        return self._get_image(location)
434

    
435
    @handle_backend_exceptions
436
    def register(self, name, location, params):
437
        assert 'id' not in params, "Passing an ID is not supported"
438
        assert location.startswith('pithos://'), "Invalid location"
439
        assert params.pop('store', 'pithos') == 'pithos', "Invalid store"
440
        assert params.setdefault('disk_format',
441
                settings.DEFAULT_DISK_FORMAT) in \
442
                settings.ALLOWED_DISK_FORMATS, "Invalid disk_format"
443
        assert params.setdefault('container_format',
444
                settings.DEFAULT_CONTAINER_FORMAT) in \
445
                settings.ALLOWED_CONTAINER_FORMATS, "Invalid container_format"
446

    
447
        # user = self.user
448
        account, container, object = split_location(location)
449

    
450
        meta = self._get_meta(location)
451
        assert meta, "File not found"
452

    
453
        size = int(params.pop('size', meta['bytes']))
454
        if size != meta['bytes']:
455
            raise BackendException("Invalid size")
456

    
457
        checksum = params.pop('checksum', meta['hash'])
458
        if checksum != meta['hash']:
459
            raise BackendException("Invalid checksum")
460

    
461
        is_public = params.pop('is_public', False)
462
        if is_public:
463
            permissions = {'read': ['*']}
464
        else:
465
            permissions = {'read': [self.user]}
466

    
467
        meta = {}
468
        meta['properties'] = params.pop('properties', {})
469
        meta.update(name=name, status='available', **params)
470

    
471
        self._update_meta(location, meta)
472
        self._update_permissions(location, permissions)
473
        return self._get_image(location)
474

    
475
    @handle_backend_exceptions
476
    def remove_user(self, image_id, user):
477
        image = self.get_image(image_id)
478
        if not image:
479
            raise faults.ItemNotFound
480

    
481
        location = image['location']
482
        permissions = self._get_permissions(location)
483
        try:
484
            permissions.get('read', []).remove(user)
485
        except ValueError:
486
            return      # User did not have access anyway
487
        self._update_permissions(location, permissions)
488

    
489
    @handle_backend_exceptions
490
    def replace_users(self, image_id, users):
491
        image = self.get_image(image_id)
492
        if not image:
493
            raise faults.ItemNotFound
494

    
495
        location = image['location']
496
        permissions = self._get_permissions(location)
497
        permissions['read'] = users
498
        if image.get('is_public', False):
499
            permissions['read'].append('*')
500
        self._update_permissions(location, permissions)
501

    
502
    @handle_backend_exceptions
503
    def update(self, image_id, params):
504
        image = self.get_image(image_id)
505
        assert image, "Image not found"
506
        if not image:
507
            raise faults.ItemNotFound
508

    
509
        location = image['location']
510
        is_public = params.pop('is_public', None)
511
        if is_public is not None:
512
            permissions = self._get_permissions(location)
513
            read = set(permissions.get('read', []))
514
            if is_public:
515
                read.add('*')
516
            else:
517
                read.discard('*')
518
            permissions['read'] = list(read)
519
            self.backend._update_permissions(location, permissions)
520

    
521
        meta = {}
522
        meta['properties'] = params.pop('properties', {})
523
        meta.update(**params)
524

    
525
        self._update_meta(location, meta)
526
        return self.get_image(image_id)
527

    
528
    @handle_backend_exceptions
529
    def unregister(self, image_id):
530
        """Unregister an image."""
531
        image = self.get_image(image_id)
532
        if not image:
533
            raise faults.ItemNotFound
534

    
535
        location = image["location"]
536
        # Unregister the image by removing all metadata from domain
537
        # 'PLANKTON_DOMAIN'
538
        meta = self._get_meta(location)
539
        for k in meta.keys():
540
            meta[k] = ""
541
        self._update_meta(location, meta, False)