Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ 73673127

History | View | Annotate | Download (49.7 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import uuid as uuidlib
38
import logging
39
import hashlib
40
import binascii
41

    
42
from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend
43

    
44
# Stripped-down version of the HashMap class found in tools.
45
class HashMap(list):
    """List of raw block digests that can be folded into one top hash.

    The elements are the binary digests of consecutive data blocks;
    hash() combines them pairwise, level by level, into a single digest.
    """

    def __init__(self, blocksize, blockhash):
        """Create an empty map.

        blocksize -- size of a data block (stored on the instance only).
        blockhash -- name of the hashlib algorithm, e.g. 'sha256'.
        """
        super(HashMap, self).__init__()
        self.blocksize = blocksize
        self.blockhash = blockhash

    def _hash_raw(self, v):
        """Return the binary digest of the byte string v."""
        h = hashlib.new(self.blockhash)
        h.update(v)
        return h.digest()

    def hash(self):
        """Return the top hash of the digests currently in the map.

        An empty map yields the digest of the empty byte string and a map
        with a single entry yields that entry unchanged. Otherwise the
        entries are padded with all-zero digests up to the next power of
        two and hashed in pairs until one digest remains.
        """
        if len(self) == 0:
            # Bytes literal: identical to '' on Python 2, and the only form
            # hashlib accepts on Python 3.
            return self._hash_raw(b'')
        if len(self) == 1:
            return self[0]

        level = list(self)
        width = 2
        while width < len(level):
            width *= 2
        # Pad with zero-filled entries as long as a real digest so that
        # every node on every level has a sibling.
        level += [b'\x00' * len(level[0])] * (width - len(level))
        while len(level) > 1:
            level = [self._hash_raw(level[i] + level[i + 1])
                     for i in range(0, len(level), 2)]
        return level[0]
71

    
72
# Default modules and settings.
73
# Modules and connection settings used when the caller supplies none.
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
DEFAULT_BLOCK_PATH = 'data/'
# There is no default queue; the values below only show the expected format.
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
#DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'

# Queue naming settings.
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
QUEUE_CLIENT_ID = 'pithos'
QUEUE_INSTANCE_ID = '1'

# Version cluster identifiers.
CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED = 0, 1, 2

# Stands in for "no upper time limit" where a timestamp bound is expected.
inf = float('inf')

# Offset added to stored public ids before they are handed to callers.
ULTIMATE_ANSWER = 42

logger = logging.getLogger(__name__)
92

    
93

    
94
def backend_method(func=None, autocommit=1):
    """Decorator that runs a backend method inside a wrapper transaction.

    Can be applied bare (``@backend_method``) or with arguments
    (``@backend_method(autocommit=0)``).

    With a false ``autocommit`` the function is returned unchanged.
    Otherwise the returned wrapper calls ``self.wrapper.execute()``, resets
    ``self.messages``, runs the method, commits, and then passes every
    queued message to ``self.queue.send``. Any exception raised along the
    way triggers ``self.wrapper.rollback()`` and is re-raised.
    """
    if func is None:
        # Called with keyword arguments only: return the actual decorator.
        def decorator(target):
            return backend_method(target, autocommit)
        return decorator

    if not autocommit:
        return func

    import functools

    # wraps() keeps the method's name and docstring on the wrapper, so
    # introspection and logging show the real method instead of "fn".
    @functools.wraps(func)
    def fn(self, *args, **kw):
        self.wrapper.execute()
        try:
            self.messages = []
            ret = func(self, *args, **kw)
            self.wrapper.commit()
            for m in self.messages:
                self.queue.send(*m)
            return ret
        except:
            # Deliberately bare: roll back on every kind of exit, including
            # KeyboardInterrupt, and always re-raise.
            self.wrapper.rollback()
            raise
    return fn
115

    
116

    
117
class ModularBackend(BaseBackend):
118
    """A modular backend.
119
    
120
    Uses modules for SQL functions and storage.
121
    """
122
    
123
    def __init__(self, db_module=None, db_connection=None,
                 block_module=None, block_path=None,
                 queue_module=None, queue_connection=None):
        """Load the configured modules and build the backend's components.

        Missing database and block settings fall back to the module-level
        defaults. A queue is only created when both queue_module and
        queue_connection are given; otherwise a do-nothing stand-in is used.
        """
        if not db_module:
            db_module = DEFAULT_DB_MODULE
        if not db_connection:
            db_connection = DEFAULT_DB_CONNECTION
        if not block_module:
            block_module = DEFAULT_BLOCK_MODULE
        if not block_path:
            block_path = DEFAULT_BLOCK_PATH
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
        #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION

        self.hash_algorithm = 'sha256'
        self.block_size = 4 * 1024 * 1024 # 4MB

        self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}

        def import_by_name(dotted_name):
            # __import__ returns the top-level package, so the leaf module
            # is fetched from sys.modules instead.
            __import__(dotted_name)
            return sys.modules[dotted_name]

        def copy_constants(names):
            # Expose the database module's constants as instance attributes.
            for const in names:
                setattr(self, const, getattr(self.db_module, const))

        self.db_module = import_by_name(db_module)
        self.wrapper = self.db_module.DBWrapper(db_connection)
        self.permissions = self.db_module.Permissions(wrapper=self.wrapper)
        copy_constants(('READ', 'WRITE'))
        self.node = self.db_module.Node(wrapper=self.wrapper)
        copy_constants(('ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME',
                        'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER',
                        'MATCH_PREFIX', 'MATCH_EXACT'))

        self.block_module = import_by_name(block_module)
        self.store = self.block_module.Store(path=block_path,
                                             block_size=self.block_size,
                                             hash_algorithm=self.hash_algorithm)

        if not (queue_module and queue_connection):
            class NoQueue:
                """Queue stand-in whose operations do nothing."""

                def send(self, *args):
                    pass

                def close(self):
                    pass

            self.queue = NoQueue()
        else:
            self.queue_module = import_by_name(queue_module)
            self.queue = self.queue_module.Queue(exchange=queue_connection,
                                                 client_id=QUEUE_CLIENT_ID)
