Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / plankton / backend.py @ f6ff4b40

History | View | Annotate | Download (19 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
"""
35
The Plankton attributes are the following:
36
  - checksum: the 'hash' meta
37
  - container_format: stored as a user meta
38
  - created_at: the 'modified' meta of the first version
39
  - deleted_at: the timestamp of the last version
40
  - disk_format: stored as a user meta
41
  - id: the 'uuid' meta
42
  - is_public: True if there is a * entry for the read permission
43
  - location: generated based on the file's path
44
  - name: stored as a user meta
45
  - owner: the file's account
46
  - properties: stored as user meta prefixed with PROPERTY_PREFIX
47
  - size: the 'bytes' meta
48
  - status: stored as a system meta
49
  - store: is always 'pithos'
50
  - updated_at: the 'modified' meta
51
"""
52

    
53
import json
54
import warnings
55

    
56
from operator import itemgetter
57
from time import gmtime, strftime
58
from functools import wraps, partial
59
from snf_django.lib.api import faults
60

    
61
from django.conf import settings
62

    
63
from pithos.backends.base import NotAllowedError
64

    
65
import synnefo.lib.astakos as lib_astakos
66
import logging
67

    
68
from synnefo.settings import (CYCLADES_USE_QUOTAHOLDER,
69
                              CYCLADES_QUOTAHOLDER_URL,
70
                              CYCLADES_QUOTAHOLDER_TOKEN,
71
                              CYCLADES_QUOTAHOLDER_POOLSIZE)
72

    
73
logger = logging.getLogger(__name__)
74

    
75

    
76
PLANKTON_DOMAIN = 'plankton'
77
PLANKTON_PREFIX = 'plankton:'
78
PROPERTY_PREFIX = 'property:'
79

    
80
PLANKTON_META = ('container_format', 'disk_format', 'name', 'properties',
81
                 'status')
82

    
83
TRANSLATE_UUIDS = getattr(settings, 'TRANSLATE_UUIDS', False)
84

    
85

    
86
def get_displaynames(names):
87
    try:
88
        auth_url = settings.ASTAKOS_URL
89
        url = auth_url.replace('im/authenticate', 'service/api/user_catalogs')
90
        token = settings.CYCLADES_ASTAKOS_SERVICE_TOKEN
91
        uuids = lib_astakos.get_displaynames(token, names, url=url)
92
    except Exception, e:
93
        logger.exception(e)
94
        return {}
95

    
96
    return uuids
97

    
98

    
99
def get_location(account, container, object):
100
    assert '/' not in account, "Invalid account"
101
    assert '/' not in container, "Invalid container"
102
    return 'pithos://%s/%s/%s' % (account, container, object)
103

    
104

    
105
def split_location(location):
106
    """Returns (accout, container, object) from a location string"""
107
    t = location.split('/', 4)
108
    assert len(t) == 5, "Invalid location"
109
    return t[2:5]
110

    
111

    
112
class BackendException(Exception):
113
    pass
114

    
115

    
116
from pithos.backends.util import PithosBackendPool
117
POOL_SIZE = 8
118
_pithos_backend_pool = \
119
    PithosBackendPool(
120
        POOL_SIZE,
121
        quotaholder_enabled=CYCLADES_USE_QUOTAHOLDER,
122
        quotaholder_url=CYCLADES_QUOTAHOLDER_URL,
123
        quotaholder_token=CYCLADES_QUOTAHOLDER_TOKEN,
124
        quotaholder_client_poolsize=CYCLADES_QUOTAHOLDER_POOLSIZE,
125
        db_connection=settings.BACKEND_DB_CONNECTION,
126
        block_path=settings.BACKEND_BLOCK_PATH)
127

    
128

    
129
def get_pithos_backend():
130
    return _pithos_backend_pool.pool_get()
131

    
132

    
133
def handle_backend_exceptions(func):
134
    @wraps(func)
135
    def wrapper(*args, **kwargs):
136
        try:
137
            return func(*args, **kwargs)
138
        except NotAllowedError:
139
            raise faults.Forbidden("Request not allowed")
140
    return wrapper
141

    
142

    
143
class ImageBackend(object):
144
    """A wrapper arround the pithos backend to simplify image handling."""
145

    
146
    def __init__(self, user):
147
        self.user = user
148

    
149
        original_filters = warnings.filters
150
        warnings.simplefilter('ignore')         # Suppress SQLAlchemy warnings
151
        self.backend = get_pithos_backend()
152
        warnings.filters = original_filters     # Restore warnings
153

    
154
    @handle_backend_exceptions
155
    def _get_image(self, location):
156
        def format_timestamp(t):
157
            return strftime('%Y-%m-%d %H:%M:%S', gmtime(t))
158

    
159
        account, container, object = split_location(location)
160

    
161
        try:
162
            versions = self.backend.list_versions(self.user, account,
163
                                                  container, object)
164
        except NameError:
165
            return None
166

    
167
        image = {}
168

    
169
        meta = self._get_meta(location)
170
        if meta:
171
            image['deleted_at'] = ''
172
        else:
173
            # Object was deleted, use the latest version
174
            version, timestamp = versions[-1]
175
            meta = self._get_meta(location, version)
176
            image['deleted_at'] = format_timestamp(timestamp)
177

    
178
        if PLANKTON_PREFIX + 'name' not in meta:
179
            return None     # Not a Plankton image
180

    
181
        permissions = self._get_permissions(location)
182

    
183
        image['checksum'] = meta['hash']
184
        image['created_at'] = format_timestamp(versions[0][1])
185
        image['id'] = meta['uuid']
186
        image['is_public'] = '*' in permissions.get('read', [])
187
        image['location'] = location
188
        if TRANSLATE_UUIDS:
189
            displaynames = get_displaynames([account])
190
            if account in displaynames:
191
                display_account = displaynames[account]
192
            else:
193
                display_account = 'unknown'
194
            image['owner'] = display_account
195
        else:
196
            image['owner'] = account
197
        image['size'] = meta['bytes']
198
        image['store'] = 'pithos'
199
        image['updated_at'] = format_timestamp(meta['modified'])
200
        image['properties'] = {}
201

    
202
        for key, val in meta.items():
203
            if not key.startswith(PLANKTON_PREFIX):
204
                continue
205
            key = key[len(PLANKTON_PREFIX):]
206
            if key == 'properties':
207
                val = json.loads(val)
208
            if key in PLANKTON_META:
209
                image[key] = val
210

    
211
        return image
212

    
213
    @handle_backend_exceptions
214
    def _get_meta(self, location, version=None):
215
        account, container, object = split_location(location)
216
        try:
217
            return self.backend.get_object_meta(self.user, account, container,
218
                                                object, PLANKTON_DOMAIN,
219
                                                version)
220
        except NameError:
221
            return None
222

    
223
    @handle_backend_exceptions
224
    def _get_permissions(self, location):
225
        account, container, object = split_location(location)
226
        _a, _p, permissions = self.backend.get_object_permissions(self.user,
227
                                                                  account,
228
                                                                  container,
229
                                                                  object)
230
        return permissions
231

    
232
    @handle_backend_exceptions
233
    def _store(self, f, size=None):
234
        """Breaks data into blocks and stores them in the backend"""
235

    
236
        bytes = 0
237
        hashmap = []
238
        backend = self.backend
239
        blocksize = backend.block_size
240

    
241
        data = f.read(blocksize)
242
        while data:
243
            hash = backend.put_block(data)
244
            hashmap.append(hash)
245
            bytes += len(data)
246
            data = f.read(blocksize)
247

    
248
        if size and size != bytes:
249
            raise BackendException("Invalid size")
250

    
251
        return hashmap, bytes
252

    
253
    @handle_backend_exceptions
254
    def _update(self, location, size, hashmap, meta, permissions):
255
        account, container, object = split_location(location)
256
        self.backend.update_object_hashmap(self.user, account, container,
257
                                           object, size, hashmap, '',
258
                                           PLANKTON_DOMAIN,
259
                                           permissions=permissions)
260
        self._update_meta(location, meta, replace=True)
261

    
262
    @handle_backend_exceptions
263
    def _update_meta(self, location, meta, replace=False):
264
        account, container, object = split_location(location)
265

    
266
        prefixed = {}
267
        for key, val in meta.items():
268
            if key == 'properties':
269
                val = json.dumps(val)
270
            if key in PLANKTON_META:
271
                prefixed[PLANKTON_PREFIX + key] = val
272

    
273
        self.backend.update_object_meta(self.user, account, container, object,
274
                                        PLANKTON_DOMAIN, prefixed, replace)
275

    
276
    @handle_backend_exceptions
277
    def _update_permissions(self, location, permissions):
278
        account, container, object = split_location(location)
279
        self.backend.update_object_permissions(self.user, account, container,
280
                                               object, permissions)
281

    
282
    @handle_backend_exceptions
283
    def add_user(self, image_id, user):
284
        image = self.get_image(image_id)
285
        if not image:
286
            raise faults.ItemNotFound
287

    
288
        location = image['location']
289
        permissions = self._get_permissions(location)
290
        read = set(permissions.get('read', []))
291
        read.add(user)
292
        permissions['read'] = list(read)
293
        self._update_permissions(location, permissions)
294

    
295
    def close(self):
296
        self.backend.close()
297

    
298
    @handle_backend_exceptions
299
    def _delete(self, image_id):
300
        """Delete an Image.
301

302
        This method will delete the Image from the Storage backend.
303

304
        """
305
        image = self.get_image(image_id)
306
        account, container, object = split_location(image['location'])
307
        self.backend.delete_object(self.user, account, container, object)
308

    
309
    @handle_backend_exceptions
310
    def get_data(self, location):
311
        account, container, object = split_location(location)
312
        size, hashmap = self.backend.get_object_hashmap(self.user, account,
313
                                                        container, object)
314
        data = ''.join(self.backend.get_block(hash) for hash in hashmap)
315
        assert len(data) == size
316
        return data
317

    
318
    @handle_backend_exceptions
319
    def get_image(self, image_id):
320
        try:
321
            account, container, object = self.backend.get_uuid(self.user,
322
                                                               image_id)
323
        except NameError:
324
            return None
325

    
326
        location = get_location(account, container, object)
327
        return self._get_image(location)
328

    
329
    @handle_backend_exceptions
330
    def _iter(self, public=False, filters=None, shared_from=None):
331
        filters = filters or {}
332

    
333
        # Fix keys
334
        keys = [PLANKTON_PREFIX + 'name']
335
        size_range = (None, None)
336
        for key, val in filters.items():
337
            if key == 'size_min':
338
                size_range = (val, size_range[1])
339
            elif key == 'size_max':
340
                size_range = (size_range[0], val)
341
            else:
342
                keys.append('%s = %s' % (PLANKTON_PREFIX + key, val))
343

    
344
        backend = self.backend
345
        if shared_from:
346
            # To get shared images, we connect as shared_from member and
347
            # get the list shared by us
348
            user = shared_from
349
            accounts = [self.user]
350
        else:
351
            user = None if public else self.user
352
            accounts = backend.list_accounts(user)
353

    
354
        for account in accounts:
355
            for container in backend.list_containers(user, account,
356
                                                     shared=True):
357
                for path, _ in backend.list_objects(user, account, container,
358
                                                    domain=PLANKTON_DOMAIN,
359
                                                    keys=keys, shared=True,
360
                                                    size_range=size_range):
361
                    location = get_location(account, container, path)
362
                    image = self._get_image(location)
363
                    if image:
364
                        yield image
365

    
366
    def iter(self, filters=None):
367
        """Iter over all images available to the user"""
368
        return self._iter(filters=filters)
369

    
370
    def iter_public(self, filters=None):
371
        """Iter over public images"""
372
        return self._iter(public=True, filters=filters)
373

    
374
    def iter_shared(self, filters=None, member=None):
375
        """Iter over images shared to member"""
376
        return self._iter(filters=filters, shared_from=member)
377

    
378
    def list(self, filters=None, params={}):
379
        """Return all images available to the user"""
380
        images = list(self.iter(filters))
381
        key = itemgetter(params.get('sort_key', 'created_at'))
382
        reverse = params.get('sort_dir', 'desc') == 'desc'
383
        images.sort(key=key, reverse=reverse)
384
        return images
385

    
386
    def list_public(self, filters, params={}):
387
        """Return public images"""
388
        images = list(self.iter_public(filters))
389
        key = itemgetter(params.get('sort_key', 'created_at'))
390
        reverse = params.get('sort_dir', 'desc') == 'desc'
391
        images.sort(key=key, reverse=reverse)
392
        return images
393

    
394
    def list_users(self, image_id):
395
        image = self.get_image(image_id)
396
        if not image:
397
            raise faults.ItemNotFound
398

    
399
        permissions = self._get_permissions(image['location'])
400
        return [user for user in permissions.get('read', []) if user != '*']
401

    
402
    @handle_backend_exceptions
403
    def put(self, name, f, params):
404
        assert 'checksum' not in params, "Passing a checksum is not supported"
405
        assert 'id' not in params, "Passing an ID is not supported"
406
        assert params.pop('store', 'pithos') == 'pithos', "Invalid store"
407
        disk_format = params.setdefault('disk_format',
408
                                        settings.DEFAULT_DISK_FORMAT)
409
        assert disk_format in settings.ALLOWED_DISK_FORMATS,\
410
            "Invalid disk_format"
411
        assert params.setdefault('container_format',
412
                settings.DEFAULT_CONTAINER_FORMAT) in \
413
                settings.ALLOWED_CONTAINER_FORMATS, "Invalid container_format"
414

    
415
        container = settings.DEFAULT_PLANKTON_CONTAINER
416
        filename = params.pop('filename', name)
417
        location = 'pithos://%s/%s/%s' % (self.user, container, filename)
418
        is_public = params.pop('is_public', False)
419
        permissions = {'read': ['*']} if is_public else {}
420
        size = params.pop('size', None)
421

    
422
        hashmap, size = self._store(f, size)
423

    
424
        meta = {}
425
        meta['properties'] = params.pop('properties', {})
426
        meta.update(name=name, status='available', **params)
427

    
428
        self._update(location, size, hashmap, meta, permissions)
429
        return self._get_image(location)
430

    
431
    @handle_backend_exceptions
432
    def register(self, name, location, params):
433
        assert 'id' not in params, "Passing an ID is not supported"
434
        assert location.startswith('pithos://'), "Invalid location"
435
        assert params.pop('store', 'pithos') == 'pithos', "Invalid store"
436
        assert params.setdefault('disk_format',
437
                settings.DEFAULT_DISK_FORMAT) in \
438
                settings.ALLOWED_DISK_FORMATS, "Invalid disk_format"
439
        assert params.setdefault('container_format',
440
                settings.DEFAULT_CONTAINER_FORMAT) in \
441
                settings.ALLOWED_CONTAINER_FORMATS, "Invalid container_format"
442

    
443
        # user = self.user
444
        account, container, object = split_location(location)
445

    
446
        meta = self._get_meta(location)
447
        assert meta, "File not found"
448

    
449
        size = int(params.pop('size', meta['bytes']))
450
        if size != meta['bytes']:
451
            raise BackendException("Invalid size")
452

    
453
        checksum = params.pop('checksum', meta['hash'])
454
        if checksum != meta['hash']:
455
            raise BackendException("Invalid checksum")
456

    
457
        is_public = params.pop('is_public', False)
458
        if is_public:
459
            permissions = {'read': ['*']}
460
        else:
461
            permissions = {'read': [self.user]}
462

    
463
        meta = {}
464
        meta['properties'] = params.pop('properties', {})
465
        meta.update(name=name, status='available', **params)
466

    
467
        self._update_meta(location, meta)
468
        self._update_permissions(location, permissions)
469
        return self._get_image(location)
470

    
471
    @handle_backend_exceptions
472
    def remove_user(self, image_id, user):
473
        image = self.get_image(image_id)
474
        if not image:
475
            raise faults.ItemNotFound
476

    
477
        location = image['location']
478
        permissions = self._get_permissions(location)
479
        try:
480
            permissions.get('read', []).remove(user)
481
        except ValueError:
482
            return      # User did not have access anyway
483
        self._update_permissions(location, permissions)
484

    
485
    @handle_backend_exceptions
486
    def replace_users(self, image_id, users):
487
        image = self.get_image(image_id)
488
        if not image:
489
            raise faults.ItemNotFound
490

    
491
        location = image['location']
492
        permissions = self._get_permissions(location)
493
        permissions['read'] = users
494
        if image.get('is_public', False):
495
            permissions['read'].append('*')
496
        self._update_permissions(location, permissions)
497

    
498
    @handle_backend_exceptions
499
    def update(self, image_id, params):
500
        image = self.get_image(image_id)
501
        assert image, "Image not found"
502
        if not image:
503
            raise faults.ItemNotFound
504

    
505
        location = image['location']
506
        is_public = params.pop('is_public', None)
507
        if is_public is not None:
508
            permissions = self._get_permissions(location)
509
            read = set(permissions.get('read', []))
510
            if is_public:
511
                read.add('*')
512
            else:
513
                read.discard('*')
514
            permissions['read'] = list(read)
515
            self.backend._update_permissions(location, permissions)
516

    
517
        meta = {}
518
        meta['properties'] = params.pop('properties', {})
519
        meta.update(**params)
520

    
521
        self._update_meta(location, meta)
522
        return self.get_image(image_id)
523

    
524
    @handle_backend_exceptions
525
    def unregister(self, image_id):
526
        """Unregister an image."""
527
        image = self.get_image(image_id)
528
        if not image:
529
            raise faults.ItemNotFound
530

    
531
        location = image["location"]
532
        # Unregister the image by removing all metadata from domain
533
        # 'PLANKTON_DOMAIN'
534
        meta = self._get_meta(location)
535
        for k in meta.keys():
536
            meta[k] = ""
537
        self._update_meta(location, meta, False)