Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / plankton / backend.py @ 4fb602b1

History | View | Annotate | Download (19.1 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
"""
35
The Plankton attributes are the following:
36
  - checksum: the 'hash' meta
37
  - container_format: stored as a user meta
38
  - created_at: the 'modified' meta of the first version
39
  - deleted_at: the timestamp of the last version
40
  - disk_format: stored as a user meta
41
  - id: the 'uuid' meta
42
  - is_public: True if there is a * entry for the read permission
43
  - location: generated based on the file's path
44
  - name: stored as a user meta
45
  - owner: the file's account
46
  - properties: stored as user meta prefixed with PROPERTY_PREFIX
47
  - size: the 'bytes' meta
48
  - status: stored as a system meta
49
  - store: is always 'pithos'
50
  - updated_at: the 'modified' meta
51
"""
52

    
53
import json
54
import warnings
55

    
56
from operator import itemgetter
57
from time import gmtime, strftime
58
from functools import wraps, partial
59
from synnefo.api import faults
60

    
61
from django.conf import settings
62

    
63
from pithos.backends.base import NotAllowedError as PithosNotAllowedError
64
import synnefo.lib.astakos as lib_astakos
65
import logging
66

    
67
from synnefo.settings import (CYCLADES_USE_QUOTAHOLDER,
68
                              CYCLADES_QUOTAHOLDER_URL,
69
                              CYCLADES_QUOTAHOLDER_TOKEN,
70
                              CYCLADES_QUOTAHOLDER_POOLSIZE)
71

    
72
logger = logging.getLogger(__name__)
73

    
74

    
75
PLANKTON_DOMAIN = 'plankton'
76
PLANKTON_PREFIX = 'plankton:'
77
PROPERTY_PREFIX = 'property:'
78

    
79
PLANKTON_META = ('container_format', 'disk_format', 'name', 'properties',
80
                 'status')
81

    
82
TRANSLATE_UUIDS = getattr(settings, 'TRANSLATE_UUIDS', False)
83

    
84

    
85
def get_displaynames(names):
86
    try:
87
        auth_url = settings.ASTAKOS_URL
88
        url = auth_url.replace('im/authenticate', 'service/api/user_catalogs')
89
        token = settings.CYCLADES_ASTAKOS_SERVICE_TOKEN
90
        uuids = lib_astakos.get_displaynames(token, names, url=url)
91
    except Exception, e:
92
        logger.exception(e)
93
        return {}
94

    
95
    return uuids
96

    
97

    
98
def get_location(account, container, object):
99
    assert '/' not in account, "Invalid account"
100
    assert '/' not in container, "Invalid container"
101
    return 'pithos://%s/%s/%s' % (account, container, object)
102

    
103

    
104
def split_location(location):
105
    """Returns (accout, container, object) from a location string"""
106
    t = location.split('/', 4)
107
    assert len(t) == 5, "Invalid location"
108
    return t[2:5]
109

    
110

    
111
class BackendException(Exception):
112
    pass
113

    
114

    
115
class NotAllowedError(BackendException):
116
    pass
117

    
118

    
119
from pithos.backends.util import PithosBackendPool
120
POOL_SIZE = 8
121
_pithos_backend_pool = \
122
    PithosBackendPool(
123
        POOL_SIZE,
124
        quotaholder_enabled=CYCLADES_USE_QUOTAHOLDER,
125
        quotaholder_url=CYCLADES_QUOTAHOLDER_URL,
126
        quotaholder_token=CYCLADES_QUOTAHOLDER_TOKEN,
127
        quotaholder_client_poolsize=CYCLADES_QUOTAHOLDER_POOLSIZE,
128
        db_connection=settings.BACKEND_DB_CONNECTION,
129
        block_path=settings.BACKEND_BLOCK_PATH)
130

    
131

    
132
def get_pithos_backend():
133
    return _pithos_backend_pool.pool_get()
134

    
135

    
136
def handle_backend_exceptions(func):
137
    @wraps(func)
138
    def wrapper(*args, **kwargs):
139
        try:
140
            return func(*args, **kwargs)
141
        except PithosNotAllowedError:
142
            raise NotAllowedError()
143
    return wrapper
144

    
145

    
146
class ImageBackend(object):
147
    """A wrapper arround the pithos backend to simplify image handling."""
148

    
149
    def __init__(self, user):
150
        self.user = user
151

    
152
        original_filters = warnings.filters
153
        warnings.simplefilter('ignore')         # Suppress SQLAlchemy warnings
154
        self.backend = get_pithos_backend()
155
        warnings.filters = original_filters     # Restore warnings
156

    
157
    @handle_backend_exceptions
158
    def _get_image(self, location):
159
        def format_timestamp(t):
160
            return strftime('%Y-%m-%d %H:%M:%S', gmtime(t))
161

    
162
        account, container, object = split_location(location)
163

    
164
        try:
165
            versions = self.backend.list_versions(self.user, account,
166
                                                  container, object)
167
        except NameError:
168
            return None
169

    
170
        image = {}
171

    
172
        meta = self._get_meta(location)
173
        if meta:
174
            image['deleted_at'] = ''
175
        else:
176
            # Object was deleted, use the latest version
177
            version, timestamp = versions[-1]
178
            meta = self._get_meta(location, version)
179
            image['deleted_at'] = format_timestamp(timestamp)
180

    
181
        if PLANKTON_PREFIX + 'name' not in meta:
182
            return None     # Not a Plankton image
183

    
184
        permissions = self._get_permissions(location)
185

    
186
        image['checksum'] = meta['hash']
187
        image['created_at'] = format_timestamp(versions[0][1])
188
        image['id'] = meta['uuid']
189
        image['is_public'] = '*' in permissions.get('read', [])
190
        image['location'] = location
191
        if TRANSLATE_UUIDS:
192
            displaynames = get_displaynames([account])
193
            if account in displaynames:
194
                display_account = displaynames[account]
195
            else:
196
                display_account = 'unknown'
197
            image['owner'] = display_account
198
        else:
199
            image['owner'] = account
200
        image['size'] = meta['bytes']
201
        image['store'] = 'pithos'
202
        image['updated_at'] = format_timestamp(meta['modified'])
203
        image['properties'] = {}
204

    
205
        for key, val in meta.items():
206
            if not key.startswith(PLANKTON_PREFIX):
207
                continue
208
            key = key[len(PLANKTON_PREFIX):]
209
            if key == 'properties':
210
                val = json.loads(val)
211
            if key in PLANKTON_META:
212
                image[key] = val
213

    
214
        return image
215

    
216
    @handle_backend_exceptions
217
    def _get_meta(self, location, version=None):
218
        account, container, object = split_location(location)
219
        try:
220
            return self.backend.get_object_meta(self.user, account, container,
221
                                                object, PLANKTON_DOMAIN,
222
                                                version)
223
        except NameError:
224
            return None
225

    
226
    @handle_backend_exceptions
227
    def _get_permissions(self, location):
228
        account, container, object = split_location(location)
229
        _a, _p, permissions = self.backend.get_object_permissions(self.user,
230
                                                                  account,
231
                                                                  container,
232
                                                                  object)
233
        return permissions
234

    
235
    @handle_backend_exceptions
236
    def _store(self, f, size=None):
237
        """Breaks data into blocks and stores them in the backend"""
238

    
239
        bytes = 0
240
        hashmap = []
241
        backend = self.backend
242
        blocksize = backend.block_size
243

    
244
        data = f.read(blocksize)
245
        while data:
246
            hash = backend.put_block(data)
247
            hashmap.append(hash)
248
            bytes += len(data)
249
            data = f.read(blocksize)
250

    
251
        if size and size != bytes:
252
            raise BackendException("Invalid size")
253

    
254
        return hashmap, bytes
255

    
256
    @handle_backend_exceptions
257
    def _update(self, location, size, hashmap, meta, permissions):
258
        account, container, object = split_location(location)
259
        self.backend.update_object_hashmap(self.user, account, container,
260
                                           object, size, hashmap, '',
261
                                           PLANKTON_DOMAIN,
262
                                           permissions=permissions)
263
        self._update_meta(location, meta, replace=True)
264

    
265
    @handle_backend_exceptions
266
    def _update_meta(self, location, meta, replace=False):
267
        account, container, object = split_location(location)
268

    
269
        prefixed = {}
270
        for key, val in meta.items():
271
            if key == 'properties':
272
                val = json.dumps(val)
273
            if key in PLANKTON_META:
274
                prefixed[PLANKTON_PREFIX + key] = val
275

    
276
        self.backend.update_object_meta(self.user, account, container, object,
277
                                        PLANKTON_DOMAIN, prefixed, replace)
278

    
279
    @handle_backend_exceptions
280
    def _update_permissions(self, location, permissions):
281
        account, container, object = split_location(location)
282
        self.backend.update_object_permissions(self.user, account, container,
283
                                               object, permissions)
284

    
285
    @handle_backend_exceptions
286
    def add_user(self, image_id, user):
287
        image = self.get_image(image_id)
288
        if not image:
289
            raise faults.ItemNotFound
290

    
291
        location = image['location']
292
        permissions = self._get_permissions(location)
293
        read = set(permissions.get('read', []))
294
        read.add(user)
295
        permissions['read'] = list(read)
296
        self._update_permissions(location, permissions)
297

    
298
    def close(self):
299
        self.backend.close()
300

    
301
    @handle_backend_exceptions
302
    def _delete(self, image_id):
303
        """Delete an Image.
304

305
        This method will delete the Image from the Storage backend.
306

307
        """
308
        image = self.get_image(image_id)
309
        account, container, object = split_location(image['location'])
310
        self.backend.delete_object(self.user, account, container, object)
311

    
312
    @handle_backend_exceptions
313
    def get_data(self, location):
314
        account, container, object = split_location(location)
315
        size, hashmap = self.backend.get_object_hashmap(self.user, account,
316
                                                        container, object)
317
        data = ''.join(self.backend.get_block(hash) for hash in hashmap)
318
        assert len(data) == size
319
        return data
320

    
321
    @handle_backend_exceptions
322
    def get_image(self, image_id):
323
        try:
324
            account, container, object = self.backend.get_uuid(self.user,
325
                                                               image_id)
326
        except NameError:
327
            return None
328

    
329
        location = get_location(account, container, object)
330
        return self._get_image(location)
331

    
332
    @handle_backend_exceptions
333
    def _iter(self, public=False, filters=None, shared_from=None):
334
        filters = filters or {}
335

    
336
        # Fix keys
337
        keys = [PLANKTON_PREFIX + 'name']
338
        size_range = (None, None)
339
        for key, val in filters.items():
340
            if key == 'size_min':
341
                size_range = (val, size_range[1])
342
            elif key == 'size_max':
343
                size_range = (size_range[0], val)
344
            else:
345
                keys.append('%s = %s' % (PLANKTON_PREFIX + key, val))
346

    
347
        backend = self.backend
348
        if shared_from:
349
            # To get shared images, we connect as shared_from member and
350
            # get the list shared by us
351
            user = shared_from
352
            accounts = [self.user]
353
        else:
354
            user = None if public else self.user
355
            accounts = backend.list_accounts(user)
356

    
357
        for account in accounts:
358
            for container in backend.list_containers(user, account,
359
                                                     shared=True):
360
                for path, _ in backend.list_objects(user, account, container,
361
                                                    domain=PLANKTON_DOMAIN,
362
                                                    keys=keys, shared=True,
363
                                                    size_range=size_range):
364
                    location = get_location(account, container, path)
365
                    image = self._get_image(location)
366
                    if image:
367
                        yield image
368

    
369
    def iter(self, filters=None):
370
        """Iter over all images available to the user"""
371
        return self._iter(filters=filters)
372

    
373
    def iter_public(self, filters=None):
374
        """Iter over public images"""
375
        return self._iter(public=True, filters=filters)
376

    
377
    def iter_shared(self, filters=None, member=None):
378
        """Iter over images shared to member"""
379
        return self._iter(filters=filters, shared_from=member)
380

    
381
    def list(self, filters=None, params={}):
382
        """Return all images available to the user"""
383
        images = list(self.iter(filters))
384
        key = itemgetter(params.get('sort_key', 'created_at'))
385
        reverse = params.get('sort_dir', 'desc') == 'desc'
386
        images.sort(key=key, reverse=reverse)
387
        return images
388

    
389
    def list_public(self, filters, params={}):
390
        """Return public images"""
391
        images = list(self.iter_public(filters))
392
        key = itemgetter(params.get('sort_key', 'created_at'))
393
        reverse = params.get('sort_dir', 'desc') == 'desc'
394
        images.sort(key=key, reverse=reverse)
395
        return images
396

    
397
    def list_users(self, image_id):
398
        image = self.get_image(image_id)
399
        if not image:
400
            raise faults.ItemNotFound
401

    
402
        permissions = self._get_permissions(image['location'])
403
        return [user for user in permissions.get('read', []) if user != '*']
404

    
405
    @handle_backend_exceptions
406
    def put(self, name, f, params):
407
        assert 'checksum' not in params, "Passing a checksum is not supported"
408
        assert 'id' not in params, "Passing an ID is not supported"
409
        assert params.pop('store', 'pithos') == 'pithos', "Invalid store"
410
        disk_format = params.setdefault('disk_format',
411
                                        settings.DEFAULT_DISK_FORMAT)
412
        assert disk_format in settings.ALLOWED_DISK_FORMATS,\
413
            "Invalid disk_format"
414
        assert params.setdefault('container_format',
415
                settings.DEFAULT_CONTAINER_FORMAT) in \
416
                settings.ALLOWED_CONTAINER_FORMATS, "Invalid container_format"
417

    
418
        container = settings.DEFAULT_PLANKTON_CONTAINER
419
        filename = params.pop('filename', name)
420
        location = 'pithos://%s/%s/%s' % (self.user, container, filename)
421
        is_public = params.pop('is_public', False)
422
        permissions = {'read': ['*']} if is_public else {}
423
        size = params.pop('size', None)
424

    
425
        hashmap, size = self._store(f, size)
426

    
427
        meta = {}
428
        meta['properties'] = params.pop('properties', {})
429
        meta.update(name=name, status='available', **params)
430

    
431
        self._update(location, size, hashmap, meta, permissions)
432
        return self._get_image(location)
433

    
434
    @handle_backend_exceptions
435
    def register(self, name, location, params):
436
        assert 'id' not in params, "Passing an ID is not supported"
437
        assert location.startswith('pithos://'), "Invalid location"
438
        assert params.pop('store', 'pithos') == 'pithos', "Invalid store"
439
        assert params.setdefault('disk_format',
440
                settings.DEFAULT_DISK_FORMAT) in \
441
                settings.ALLOWED_DISK_FORMATS, "Invalid disk_format"
442
        assert params.setdefault('container_format',
443
                settings.DEFAULT_CONTAINER_FORMAT) in \
444
                settings.ALLOWED_CONTAINER_FORMATS, "Invalid container_format"
445

    
446
        # user = self.user
447
        account, container, object = split_location(location)
448

    
449
        meta = self._get_meta(location)
450
        assert meta, "File not found"
451

    
452
        size = int(params.pop('size', meta['bytes']))
453
        if size != meta['bytes']:
454
            raise BackendException("Invalid size")
455

    
456
        checksum = params.pop('checksum', meta['hash'])
457
        if checksum != meta['hash']:
458
            raise BackendException("Invalid checksum")
459

    
460
        is_public = params.pop('is_public', False)
461
        if is_public:
462
            permissions = {'read': ['*']}
463
        else:
464
            permissions = {'read': [self.user]}
465

    
466
        meta = {}
467
        meta['properties'] = params.pop('properties', {})
468
        meta.update(name=name, status='available', **params)
469

    
470
        self._update_meta(location, meta)
471
        self._update_permissions(location, permissions)
472
        return self._get_image(location)
473

    
474
    @handle_backend_exceptions
475
    def remove_user(self, image_id, user):
476
        image = self.get_image(image_id)
477
        if not image:
478
            raise faults.ItemNotFound
479

    
480
        location = image['location']
481
        permissions = self._get_permissions(location)
482
        try:
483
            permissions.get('read', []).remove(user)
484
        except ValueError:
485
            return      # User did not have access anyway
486
        self._update_permissions(location, permissions)
487

    
488
    @handle_backend_exceptions
489
    def replace_users(self, image_id, users):
490
        image = self.get_image(image_id)
491
        if not image:
492
            raise faults.ItemNotFound
493

    
494
        location = image['location']
495
        permissions = self._get_permissions(location)
496
        permissions['read'] = users
497
        if image.get('is_public', False):
498
            permissions['read'].append('*')
499
        self._update_permissions(location, permissions)
500

    
501
    @handle_backend_exceptions
502
    def update(self, image_id, params):
503
        image = self.get_image(image_id)
504
        assert image, "Image not found"
505
        if not image:
506
            raise faults.ItemNotFound
507

    
508
        location = image['location']
509
        is_public = params.pop('is_public', None)
510
        if is_public is not None:
511
            permissions = self._get_permissions(location)
512
            read = set(permissions.get('read', []))
513
            if is_public:
514
                read.add('*')
515
            else:
516
                read.discard('*')
517
            permissions['read'] = list(read)
518
            self.backend._update_permissions(location, permissions)
519

    
520
        meta = {}
521
        meta['properties'] = params.pop('properties', {})
522
        meta.update(**params)
523

    
524
        self._update_meta(location, meta)
525
        return self.get_image(image_id)
526

    
527
    @handle_backend_exceptions
528
    def unregister(self, image_id):
529
        """Unregister an image."""
530
        image = self.get_image(image_id)
531
        if not image:
532
            raise faults.ItemNotFound
533

    
534
        location = image["location"]
535
        # Unregister the image by removing all metadata from domain
536
        # 'PLANKTON_DOMAIN'
537
        meta = self._get_meta(location)
538
        for k in meta.keys():
539
            meta[k] = ""
540
        self._update_meta(location, meta, False)