172
    
173
    def close(self):
174
        self.wrapper.close()
175
        self.queue.close()
176
    
177
    @backend_method
178
    def list_accounts(self, user, marker=None, limit=10000):
        """Return one page of the accounts the user can access.

        marker and limit are handed to _list_limits, which decides where
        the returned slice starts and how long it is.
        """
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
        accounts = self._allowed_accounts(user)
        first, count = self._list_limits(accounts, marker, limit)
        last = first + count
        return accounts[first:last]
185
    
186
    @backend_method
187
    def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
        """Return a dictionary with the account metadata for the domain.

        The owner gets the user-defined attributes (if requested), the
        object count and byte total; any other allowed user only gets the
        account name and the modification time. Non-owners may not ask for
        a past state (until).

        Raises NotAllowedError when a non-owner has no access.
        """
        logger.debug("get_account_meta: %s %s %s", account, domain, until)
        path, node = self._lookup_account(account, user == account)
        foreign = user != account
        if foreign:
            if until or node is None or account not in self._allowed_accounts(user):
                raise NotAllowedError
        try:
            props = self._get_properties(node, until)
            mtime = props[self.MTIME]
        except NameError:
            # No properties available: fall back to the requested time.
            props, mtime = None, until
        count, nbytes, tstamp = self._get_statistics(node, until)
        tstamp = max(tstamp, mtime)
        if until is not None:
            # Overall last modification, regardless of the requested time.
            modified = max(self._get_statistics(node)[2], mtime)
        else:
            modified = tstamp

        if foreign:
            meta = {'name': account}
        else:
            if props is not None and include_user_defined:
                meta = dict(self.node.attribute_get(props[self.SERIAL], domain))
            else:
                meta = {}
            if until is not None:
                meta['until_timestamp'] = tstamp
            # Built-in keys are written last so they win over user keys.
            meta['name'] = account
            meta['count'] = count
            meta['bytes'] = nbytes
        meta['modified'] = modified
        return meta
220
    
221
    @backend_method
222
    def update_account_meta(self, user, account, domain, meta, replace=False):
        """Store new metadata for the account in the given domain.

        Only the account owner may do this; anyone else gets
        NotAllowedError.
        """
        logger.debug("update_account_meta: %s %s %s %s", account, domain, meta, replace)
        if user != account:
            raise NotAllowedError
        account_node = self._lookup_account(account, True)[1]
        self._put_metadata(user, account_node, domain, meta, replace)
230
    
231
    @backend_method
232
    def get_account_groups(self, user, account):
        """Return the groups defined for the account as a dictionary.

        Users other than the owner always get an empty dictionary, provided
        they may access the account at all; otherwise NotAllowedError is
        raised.
        """
        logger.debug("get_account_groups: %s", account)
        if user != account:
            visible = self._allowed_accounts(user)
            if account not in visible:
                raise NotAllowedError
            return {}
        self._lookup_account(account, True)
        groups = self.permissions.group_dict(account)
        return groups
242
    
243
    @backend_method
244
    def update_account_groups(self, user, account, groups, replace=False):
        """Set the members of the given groups for the account.

        groups maps a group name to its members. With replace, every
        existing group of the account is dropped first; without it, only
        the groups named in the mapping are reset. A group with no members
        is not created again.

        Raises NotAllowedError unless the user owns the account.
        """
        logger.debug("update_account_groups: %s %s %s", account, groups, replace)
        if user != account:
            raise NotAllowedError
        self._lookup_account(account, True)
        self._check_groups(groups)
        if replace:
            self.permissions.group_destroy(account)
        # After a full destroy there is nothing left to delete per group.
        delete_each = not replace
        for group, members in groups.iteritems():
            if delete_each:
                self.permissions.group_delete(account, group)
            if members:
                self.permissions.group_addmany(account, group, members)
259
    
260
    @backend_method
261
    def get_account_policy(self, user, account):
        """Return the policy of the account as a dictionary.

        Users other than the owner get an empty dictionary if they may
        access the account, and NotAllowedError otherwise.
        """
        logger.debug("get_account_policy: %s", account)
        if user != account:
            visible = self._allowed_accounts(user)
            if account not in visible:
                raise NotAllowedError
            return {}
        account_node = self._lookup_account(account, True)[1]
        return self._get_policy(account_node)
271
    
272
    @backend_method
273
    def update_account_policy(self, user, account, policy, replace=False):
        """Validate and store a policy for the account.

        Raises NotAllowedError unless the user owns the account.
        """
        logger.debug("update_account_policy: %s %s %s", account, policy, replace)
        if user != account:
            raise NotAllowedError
        account_node = self._lookup_account(account, True)[1]
        self._check_policy(policy)
        self._put_policy(account_node, policy, replace)
282
    
283
    @backend_method
284
    def put_account(self, user, account, policy=None):
        """Create a new account with the given name.

        policy is an optional dictionary of policy settings; when omitted,
        a new empty dictionary is used for this call.

        Raises NotAllowedError unless user equals account, and NameError if
        the account already exists.
        """
        # A fresh dict per call: the old shared ``{}`` default was handed to
        # _put_policy, so any in-place change there would have leaked into
        # every later call that relied on the default.
        if policy is None:
            policy = {}
        logger.debug("put_account: %s %s", account, policy)
        if user != account:
            raise NotAllowedError
        node = self.node.node_lookup(account)
        if node is not None:
            raise NameError('Account already exists')
        if policy:
            self._check_policy(policy)
        node = self._put_path(user, self.ROOTNODE, account)
        self._put_policy(node, policy, True)
297
    
298
    @backend_method
299
    def delete_account(self, user, account):
        """Remove the account and its groups.

        Deleting an account that does not exist is a no-op.

        Raises NotAllowedError unless the user owns the account, and
        IndexError when the account node cannot be removed.
        """
        logger.debug("delete_account: %s", account)
        if user != account:
            raise NotAllowedError
        node = self.node.node_lookup(account)
        if node is not None:
            removed = self.node.node_remove(node)
            if not removed:
                raise IndexError('Account is not empty')
            self.permissions.group_destroy(account)
311
    
312
    @backend_method
313
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
        """Return a list of containers existing under an account.

        A user other than the owner gets the containers they are allowed
        to see and may not ask for a past state (until). For the owner,
        shared selects only containers that appear in the account's shared
        paths; otherwise all containers are listed, as of until if given.

        Raises NotAllowedError when a non-owner has no access.
        """
        logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
        if user != account:
            if until or account not in self._allowed_accounts(user):
                raise NotAllowedError
            allowed = self._allowed_containers(user, account)
            start, limit = self._list_limits(allowed, marker, limit)
            return allowed[start:start + limit]
        if shared:
            # Shared paths start with account/container; keep the container.
            # The de-duplicated names are sorted because set iteration order
            # is arbitrary, which would make marker/limit paging return
            # different slices from one call to the next.
            shared_paths = self.permissions.access_list_shared(account)
            allowed = sorted(set(x.split('/', 2)[1] for x in shared_paths))
            start, limit = self._list_limits(allowed, marker, limit)
            return allowed[start:start + limit]
        node = self.node.node_lookup(account)
        return [x[0] for x in self._list_object_properties(node, account, '', '/', marker, limit, False, None, [], until)]
330
    
331
    @backend_method
332
    def list_container_meta(self, user, account, container, domain, until=None):
        """List the object metadata keys used in the container for the domain.

        A user other than the owner is limited to the paths they can access
        and may not ask for a past state (until).

        Raises NotAllowedError when a non-owner has no accessible path.
        """
        logger.debug("list_container_meta: %s %s %s %s", account, container, domain, until)
        accessible = []
        if user != account:
            if until:
                raise NotAllowedError
            container_path = '/'.join((account, container))
            accessible = self.permissions.access_list_paths(user, container_path)
            if not accessible:
                raise NotAllowedError
        path, node = self._lookup_container(account, container)
        if until is None:
            before = inf
        else:
            before = until
        accessible = self._get_formatted_paths(accessible)
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, accessible)
347
    
348
    @backend_method
349
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
        """Return a dictionary with the container metadata for the domain.

        The owner gets the user-defined attributes (if requested), the
        object count and byte total; any other allowed user only gets the
        container name and the modification time. Non-owners may not ask
        for a past state (until).

        Raises NotAllowedError when a non-owner has no access.
        """
        logger.debug("get_container_meta: %s %s %s %s", account, container, domain, until)
        foreign = user != account
        if foreign:
            if until or container not in self._allowed_containers(user, account):
                raise NotAllowedError
        path, node = self._lookup_container(account, container)
        props = self._get_properties(node, until)
        mtime = props[self.MTIME]
        count, nbytes, tstamp = self._get_statistics(node, until)
        tstamp = max(tstamp, mtime)
        if until is not None:
            # Overall last modification, regardless of the requested time.
            modified = max(self._get_statistics(node)[2], mtime)
        else:
            modified = tstamp

        if foreign:
            meta = {'name': container}
        else:
            if include_user_defined:
                meta = dict(self.node.attribute_get(props[self.SERIAL], domain))
            else:
                meta = {}
            if until is not None:
                meta['until_timestamp'] = tstamp
            # Built-in keys are written last so they win over user keys.
            meta['name'] = container
            meta['count'] = count
            meta['bytes'] = nbytes
        meta['modified'] = modified
        return meta
378
    
379
    @backend_method
380
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
        """Store new metadata for the container in the given domain.

        When the update supersedes an earlier version and the container's
        versioning policy is not 'auto', that earlier version is removed.

        Raises NotAllowedError unless the user owns the account.
        """
        logger.debug("update_container_meta: %s %s %s %s %s", account, container, domain, meta, replace)
        if user != account:
            raise NotAllowedError
        path, node = self._lookup_container(account, container)
        old_version, new_version = self._put_metadata(user, node, domain, meta, replace)
        if old_version is None:
            return
        if self._get_policy(node)['versioning'] != 'auto':
            self.node.version_remove(old_version)
392
    
393
    @backend_method
394
    def get_container_policy(self, user, account, container):
        """Return the policy of the container as a dictionary.

        Users other than the owner get an empty dictionary if they may
        access the container, and NotAllowedError otherwise.
        """
        logger.debug("get_container_policy: %s %s", account, container)
        if user != account:
            visible = self._allowed_containers(user, account)
            if container not in visible:
                raise NotAllowedError
            return {}
        path, container_node = self._lookup_container(account, container)
        return self._get_policy(container_node)
404
    
405
    @backend_method
406
    def update_container_policy(self, user, account, container, policy, replace=False):
        """Validate and store a policy for the container.

        Raises NotAllowedError unless the user owns the account.
        """
        logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
        if user != account:
            raise NotAllowedError
        path, container_node = self._lookup_container(account, container)
        self._check_policy(policy)
        self._put_policy(container_node, policy, replace)
415
    
416
    @backend_method
417
    def put_container(self, user, account, container, policy=None):
        """Create a new container with the given name.

        policy is an optional dictionary of policy settings; when omitted,
        a new empty dictionary is used for this call.

        Raises NotAllowedError unless the user owns the account, and
        NameError if the container already exists.
        """
        # A fresh dict per call: the old shared ``{}`` default was handed to
        # _put_policy, so any in-place change there would have leaked into
        # every later call that relied on the default.
        if policy is None:
            policy = {}
        logger.debug("put_container: %s %s %s", account, container, policy)
        if user != account:
            raise NotAllowedError
        try:
            path, node = self._lookup_container(account, container)
        except NameError:
            # The lookup failing is the expected case: the name is free.
            pass
        else:
            raise NameError('Container already exists')
        if policy:
            self._check_policy(policy)
        path = '/'.join((account, container))
        node = self._put_path(user, self._lookup_account(account, True)[1], path)
        self._put_policy(node, policy, True)
434
    
435
    @backend_method
436
    def delete_container(self, user, account, container, until=None):
        """Delete the container, or purge its history up to a point in time.

        With until, only the history and deleted versions up to that time
        are purged and the container itself is kept. Without it, the
        container must hold no objects; everything is purged and the
        container node is removed. The freed size is reported either way.

        Raises NotAllowedError unless the user owns the account, and
        IndexError when deleting a container that still has objects.
        """
        logger.debug("delete_container: %s %s %s", account, container, until)
        if user != account:
            raise NotAllowedError
        path, node = self._lookup_container(account, container)

        purge_only = until is not None
        if not purge_only and self._get_statistics(node)[0] > 0:
            raise IndexError('Container is not empty')

        before = until if purge_only else inf
        hashes, size = self.node.node_purge_children(node, before, CLUSTER_HISTORY)
        for h in hashes:
            self.store.map_delete(h)
        self.node.node_purge_children(node, before, CLUSTER_DELETED)
        if purge_only:
            action = 'container purge'
        else:
            self.node.node_remove(node)
            action = 'container delete'
        self._report_size_change(user, account, -size, {'action': action})
460
    
461
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props):
462
        if user != account and until:
463
            raise NotAllowedError
464
        allowed = self._list_object_permissions(user, account, container, prefix, shared)
465
        if shared and not allowed:
466
            return []
467
        path, node = self._lookup_container(account, container)
468
        allowed = self._get_formatted_paths(allowed)
469
        return self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
470
    
471
    def _list_object_permissions(self, user, account, container, prefix, shared):
472
        allowed = []
473
        path = '/'.join((account, container, prefix)).rstrip('/')
474
        if user != account:
475
            allowed = self.permissions.access_list_paths(user, path)
476
            if not allowed:
477
                raise NotAllowedError
478
        else:
479
            if shared:
480
                allowed = self.permissions.access_list_shared(path)
481
                if not allowed:
482
                    return []
483
        return allowed
484
    
485
    @backend_method
486
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None):
        """List the objects under a container as (name, version_id) tuples.

        All filtering and paging arguments are forwarded unchanged to
        _list_objects, with full properties switched off.
        """
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range)
        all_props = False
        return self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys, shared, until, size_range, all_props)
491
    
492
    @backend_method
493
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None):
        """List the objects under a container as metadata dictionaries.

        Two-element rows from the listing become {'subdir': name}; every
        other row becomes a dictionary of the object's properties.
        'modified' is only filled in when no until time was requested.
        """
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range)
        rows = self._list_objects(
            user, account, container, prefix, delimiter, marker, limit,
            virtual, domain, keys, shared, until, size_range, True)

        def describe(row):
            if len(row) == 2:
                return {'subdir': row[0]}
            # Element 0 is the name, so property indexes are shifted by one.
            mtime = row[self.MTIME + 1]
            return {'name': row[0],
                    'bytes': row[self.SIZE + 1],
                    'type': row[self.TYPE + 1],
                    'hash': row[self.HASH + 1],
                    'version': row[self.SERIAL + 1],
                    'version_timestamp': mtime,
                    'modified': mtime if until is None else None,
                    'modified_by': row[self.MUSER + 1],
                    'uuid': row[self.UUID + 1],
                    'checksum': row[self.CHECKSUM + 1]}

        return [describe(row) for row in rows]
514
    
515
    @backend_method
516
    def list_object_permissions(self, user, account, container, prefix=''):
        """List the paths under a container that carry permissions.

        Delegates to _list_object_permissions with shared switched on.
        """
        logger.debug("list_object_permissions: %s %s %s", account, container, prefix)
        shared = True
        return self._list_object_permissions(user, account, container, prefix, shared)
521
    
522
    @backend_method
523
    def list_object_public(self, user, account, container, prefix=''):
        """Map each public object path under a container to its public id.

        The returned ids are the stored values shifted by ULTIMATE_ANSWER.
        """
        logger.debug("list_object_public: %s %s %s", account, container, prefix)
        base_path = '/'.join((account, container, prefix))
        entries = self.permissions.public_list(base_path)
        return dict((path, p + ULTIMATE_ANSWER) for path, p in entries)
531
    
532
    @backend_method
533
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
        """Return a dictionary with the object metadata for the domain.

        The properties come from the requested version (the current one by
        default), while 'modified' always reflects the object's latest
        change, taken from its deletion record if the object no longer has
        a current version.

        Raises NameError when neither a current version nor a deletion
        record exists.
        """
        logger.debug("get_object_meta: %s %s %s %s %s", account, container, name, domain, version)
        self._can_read(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)
        if version is None:
            modified = props[self.MTIME]
        else:
            try:
                # Overall last modification.
                modified = self._get_version(node)[self.MTIME]
            except NameError:
                # Object may be deleted.
                deleted = self.node.version_lookup(node, inf, CLUSTER_DELETED)
                if deleted is None:
                    raise NameError('Object does not exist')
                modified = deleted[self.MTIME]

        if include_user_defined:
            meta = dict(self.node.attribute_get(props[self.SERIAL], domain))
        else:
            meta = {}
        # Built-in keys are written last so they win over user keys.
        meta.update({
            'name': name,
            'bytes': props[self.SIZE],
            'type': props[self.TYPE],
            'hash': props[self.HASH],
            'version': props[self.SERIAL],
            'version_timestamp': props[self.MTIME],
            'modified': modified,
            'modified_by': props[self.MUSER],
            'uuid': props[self.UUID],
            'checksum': props[self.CHECKSUM],
        })
        return meta
565
    
566
    @backend_method
567
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
        """Store new metadata for the object and return the new version id.

        The superseded version is passed to _apply_versioning afterwards.
        """
        logger.debug("update_object_meta: %s %s %s %s %s %s", account, container, name, domain, meta, replace)
        self._can_write(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        versions = self._put_metadata(user, node, domain, meta, replace)
        old_version, new_version = versions
        self._apply_versioning(account, container, old_version)
        return new_version
576
    
577
    @backend_method
578
    def get_object_permissions(self, user, account, container, name):
        """Return (allowed action, permissions path, permissions dict).

        The action is 'write' for the owner. Other users get 'write' or
        'read' according to the access checks on the path the object takes
        its permissions from, and NotAllowedError if neither passes.
        """
        logger.debug("get_object_permissions: %s %s %s", account, container, name)
        permissions_path = self._get_permissions_path(account, container, name)
        if user == account:
            allowed = 'write'
        elif self.permissions.access_check(permissions_path, self.WRITE, user):
            allowed = 'write'
        elif self.permissions.access_check(permissions_path, self.READ, user):
            allowed = 'read'
        else:
            raise NotAllowedError
        # Called only for its check that the object exists.
        self._lookup_object(account, container, name)
        granted = self.permissions.access_get(permissions_path)
        return (allowed, permissions_path, granted)
595
    
596
    @backend_method
597
    def update_object_permissions(self, user, account, container, name, permissions):
        """Validate and set the permissions of the object.

        Raises NotAllowedError unless the user owns the account.
        """
        logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
        if user != account:
            raise NotAllowedError
        object_path, _node = self._lookup_object(account, container, name)
        self._check_permissions(object_path, permissions)
        self.permissions.access_set(object_path, permissions)
606
    
607
    @backend_method
608
    def get_object_public(self, user, account, container, name):
        """Return the object's public id, or None if it is not public.

        The returned id is the stored value shifted by ULTIMATE_ANSWER.
        """
        logger.debug("get_object_public: %s %s %s", account, container, name)
        self._can_read(user, account, container, name)
        object_path = self._lookup_object(account, container, name)[0]
        public_id = self.permissions.public_get(object_path)
        if public_id is None:
            return None
        return public_id + ULTIMATE_ANSWER
618
    
619
    @backend_method
620
    def update_object_public(self, user, account, container, name, public):
        """Make the object public when public is true, private otherwise."""
        logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
        self._can_write(user, account, container, name)
        object_path = self._lookup_object(account, container, name)[0]
        if public:
            self.permissions.public_set(object_path)
        else:
            self.permissions.public_unset(object_path)
630
    
631
    @backend_method
632
    def get_object_hashmap(self, user, account, container, name, version=None):
        """Return the object's size and the list of its hex-encoded hashes.

        The map is looked up in the store under the binary form of the
        version's hash.
        """
        logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
        self._can_read(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)
        top_hash = binascii.unhexlify(props[self.HASH])
        block_hashes = self.store.map_get(top_hash)
        hex_hashes = [binascii.hexlify(x) for x in block_hashes]
        return props[self.SIZE], hex_hashes
641
    
642
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
        """Create a new object version for the given hash and return its id.

        Besides creating the version this stores its metadata, applies the
        container's versioning policy to the version being replaced, checks
        the account and container quotas, optionally sets permissions and
        queues the size and object change reports.

        Raises NotAllowedError if permissions are supplied by a user other
        than the account, and QuotaError if a size increase does not fit in
        a non-zero account or container quota.
        """
        set_permissions = permissions is not None
        if set_permissions and user != account:
            raise NotAllowedError
        self._can_write(user, account, container, name)
        if set_permissions:
            self._check_permissions('/'.join((account, container, name)), permissions)

        account_node = self._lookup_account(account, True)[1]
        container_path, container_node = self._lookup_container(account, container)
        path, node = self._put_object_node(container_path, container_node, name)
        pre_version_id, dest_version_id = self._put_version_duplicate(
            user, node, src_node=src_node, size=size, type=type, hash=hash,
            checksum=checksum, is_copy=is_copy)

        # Handle meta: without an explicit source, copy from the replaced version.
        meta_source_id = pre_version_id if src_version_id is None else src_version_id
        self._put_metadata_duplicate(meta_source_id, dest_version_id, domain, meta, replace_meta)

        # Check quota.
        size_delta = size - self._apply_versioning(account, container, pre_version_id)
        if size_delta > 0:
            account_quota = long(self._get_policy(account_node)['quota'])
            container_quota = long(self._get_policy(container_node)['quota'])
            # Statistics are only fetched for a level that actually has a quota.
            over_account = account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota
            if over_account or (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
                # This must be executed in a transaction, so the version is never created if it fails.
                raise QuotaError
        self._report_size_change(user, account, size_delta, {'action': 'object update'})

        if set_permissions:
            self.permissions.access_set(path, permissions)

        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
        return dest_version_id
677
    
678
    @backend_method
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
        """Create/update an object with the specified size and partial hashes.

        The hashmap is a list of hex-encoded block hashes. If the store
        reports any of them as missing, an IndexError is raised whose 'data'
        attribute lists the missing hashes, hex-encoded. Returns the id of
        the version created.
        """

        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s", account, container, name, size, type, hashmap, checksum)
        if size == 0:
            # No such thing as an empty hashmap.
            hashmap = [self.put_block('')]
        block_map = HashMap(self.block_size, self.hash_algorithm)
        block_map.extend([binascii.unhexlify(entry) for entry in hashmap])
        missing = self.store.block_search(block_map)
        if missing:
            error = IndexError()
            error.data = [binascii.hexlify(entry) for entry in missing]
            raise error

        map_hash = block_map.hash()
        dest_version_id = self._update_object_hash(
            user, account, container, name, size, type,
            binascii.hexlify(map_hash), checksum, domain, meta, replace_meta,
            permissions)
        # The map is stored only after the version has been created.
        self.store.map_put(map_hash, block_map)
        return dest_version_id
697
    
698
    @backend_method
    def update_object_checksum(self, user, account, container, name, version, checksum):
        """Update an object's checksum.

        The checksum is written to every version of the object whose serial
        is at least 'version' and whose hash and size equal those of the
        referenced version.
        """

        logger.debug("update_object_checksum: %s %s %s %s %s", account, container, name, version, checksum)
        self._can_write(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        reference = self._get_version(node, version)
        # Update objects with greater version and same hashmap and size (fix metadata updates).
        # NOTE(review): int(version) fails for version=None although _get_version accepts it - confirm callers always pass a version.
        for candidate in self.node.node_get_versions(node):
            same_content = candidate[self.HASH] == reference[self.HASH] and candidate[self.SIZE] == reference[self.SIZE]
            if candidate[self.SERIAL] >= int(version) and same_content:
                self.node.version_put_property(candidate[self.SERIAL], 'checksum', checksum)
711
    
712
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False):
713
        self._can_read(user, src_account, src_container, src_name)
714
        path, node = self._lookup_object(src_account, src_container, src_name)
715
        # TODO: Will do another fetch of the properties in duplicate version...
716
        props = self._get_version(node, src_version) # Check to see if source exists.
717
        src_version_id = props[self.SERIAL]
718
        hash = props[self.HASH]
719
        size = props[self.SIZE]
720
        
721
        is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
722
        dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy)
723
        return dest_version_id
724
    
725
    @backend_method
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None):
        """Copy an object's data and metadata.

        Returns the id of the version created at the destination.
        """

        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version)
        # The trailing False marks this as a copy rather than a move.
        return self._copy_object(
            user, src_account, src_container, src_name,
            dest_account, dest_container, dest_name,
            type, domain, meta, replace_meta, permissions, src_version, False)
732
    
733
    @backend_method
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None):
        """Move an object's data and metadata.

        Only the source account itself may move an object (NotAllowedError
        otherwise). The source is deleted after the copy unless source and
        destination are the same path. Returns the id of the new version.
        """

        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions)
        if user != src_account:
            raise NotAllowedError
        source = (src_account, src_container, src_name)
        dest_version_id = self._copy_object(
            user, src_account, src_container, src_name,
            dest_account, dest_container, dest_name,
            type, domain, meta, replace_meta, permissions, None, True)
        if source != (dest_account, dest_container, dest_name):
            self._delete_object(user, *source)
        return dest_version_id
744
    
745
    def _delete_object(self, user, account, container, name, until=None):
746
        if user != account:
747
            raise NotAllowedError
748
        
749
        if until is not None:
750
            path = '/'.join((account, container, name))
751
            node = self.node.node_lookup(path)
752
            if node is None:
753
                return
754
            hashes = []
755
            size = 0
756
            h, s = self.node.node_purge(node, until, CLUSTER_NORMAL)
757
            hashes += h
758
            size += s
759
            h, s = self.node.node_purge(node, until, CLUSTER_HISTORY)
760
            hashes += h
761
            size += s
762
            for h in hashes:
763
                self.store.map_delete(h)
764
            self.node.node_purge(node, until, CLUSTER_DELETED)
765
            try:
766
                props = self._get_version(node)
767
            except NameError:
768
                self.permissions.access_clear(path)
769
            self._report_size_change(user, account, -size, {'action': 'object purge'})
770
            return
771
        
772
        path, node = self._lookup_object(account, container, name)
773
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
774
        del_size = self._apply_versioning(account, container, src_version_id)
775
        if del_size:
776
            self._report_size_change(user, account, -del_size, {'action': 'object delete'})
777
        self._report_object_change(user, account, path, details={'action': 'object delete'})
778
        self.permissions.access_clear(path)
779
    
780
    @backend_method
    def delete_object(self, user, account, container, name, until=None):
        """Delete/purge an object.

        Thin public wrapper that logs the request and delegates to
        _delete_object.
        """

        logger.debug("delete_object: %s %s %s %s", account, container, name, until)
        target = (account, container, name)
        self._delete_object(user, *(target + (until,)))
786
    
787
    @backend_method
    def list_versions(self, user, account, container, name):
        """Return a list of all [version, version_timestamp] pairs for an object.

        Versions in the deleted cluster are left out. The user must be able
        to read the object.
        """

        logger.debug("list_versions: %s %s %s", account, container, name)
        self._can_read(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        visible = []
        for props in self.node.node_get_versions(node):
            if props[self.CLUSTER] != CLUSTER_DELETED:
                visible.append([props[self.SERIAL], props[self.MTIME]])
        return visible
796
    
797
    @backend_method
    def get_uuid(self, user, uuid):
        """Return the (account, container, name) for the UUID given.

        Raises NameError if the UUID is unknown; the read check on the
        resolved path may raise NotAllowedError.
        """

        logger.debug("get_uuid: %s", uuid)
        found = self.node.latest_uuid(uuid)
        if found is None:
            raise NameError
        path, serial = found
        account, container, name = path.split('/', 2)
        self._can_read(user, account, container, name)
        location = (account, container, name)
        return location
809
    
810
    @backend_method
    def get_public(self, user, public):
        """Return the (account, container, name) for the public id given.

        Public ids are offset by ULTIMATE_ANSWER; a missing id, an id below
        that offset or an id with no path behind it raises NameError.
        """

        logger.debug("get_public: %s", public)
        if public is None or public < ULTIMATE_ANSWER:
            raise NameError
        internal_id = public - ULTIMATE_ANSWER
        path = self.permissions.public_path(internal_id)
        if path is None:
            raise NameError
        account, container, name = path.split('/', 2)
        self._can_read(user, account, container, name)
        location = (account, container, name)
        return location
823
    
824
    @backend_method(autocommit=0)
    def get_block(self, hash):
        """Return a block's data.

        'hash' is the hex-encoded block hash. Raises NameError when the store
        returns nothing for it.
        """

        logger.debug("get_block: %s", hash)
        data = self.store.block_get(binascii.unhexlify(hash))
        if data:
            return data
        # NOTE(review): a falsy (e.g. empty) block is reported as missing too - confirm that is intended.
        raise NameError('Block does not exist')
833
    
834
    @backend_method(autocommit=0)
    def put_block(self, data):
        """Store a block and return the hash.

        The hash returned by the store is hex-encoded before it is returned.
        """

        logger.debug("put_block: %s", len(data))
        raw_hash = self.store.block_put(data)
        return binascii.hexlify(raw_hash)
840
    
841
    @backend_method(autocommit=0)
    def update_block(self, hash, data, offset=0):
        """Update a known block and return the hash.

        Data that starts at offset 0 and is exactly one block long is simply
        stored as a new block; anything else is applied as a partial update
        of the block identified by the hex-encoded 'hash'.
        """

        logger.debug("update_block: %s %s %s", hash, len(data), offset)
        replaces_whole_block = offset == 0 and len(data) == self.block_size
        if replaces_whole_block:
            return self.put_block(data)
        updated_hash = self.store.block_update(binascii.unhexlify(hash), offset, data)
        return binascii.hexlify(updated_hash)
850
    
851
    # Path functions.
852
    
853
    def _generate_uuid(self):
854
        return str(uuidlib.uuid4())
855
    
856
    def _put_object_node(self, path, parent, name):
857
        path = '/'.join((path, name))
858
        node = self.node.node_lookup(path)
859
        if node is None:
860
            node = self.node.node_create(parent, path)
861
        return path, node
862
    
863
    def _put_path(self, user, parent, path):
        """Create a node under 'parent' with an initial version and return the node.

        The initial version has no hash, zero size, empty type and checksum,
        a new uuid, and is placed in the normal cluster.
        """
        node = self.node.node_create(parent, path)
        initial_uuid = self._generate_uuid()
        self.node.version_create(node, None, 0, '', None, user, initial_uuid, '', CLUSTER_NORMAL)
        return node
867
    
868
    def _lookup_account(self, account, create=True):
869
        node = self.node.node_lookup(account)
870
        if node is None and create:
871
            node = self._put_path(account, self.ROOTNODE, account) # User is account.
872
        return account, node
873
    
874
    def _lookup_container(self, account, container):
875
        path = '/'.join((account, container))
876
        node = self.node.node_lookup(path)
877
        if node is None:
878
            raise NameError('Container does not exist')
879
        return path, node
880
    
881
    def _lookup_object(self, account, container, name):
882
        path = '/'.join((account, container, name))
883
        node = self.node.node_lookup(path)
884
        if node is None:
885
            raise NameError('Object does not exist')
886
        return path, node
887
    
888
    def _get_properties(self, node, until=None):
        """Return properties until the timestamp given.

        Without 'until' only the normal cluster is consulted; with it, the
        history cluster is tried when the normal cluster has no match.
        Raises NameError when nothing is found.
        """

        if until is None:
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
        else:
            props = self.node.version_lookup(node, until, CLUSTER_NORMAL)
            if props is None:
                props = self.node.version_lookup(node, until, CLUSTER_HISTORY)
        if props is None:
            raise NameError('Path does not exist')
        return props
898
    
899
    def _get_statistics(self, node, until=None):
        """Return count, sum of size and latest timestamp of everything under node.

        Falls back to (0, 0, 0) when the node layer has no statistics.
        """

        if until is not None:
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
        else:
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
        return (0, 0, 0) if stats is None else stats
909
    
910
    def _get_version(self, node, version=None):
911
        if version is None:
912
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
913
            if props is None:
914
                raise NameError('Object does not exist')
915
        else:
916
            try:
917
                version = int(version)
918
            except ValueError:
919
                raise IndexError('Version does not exist')
920
            props = self.node.version_get_properties(version)
921
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
922
                raise IndexError('Version does not exist')
923
        return props
924
    
925
    def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
        """Create a new version of the node.

        Values not supplied are inherited from the latest normal-cluster
        version of the source (src_node if given, otherwise node). The uuid
        is inherited as well unless this is a copy or there is no source
        version. The version currently live on 'node', if any, is moved to
        the history cluster first.

        Returns (pre_version_id, dest_version_id): the id of the version that
        was superseded on 'node' (or None) and the id of the new version.
        """

        lookup_node = node if src_node is None else src_node
        src_props = self.node.version_lookup(lookup_node, inf, CLUSTER_NORMAL)
        if src_props is None:
            src_version_id = None
            src_hash, src_size, src_type, src_checksum = None, 0, '', ''
        else:
            src_version_id = src_props[self.SERIAL]
            src_hash = src_props[self.HASH]
            src_size = src_props[self.SIZE]
            src_type = src_props[self.TYPE]
            src_checksum = src_props[self.CHECKSUM]

        if size is None: # Set metadata.
            hash = src_hash # This way hash can be set to None (account or container).
            size = src_size
        if type is None:
            type = src_type
        if checksum is None:
            checksum = src_checksum

        if is_copy or src_version_id is None:
            uuid = self._generate_uuid()
        else:
            uuid = src_props[self.UUID]

        # The superseded version lives on 'node', which differs from the source when src_node is given.
        if src_node is None:
            pre_version_id = src_version_id
        else:
            current = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            pre_version_id = None if current is None else current[self.SERIAL]
        if pre_version_id is not None:
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)

        dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
        return pre_version_id, dest_version_id
962
    
963
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
964
        if src_version_id is not None:
965
            self.node.attribute_copy(src_version_id, dest_version_id)
966
        if not replace:
967
            self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
968
            self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
969
        else:
970
            self.node.attribute_del(dest_version_id, domain)
971
            self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
972
    
973
    def _put_metadata(self, user, node, domain, meta, replace=False):
974
        """Create a new version and store metadata."""
975
        
976
        src_version_id, dest_version_id = self._put_version_duplicate(user, node)
977
        self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
978
        return src_version_id, dest_version_id
979
    
980
    def _list_limits(self, listing, marker, limit):
981
        start = 0
982
        if marker:
983
            try:
984
                start = listing.index(marker) + 1
985
            except ValueError:
986
                pass
987
        if not limit or limit > 10000:
988
            limit = 10000
989
        return start, limit
990
    
991
    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
        """List the latest object versions under 'path', paged by marker and limit.

        Returns tuples whose first item is the name relative to 'path'. When
        'virtual' is set, the prefixes reported by the node layer are listed
        too, with None as their second item. Keys are only used as a filter
        when a domain is given.
        """
        cont_prefix = path + '/'
        full_prefix = cont_prefix + prefix
        start_path = cont_prefix + marker if marker else None
        before = inf if until is None else until
        key_filter = keys if domain else []

        objects, prefixes = self.node.latest_version_list(parent, full_prefix, delimiter, start_path, limit, before, CLUSTER_DELETED, allowed, domain, key_filter, size_range, all_props)
        if virtual:
            objects.extend([(p, None) for p in prefixes])
        objects.sort(key=lambda entry: entry[0])
        strip = len(cont_prefix)
        relative = [(entry[0][strip:],) + entry[1:] for entry in objects]

        offset, count = self._list_limits([entry[0] for entry in relative], marker, limit)
        return relative[offset:offset + count]
1006
    
1007
    # Reporting functions.
1008
    
1009
    def _report_size_change(self, user, account, size, details=None):
        """Queue a 'diskspace' message describing a size change for the account.

        'details' is an optional dict of extra message fields; 'user' and the
        account's current total size are added to it. The dict is copied, so
        neither the caller's dict nor a shared default is mutated and every
        queued message carries its own details.
        """
        details = dict(details or {})
        logger.debug("_report_size_change: %s %s %s %s", user, account, size, details)
        account_node = self._lookup_account(account, True)[1]
        total = self._get_statistics(account_node)[1]
        details.update({'user': user, 'total': total})
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))
1015
    
1016
    def _report_object_change(self, user, account, path, details=None):
        """Queue an 'object' message describing a change to the object at 'path'.

        'details' is an optional dict of extra message fields; 'user' is
        added to it. The dict is copied, so neither the caller's dict nor a
        shared default is mutated and every queued message carries its own
        details.
        """
        details = dict(details or {})
        logger.debug("_report_object_change: %s %s %s %s", user, account, path, details)
        details.update({'user': user})
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',), account, QUEUE_INSTANCE_ID, 'object', path, details))
1020
    
1021
    # Policy functions.
1022
    
1023
    def _check_policy(self, policy):
1024
        for k in policy.keys():
1025
            if policy[k] == '':
1026
                policy[k] = self.default_policy.get(k)
1027
        for k, v in policy.iteritems():
1028
            if k == 'quota':
1029
                q = int(v) # May raise ValueError.
1030
                if q < 0:
1031
                    raise ValueError
1032
            elif k == 'versioning':
1033
                if v not in ['auto', 'none']:
1034
                    raise ValueError
1035
            else:
1036
                raise ValueError
1037
    
1038
    def _put_policy(self, node, policy, replace):
1039
        if replace:
1040
            for k, v in self.default_policy.iteritems():
1041
                if k not in policy:
1042
                    policy[k] = v
1043
        self.node.policy_set(node, policy)
1044
    
1045
    def _get_policy(self, node):
1046
        policy = self.default_policy.copy()
1047
        policy.update(self.node.policy_get(node))
1048
        return policy
1049
    
1050
    def _apply_versioning(self, account, container, version_id):
1051
        """Delete the provided version if such is the policy.
1052
           Return size of object removed.
1053
        """
1054
        
1055
        if version_id is None:
1056
            return 0
1057
        path, node = self._lookup_container(account, container)
1058
        versioning = self._get_policy(node)['versioning']
1059
        if versioning != 'auto':
1060
            hash, size = self.node.version_remove(version_id)
1061
            self.store.map_delete(hash)
1062
            return size
1063
        return 0
1064
    
1065
    # Access control functions.
1066
    
1067
    def _check_groups(self, groups):
1068
        # raise ValueError('Bad characters in groups')
1069
        pass
1070
    
1071
    def _check_permissions(self, path, permissions):
1072
        # raise ValueError('Bad characters in permissions')
1073
        pass
1074
    
1075
    def _get_formatted_paths(self, paths):
1076
        formatted = []
1077
        for p in paths:
1078
            node = self.node.node_lookup(p)
1079
            if node is not None:
1080
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1081
            if props is not None:
1082
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1083
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1084
                formatted.append((p, self.MATCH_EXACT))
1085
        return formatted
1086
    
1087
    def _get_permissions_path(self, account, container, name):
1088
        path = '/'.join((account, container, name))
1089
        permission_paths = self.permissions.access_inherit(path)
1090
        permission_paths.sort()
1091
        permission_paths.reverse()
1092
        for p in permission_paths:
1093
            if p == path:
1094
                return p
1095
            else:
1096
                if p.count('/') < 2:
1097
                    continue
1098
                node = self.node.node_lookup(p)
1099
                if node is not None:
1100
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1101
                if props is not None:
1102
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1103
                        return p
1104
        return None
1105
    
1106
    def _can_read(self, user, account, container, name):
1107
        if user == account:
1108
            return True
1109
        path = '/'.join((account, container, name))
1110
        if self.permissions.public_get(path) is not None:
1111
            return True
1112
        path = self._get_permissions_path(account, container, name)
1113
        if not path:
1114
            raise NotAllowedError
1115
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1116
            raise NotAllowedError
1117
    
1118
    def _can_write(self, user, account, container, name):
1119
        if user == account:
1120
            return True
1121
        path = '/'.join((account, container, name))
1122
        path = self._get_permissions_path(account, container, name)
1123
        if not path:
1124
            raise NotAllowedError
1125
        if not self.permissions.access_check(path, self.WRITE, user):
1126
            raise NotAllowedError
1127
    
1128
    def _allowed_accounts(self, user):
1129
        allow = set()
1130
        for path in self.permissions.access_list_paths(user):
1131
            allow.add(path.split('/', 1)[0])
1132
        return sorted(allow)
1133
    
1134
    def _allowed_containers(self, user, account):
1135
        allow = set()
1136
        for path in self.permissions.access_list_paths(user, account):
1137
            allow.add(path.split('/', 2)[1])
1138
        return sorted(allow)