Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ 8d9a3fbd

History | View | Annotate | Download (49.6 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import uuid as uuidlib
38
import logging
39
import hashlib
40
import binascii
41

    
42
from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend
43

    
44
# Stripped-down version of the HashMap class found in tools.
45
class HashMap(list):
46

    
47
    def __init__(self, blocksize, blockhash):
48
        super(HashMap, self).__init__()
49
        self.blocksize = blocksize
50
        self.blockhash = blockhash
51

    
52
    def _hash_raw(self, v):
53
        h = hashlib.new(self.blockhash)
54
        h.update(v)
55
        return h.digest()
56

    
57
    def hash(self):
58
        if len(self) == 0:
59
            return self._hash_raw('')
60
        if len(self) == 1:
61
            return self.__getitem__(0)
62

    
63
        h = list(self)
64
        s = 2
65
        while s < len(h):
66
            s = s * 2
67
        h += [('\x00' * len(h[0]))] * (s - len(h))
68
        while len(h) > 1:
69
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
70
        return h[0]
71

    
72
# Default modules and settings.
73
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
74
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
75
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
76
DEFAULT_BLOCK_PATH = 'data/'
77
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
78
#DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
79

    
80
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
81
QUEUE_CLIENT_ID = 'pithos'
82

    
83
( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
84

    
85
inf = float('inf')
86

    
87
ULTIMATE_ANSWER = 42
88

    
89

    
90
logger = logging.getLogger(__name__)
91

    
92

    
93
def backend_method(func=None, autocommit=1):
94
    if func is None:
95
        def fn(func):
96
            return backend_method(func, autocommit)
97
        return fn
98

    
99
    if not autocommit:
100
        return func
101
    def fn(self, *args, **kw):
102
        self.wrapper.execute()
103
        try:
104
            self.messages = []
105
            ret = func(self, *args, **kw)
106
            self.wrapper.commit()
107
            for m in self.messages:
108
                self.queue.send(*m)
109
            return ret
110
        except:
111
            self.wrapper.rollback()
112
            raise
113
    return fn
114

    
115

    
116
class ModularBackend(BaseBackend):
117
    """A modular backend.
118
    
119
    Uses modules for SQL functions and storage.
120
    """
121
    
122
    def __init__(self, db_module=None, db_connection=None,
123
                 block_module=None, block_path=None,
124
                 queue_module=None, queue_connection=None):
125
        db_module = db_module or DEFAULT_DB_MODULE
126
        db_connection = db_connection or DEFAULT_DB_CONNECTION
127
        block_module = block_module or DEFAULT_BLOCK_MODULE
128
        block_path = block_path or DEFAULT_BLOCK_PATH
129
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
130
        #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
131
        
132
        self.hash_algorithm = 'sha256'
133
        self.block_size = 4 * 1024 * 1024 # 4MB
134
        
135
        self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
136
        
137
        def load_module(m):
138
            __import__(m)
139
            return sys.modules[m]
140
        
141
        self.db_module = load_module(db_module)
142
        self.wrapper = self.db_module.DBWrapper(db_connection)
143
        params = {'wrapper': self.wrapper}
144
        self.permissions = self.db_module.Permissions(**params)
145
        for x in ['READ', 'WRITE']:
146
            setattr(self, x, getattr(self.db_module, x))
147
        self.node = self.db_module.Node(**params)
148
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
149
            setattr(self, x, getattr(self.db_module, x))
150
        
151
        self.block_module = load_module(block_module)
152
        params = {'path': block_path,
153
                  'block_size': self.block_size,
154
                  'hash_algorithm': self.hash_algorithm}
155
        self.store = self.block_module.Store(**params)
156

    
157
        if queue_module and queue_connection:
158
            self.queue_module = load_module(queue_module)
159
            params = {'exchange': queue_connection,
160
                      'client_id': QUEUE_CLIENT_ID}
161
            self.queue = self.queue_module.Queue(**params)
162
        else:
163
            class NoQueue:
164
                def send(self, *args):
165
                    pass
166
                
167
                def close(self):
168
                    pass
169
            
170
            self.queue = NoQueue()
171
    
172
    def close(self):
173
        self.wrapper.close()
174
        self.queue.close()
175
    
176
    @backend_method
177
    def list_accounts(self, user, marker=None, limit=10000):
178
        """Return a list of accounts the user can access."""
179
        
180
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
181
        allowed = self._allowed_accounts(user)
182
        start, limit = self._list_limits(allowed, marker, limit)
183
        return allowed[start:start + limit]
184
    
185
    @backend_method
186
    def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
187
        """Return a dictionary with the account metadata for the domain."""
188
        
189
        logger.debug("get_account_meta: %s %s %s", account, domain, until)
190
        path, node = self._lookup_account(account, user == account)
191
        if user != account:
192
            if until or node is None or account not in self._allowed_accounts(user):
193
                raise NotAllowedError
194
        try:
195
            props = self._get_properties(node, until)
196
            mtime = props[self.MTIME]
197
        except NameError:
198
            props = None
199
            mtime = until
200
        count, bytes, tstamp = self._get_statistics(node, until)
201
        tstamp = max(tstamp, mtime)
202
        if until is None:
203
            modified = tstamp
204
        else:
205
            modified = self._get_statistics(node)[2] # Overall last modification.
206
            modified = max(modified, mtime)
207
        
208
        if user != account:
209
            meta = {'name': account}
210
        else:
211
            meta = {}
212
            if props is not None and include_user_defined:
213
                meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
214
            if until is not None:
215
                meta.update({'until_timestamp': tstamp})
216
            meta.update({'name': account, 'count': count, 'bytes': bytes})
217
        meta.update({'modified': modified})
218
        return meta
219
    
220
    @backend_method
221
    def update_account_meta(self, user, account, domain, meta, replace=False):
222
        """Update the metadata associated with the account for the domain."""
223
        
224
        logger.debug("update_account_meta: %s %s %s %s", account, domain, meta, replace)
225
        if user != account:
226
            raise NotAllowedError
227
        path, node = self._lookup_account(account, True)
228
        self._put_metadata(user, node, domain, meta, replace)
229
    
230
    @backend_method
231
    def get_account_groups(self, user, account):
232
        """Return a dictionary with the user groups defined for this account."""
233
        
234
        logger.debug("get_account_groups: %s", account)
235
        if user != account:
236
            if account not in self._allowed_accounts(user):
237
                raise NotAllowedError
238
            return {}
239
        self._lookup_account(account, True)
240
        return self.permissions.group_dict(account)
241
    
242
    @backend_method
243
    def update_account_groups(self, user, account, groups, replace=False):
244
        """Update the groups associated with the account."""
245
        
246
        logger.debug("update_account_groups: %s %s %s", account, groups, replace)
247
        if user != account:
248
            raise NotAllowedError
249
        self._lookup_account(account, True)
250
        self._check_groups(groups)
251
        if replace:
252
            self.permissions.group_destroy(account)
253
        for k, v in groups.iteritems():
254
            if not replace: # If not already deleted.
255
                self.permissions.group_delete(account, k)
256
            if v:
257
                self.permissions.group_addmany(account, k, v)
258
    
259
    @backend_method
260
    def get_account_policy(self, user, account):
261
        """Return a dictionary with the account policy."""
262
        
263
        logger.debug("get_account_policy: %s", account)
264
        if user != account:
265
            if account not in self._allowed_accounts(user):
266
                raise NotAllowedError
267
            return {}
268
        path, node = self._lookup_account(account, True)
269
        return self._get_policy(node)
270
    
271
    @backend_method
272
    def update_account_policy(self, user, account, policy, replace=False):
273
        """Update the policy associated with the account."""
274
        
275
        logger.debug("update_account_policy: %s %s %s", account, policy, replace)
276
        if user != account:
277
            raise NotAllowedError
278
        path, node = self._lookup_account(account, True)
279
        self._check_policy(policy)
280
        self._put_policy(node, policy, replace)
281
    
282
    @backend_method
283
    def put_account(self, user, account, policy={}):
284
        """Create a new account with the given name."""
285
        
286
        logger.debug("put_account: %s %s", account, policy)
287
        if user != account:
288
            raise NotAllowedError
289
        node = self.node.node_lookup(account)
290
        if node is not None:
291
            raise NameError('Account already exists')
292
        if policy:
293
            self._check_policy(policy)
294
        node = self._put_path(user, self.ROOTNODE, account)
295
        self._put_policy(node, policy, True)
296
    
297
    @backend_method
298
    def delete_account(self, user, account):
299
        """Delete the account with the given name."""
300
        
301
        logger.debug("delete_account: %s", account)
302
        if user != account:
303
            raise NotAllowedError
304
        node = self.node.node_lookup(account)
305
        if node is None:
306
            return
307
        if not self.node.node_remove(node):
308
            raise IndexError('Account is not empty')
309
        self.permissions.group_destroy(account)
310
    
311
    @backend_method
312
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
313
        """Return a list of containers existing under an account."""
314
        
315
        logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
316
        if user != account:
317
            if until or account not in self._allowed_accounts(user):
318
                raise NotAllowedError
319
            allowed = self._allowed_containers(user, account)
320
            start, limit = self._list_limits(allowed, marker, limit)
321
            return allowed[start:start + limit]
322
        if shared:
323
            allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
324
            allowed = list(set(allowed))
325
            start, limit = self._list_limits(allowed, marker, limit)
326
            return allowed[start:start + limit]
327
        node = self.node.node_lookup(account)
328
        return [x[0] for x in self._list_object_properties(node, account, '', '/', marker, limit, False, None, [], until)]
329
    
330
    @backend_method
331
    def list_container_meta(self, user, account, container, domain, until=None):
332
        """Return a list with all the container's object meta keys for the domain."""
333
        
334
        logger.debug("list_container_meta: %s %s %s %s", account, container, domain, until)
335
        allowed = []
336
        if user != account:
337
            if until:
338
                raise NotAllowedError
339
            allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
340
            if not allowed:
341
                raise NotAllowedError
342
        path, node = self._lookup_container(account, container)
343
        before = until if until is not None else inf
344
        allowed = self._get_formatted_paths(allowed)
345
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
346
    
347
    @backend_method
348
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
349
        """Return a dictionary with the container metadata for the domain."""
350
        
351
        logger.debug("get_container_meta: %s %s %s %s", account, container, domain, until)
352
        if user != account:
353
            if until or container not in self._allowed_containers(user, account):
354
                raise NotAllowedError
355
        path, node = self._lookup_container(account, container)
356
        props = self._get_properties(node, until)
357
        mtime = props[self.MTIME]
358
        count, bytes, tstamp = self._get_statistics(node, until)
359
        tstamp = max(tstamp, mtime)
360
        if until is None:
361
            modified = tstamp
362
        else:
363
            modified = self._get_statistics(node)[2] # Overall last modification.
364
            modified = max(modified, mtime)
365
        
366
        if user != account:
367
            meta = {'name': container}
368
        else:
369
            meta = {}
370
            if include_user_defined:
371
                meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
372
            if until is not None:
373
                meta.update({'until_timestamp': tstamp})
374
            meta.update({'name': container, 'count': count, 'bytes': bytes})
375
        meta.update({'modified': modified})
376
        return meta
377
    
378
    @backend_method
379
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
380
        """Update the metadata associated with the container for the domain."""
381
        
382
        logger.debug("update_container_meta: %s %s %s %s %s", account, container, domain, meta, replace)
383
        if user != account:
384
            raise NotAllowedError
385
        path, node = self._lookup_container(account, container)
386
        src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
387
        if src_version_id is not None:
388
            versioning = self._get_policy(node)['versioning']
389
            if versioning != 'auto':
390
                self.node.version_remove(src_version_id)
391
    
392
    @backend_method
393
    def get_container_policy(self, user, account, container):
394
        """Return a dictionary with the container policy."""
395
        
396
        logger.debug("get_container_policy: %s %s", account, container)
397
        if user != account:
398
            if container not in self._allowed_containers(user, account):
399
                raise NotAllowedError
400
            return {}
401
        path, node = self._lookup_container(account, container)
402
        return self._get_policy(node)
403
    
404
    @backend_method
405
    def update_container_policy(self, user, account, container, policy, replace=False):
406
        """Update the policy associated with the container."""
407
        
408
        logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
409
        if user != account:
410
            raise NotAllowedError
411
        path, node = self._lookup_container(account, container)
412
        self._check_policy(policy)
413
        self._put_policy(node, policy, replace)
414
    
415
    @backend_method
416
    def put_container(self, user, account, container, policy={}):
417
        """Create a new container with the given name."""
418
        
419
        logger.debug("put_container: %s %s %s", account, container, policy)
420
        if user != account:
421
            raise NotAllowedError
422
        try:
423
            path, node = self._lookup_container(account, container)
424
        except NameError:
425
            pass
426
        else:
427
            raise NameError('Container already exists')
428
        if policy:
429
            self._check_policy(policy)
430
        path = '/'.join((account, container))
431
        node = self._put_path(user, self._lookup_account(account, True)[1], path)
432
        self._put_policy(node, policy, True)
433
    
434
    @backend_method
435
    def delete_container(self, user, account, container, until=None):
436
        """Delete/purge the container with the given name."""
437
        
438
        logger.debug("delete_container: %s %s %s", account, container, until)
439
        if user != account:
440
            raise NotAllowedError
441
        path, node = self._lookup_container(account, container)
442
        
443
        if until is not None:
444
            hashes, size = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
445
            for h in hashes:
446
                self.store.map_delete(h)
447
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
448
            self._report_size_change(user, account, -size, {'action': 'container purge'})
449
            return
450
        
451
        if self._get_statistics(node)[0] > 0:
452
            raise IndexError('Container is not empty')
453
        hashes, size = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
454
        for h in hashes:
455
            self.store.map_delete(h)
456
        self.node.node_purge_children(node, inf, CLUSTER_DELETED)
457
        self.node.node_remove(node)
458
        self._report_size_change(user, account, -size, {'action': 'container delete'})
459
    
460
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props):
461
        if user != account and until:
462
            raise NotAllowedError
463
        allowed = self._list_object_permissions(user, account, container, prefix, shared)
464
        if shared and not allowed:
465
            return []
466
        path, node = self._lookup_container(account, container)
467
        allowed = self._get_formatted_paths(allowed)
468
        return self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
469
    
470
    def _list_object_permissions(self, user, account, container, prefix, shared):
471
        allowed = []
472
        path = '/'.join((account, container, prefix)).rstrip('/')
473
        if user != account:
474
            allowed = self.permissions.access_list_paths(user, path)
475
            if not allowed:
476
                raise NotAllowedError
477
        else:
478
            if shared:
479
                allowed = self.permissions.access_list_shared(path)
480
                if not allowed:
481
                    return []
482
        return allowed
483
    
484
    @backend_method
485
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None):
486
        """Return a list of object (name, version_id) tuples existing under a container."""
487
        
488
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range)
489
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False)
490
    
491
    @backend_method
492
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None):
493
        """Return a list of object metadata dicts existing under a container."""
494
        
495
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range)
496
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True)
497
        objects = []
498
        for p in props:
499
            if len(p) == 2:
500
                objects.append({'subdir': p[0]})
501
            else:
502
                objects.append({'name': p[0],
503
                                'bytes': p[self.SIZE + 1],
504
                                'type': p[self.TYPE + 1],
505
                                'hash': p[self.HASH + 1],
506
                                'version': p[self.SERIAL + 1],
507
                                'version_timestamp': p[self.MTIME + 1],
508
                                'modified': p[self.MTIME + 1] if until is None else None,
509
                                'modified_by': p[self.MUSER + 1],
510
                                'uuid': p[self.UUID + 1],
511
                                'checksum': p[self.CHECKSUM + 1]})
512
        return objects
513
    
514
    @backend_method
515
    def list_object_permissions(self, user, account, container, prefix=''):
516
        """Return a list of paths that enforce permissions under a container."""
517
        
518
        logger.debug("list_object_permissions: %s %s %s", account, container, prefix)
519
        return self._list_object_permissions(user, account, container, prefix, True)
520
    
521
    @backend_method
522
    def list_object_public(self, user, account, container, prefix=''):
523
        """Return a dict mapping paths to public ids for objects that are public under a container."""
524
        
525
        logger.debug("list_object_public: %s %s %s", account, container, prefix)
526
        public = {}
527
        for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
528
            public[path] = p + ULTIMATE_ANSWER
529
        return public
530
    
531
    @backend_method
532
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
533
        """Return a dictionary with the object metadata for the domain."""
534
        
535
        logger.debug("get_object_meta: %s %s %s %s %s", account, container, name, domain, version)
536
        self._can_read(user, account, container, name)
537
        path, node = self._lookup_object(account, container, name)
538
        props = self._get_version(node, version)
539
        if version is None:
540
            modified = props[self.MTIME]
541
        else:
542
            try:
543
                modified = self._get_version(node)[self.MTIME] # Overall last modification.
544
            except NameError: # Object may be deleted.
545
                del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
546
                if del_props is None:
547
                    raise NameError('Object does not exist')
548
                modified = del_props[self.MTIME]
549
        
550
        meta = {}
551
        if include_user_defined:
552
            meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
553
        meta.update({'name': name,
554
                     'bytes': props[self.SIZE],
555
                     'type': props[self.TYPE],
556
                     'hash': props[self.HASH],
557
                     'version': props[self.SERIAL],
558
                     'version_timestamp': props[self.MTIME],
559
                     'modified': modified,
560
                     'modified_by': props[self.MUSER],
561
                     'uuid': props[self.UUID],
562
                     'checksum': props[self.CHECKSUM]})
563
        return meta
564
    
565
    @backend_method
566
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
567
        """Update the metadata associated with the object for the domain and return the new version."""
568
        
569
        logger.debug("update_object_meta: %s %s %s %s %s %s", account, container, name, domain, meta, replace)
570
        self._can_write(user, account, container, name)
571
        path, node = self._lookup_object(account, container, name)
572
        src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
573
        self._apply_versioning(account, container, src_version_id)
574
        return dest_version_id
575
    
576
    @backend_method
577
    def get_object_permissions(self, user, account, container, name):
578
        """Return the action allowed on the object, the path
579
        from which the object gets its permissions from,
580
        along with a dictionary containing the permissions."""
581
        
582
        logger.debug("get_object_permissions: %s %s %s", account, container, name)
583
        allowed = 'write'
584
        permissions_path = self._get_permissions_path(account, container, name)
585
        if user != account:
586
            if self.permissions.access_check(permissions_path, self.WRITE, user):
587
                allowed = 'write'
588
            elif self.permissions.access_check(permissions_path, self.READ, user):
589
                allowed = 'read'
590
            else:
591
                raise NotAllowedError
592
        self._lookup_object(account, container, name)
593
        return (allowed, permissions_path, self.permissions.access_get(permissions_path))
594
    
595
    @backend_method
596
    def update_object_permissions(self, user, account, container, name, permissions):
597
        """Update the permissions associated with the object."""
598
        
599
        logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
600
        if user != account:
601
            raise NotAllowedError
602
        path = self._lookup_object(account, container, name)[0]
603
        self._check_permissions(path, permissions)
604
        self.permissions.access_set(path, permissions)
605
    
606
    @backend_method
607
    def get_object_public(self, user, account, container, name):
608
        """Return the public id of the object if applicable."""
609
        
610
        logger.debug("get_object_public: %s %s %s", account, container, name)
611
        self._can_read(user, account, container, name)
612
        path = self._lookup_object(account, container, name)[0]
613
        p = self.permissions.public_get(path)
614
        if p is not None:
615
            p += ULTIMATE_ANSWER
616
        return p
617
    
618
    @backend_method
619
    def update_object_public(self, user, account, container, name, public):
620
        """Update the public status of the object."""
621
        
622
        logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
623
        self._can_write(user, account, container, name)
624
        path = self._lookup_object(account, container, name)[0]
625
        if not public:
626
            self.permissions.public_unset(path)
627
        else:
628
            self.permissions.public_set(path)
629
    
630
    @backend_method
631
    def get_object_hashmap(self, user, account, container, name, version=None):
632
        """Return the object's size and a list with partial hashes."""
633
        
634
        logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
635
        self._can_read(user, account, container, name)
636
        path, node = self._lookup_object(account, container, name)
637
        props = self._get_version(node, version)
638
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
639
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
640
    
641
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
642
        if permissions is not None and user != account:
643
            raise NotAllowedError
644
        self._can_write(user, account, container, name)
645
        if permissions is not None:
646
            path = '/'.join((account, container, name))
647
            self._check_permissions(path, permissions)
648
        
649
        account_path, account_node = self._lookup_account(account, True)
650
        container_path, container_node = self._lookup_container(account, container)
651
        path, node = self._put_object_node(container_path, container_node, name)
652
        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
653
        
654
        # Handle meta.
655
        if src_version_id is None:
656
            src_version_id = pre_version_id
657
        self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace_meta)
658
        
659
        # Check quota.
660
        del_size = self._apply_versioning(account, container, pre_version_id)
661
        size_delta = size - del_size
662
        if size_delta > 0:
663
            account_quota = long(self._get_policy(account_node)['quota'])
664
            container_quota = long(self._get_policy(container_node)['quota'])
665
            if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
666
               (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
667
                # This must be executed in a transaction, so the version is never created if it fails.
668
                raise QuotaError
669
        self._report_size_change(user, account, size_delta, {'action': 'object update'})
670
        
671
        if permissions is not None:
672
            self.permissions.access_set(path, permissions)
673
        
674
        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
675
        return dest_version_id
676
    
677
    @backend_method
678
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
679
        """Create/update an object with the specified size and partial hashes."""
680
        
681
        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s", account, container, name, size, type, hashmap, checksum)
682
        if size == 0: # No such thing as an empty hashmap.
683
            hashmap = [self.put_block('')]
684
        map = HashMap(self.block_size, self.hash_algorithm)
685
        map.extend([binascii.unhexlify(x) for x in hashmap])
686
        missing = self.store.block_search(map)
687
        if missing:
688
            ie = IndexError()
689
            ie.data = [binascii.hexlify(x) for x in missing]
690
            raise ie
691
        
692
        hash = map.hash()
693
        dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
694
        self.store.map_put(hash, map)
695
        return dest_version_id
696
    
697
    @backend_method
698
    def update_object_checksum(self, user, account, container, name, version, checksum):
699
        """Update an object's checksum."""
700
        
701
        logger.debug("update_object_checksum: %s %s %s %s %s", account, container, name, version, checksum)
702
        # Update objects with greater version and same hashmap and size (fix metadata updates).
703
        self._can_write(user, account, container, name)
704
        path, node = self._lookup_object(account, container, name)
705
        props = self._get_version(node, version)
706
        versions = self.node.node_get_versions(node)
707
        for x in versions:
708
            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
709
                self.node.version_put_property(x[self.SERIAL], 'checksum', checksum)
710
    
711
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False):
712
        self._can_read(user, src_account, src_container, src_name)
713
        path, node = self._lookup_object(src_account, src_container, src_name)
714
        # TODO: Will do another fetch of the properties in duplicate version...
715
        props = self._get_version(node, src_version) # Check to see if source exists.
716
        src_version_id = props[self.SERIAL]
717
        hash = props[self.HASH]
718
        size = props[self.SIZE]
719
        
720
        is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
721
        dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy)
722
        return dest_version_id
723
    
724
    @backend_method
725
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None):
726
        """Copy an object's data and metadata."""
727
        
728
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version)
729
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False)
730
        return dest_version_id
731
    
732
    @backend_method
733
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None):
734
        """Move an object's data and metadata."""
735
        
736
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions)
737
        if user != src_account:
738
            raise NotAllowedError
739
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True)
740
        if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
741
            self._delete_object(user, src_account, src_container, src_name)
742
        return dest_version_id
743
    
744
    def _delete_object(self, user, account, container, name, until=None):
745
        if user != account:
746
            raise NotAllowedError
747
        
748
        if until is not None:
749
            path = '/'.join((account, container, name))
750
            node = self.node.node_lookup(path)
751
            if node is None:
752
                return
753
            hashes = []
754
            size = 0
755
            h, s = self.node.node_purge(node, until, CLUSTER_NORMAL)
756
            hashes += h
757
            size += s
758
            h, s = self.node.node_purge(node, until, CLUSTER_HISTORY)
759
            hashes += h
760
            size += s
761
            for h in hashes:
762
                self.store.map_delete(h)
763
            self.node.node_purge(node, until, CLUSTER_DELETED)
764
            try:
765
                props = self._get_version(node)
766
            except NameError:
767
                self.permissions.access_clear(path)
768
            self._report_size_change(user, account, -size, {'action': 'object purge'})
769
            return
770
        
771
        path, node = self._lookup_object(account, container, name)
772
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
773
        del_size = self._apply_versioning(account, container, src_version_id)
774
        if del_size:
775
            self._report_size_change(user, account, -del_size, {'action': 'object delete'})
776
        self._report_object_change(user, account, path, details={'action': 'object delete'})
777
        self.permissions.access_clear(path)
778
    
779
    @backend_method
780
    def delete_object(self, user, account, container, name, until=None):
781
        """Delete/purge an object."""
782
        
783
        logger.debug("delete_object: %s %s %s %s", account, container, name, until)
784
        self._delete_object(user, account, container, name, until)
785
    
786
    @backend_method
787
    def list_versions(self, user, account, container, name):
788
        """Return a list of all (version, version_timestamp) tuples for an object."""
789
        
790
        logger.debug("list_versions: %s %s %s", account, container, name)
791
        self._can_read(user, account, container, name)
792
        path, node = self._lookup_object(account, container, name)
793
        versions = self.node.node_get_versions(node)
794
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
795
    
796
    @backend_method
797
    def get_uuid(self, user, uuid):
798
        """Return the (account, container, name) for the UUID given."""
799
        
800
        logger.debug("get_uuid: %s", uuid)
801
        info = self.node.latest_uuid(uuid)
802
        if info is None:
803
            raise NameError
804
        path, serial = info
805
        account, container, name = path.split('/', 2)
806
        self._can_read(user, account, container, name)
807
        return (account, container, name)
808
    
809
    @backend_method
810
    def get_public(self, user, public):
811
        """Return the (account, container, name) for the public id given."""
812
        
813
        logger.debug("get_public: %s", public)
814
        if public is None or public < ULTIMATE_ANSWER:
815
            raise NameError
816
        path = self.permissions.public_path(public - ULTIMATE_ANSWER)
817
        if path is None:
818
            raise NameError
819
        account, container, name = path.split('/', 2)
820
        self._can_read(user, account, container, name)
821
        return (account, container, name)
822
    
823
    @backend_method(autocommit=0)
824
    def get_block(self, hash):
825
        """Return a block's data."""
826
        
827
        logger.debug("get_block: %s", hash)
828
        block = self.store.block_get(binascii.unhexlify(hash))
829
        if not block:
830
            raise NameError('Block does not exist')
831
        return block
832
    
833
    @backend_method(autocommit=0)
834
    def put_block(self, data):
835
        """Store a block and return the hash."""
836
        
837
        logger.debug("put_block: %s", len(data))
838
        return binascii.hexlify(self.store.block_put(data))
839
    
840
    @backend_method(autocommit=0)
841
    def update_block(self, hash, data, offset=0):
842
        """Update a known block and return the hash."""
843
        
844
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
845
        if offset == 0 and len(data) == self.block_size:
846
            return self.put_block(data)
847
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
848
        return binascii.hexlify(h)
849
    
850
    # Path functions.
851
    
852
    def _generate_uuid(self):
853
        return str(uuidlib.uuid4())
854
    
855
    def _put_object_node(self, path, parent, name):
856
        path = '/'.join((path, name))
857
        node = self.node.node_lookup(path)
858
        if node is None:
859
            node = self.node.node_create(parent, path)
860
        return path, node
861
    
862
    def _put_path(self, user, parent, path):
863
        node = self.node.node_create(parent, path)
864
        self.node.version_create(node, None, 0, '', None, user, self._generate_uuid(), '', CLUSTER_NORMAL)
865
        return node
866
    
867
    def _lookup_account(self, account, create=True):
868
        node = self.node.node_lookup(account)
869
        if node is None and create:
870
            node = self._put_path(account, self.ROOTNODE, account) # User is account.
871
        return account, node
872
    
873
    def _lookup_container(self, account, container):
874
        path = '/'.join((account, container))
875
        node = self.node.node_lookup(path)
876
        if node is None:
877
            raise NameError('Container does not exist')
878
        return path, node
879
    
880
    def _lookup_object(self, account, container, name):
881
        path = '/'.join((account, container, name))
882
        node = self.node.node_lookup(path)
883
        if node is None:
884
            raise NameError('Object does not exist')
885
        return path, node
886
    
887
    def _get_properties(self, node, until=None):
888
        """Return properties until the timestamp given."""
889
        
890
        before = until if until is not None else inf
891
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
892
        if props is None and until is not None:
893
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
894
        if props is None:
895
            raise NameError('Path does not exist')
896
        return props
897
    
898
    def _get_statistics(self, node, until=None):
899
        """Return count, sum of size and latest timestamp of everything under node."""
900
        
901
        if until is None:
902
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
903
        else:
904
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
905
        if stats is None:
906
            stats = (0, 0, 0)
907
        return stats
908
    
909
    def _get_version(self, node, version=None):
910
        if version is None:
911
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
912
            if props is None:
913
                raise NameError('Object does not exist')
914
        else:
915
            try:
916
                version = int(version)
917
            except ValueError:
918
                raise IndexError('Version does not exist')
919
            props = self.node.version_get_properties(version)
920
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
921
                raise IndexError('Version does not exist')
922
        return props
923
    
924
    def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
925
        """Create a new version of the node."""
926
        
927
        props = self.node.version_lookup(node if src_node is None else src_node, inf, CLUSTER_NORMAL)
928
        if props is not None:
929
            src_version_id = props[self.SERIAL]
930
            src_hash = props[self.HASH]
931
            src_size = props[self.SIZE]
932
            src_type = props[self.TYPE]
933
            src_checksum = props[self.CHECKSUM]
934
        else:
935
            src_version_id = None
936
            src_hash = None
937
            src_size = 0
938
            src_type = ''
939
            src_checksum = ''
940
        if size is None: # Set metadata.
941
            hash = src_hash # This way hash can be set to None (account or container).
942
            size = src_size
943
        if type is None:
944
            type = src_type
945
        if checksum is None:
946
            checksum = src_checksum
947
        uuid = self._generate_uuid() if (is_copy or src_version_id is None) else props[self.UUID]
948
        
949
        if src_node is None:
950
            pre_version_id = src_version_id
951
        else:
952
            pre_version_id = None
953
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
954
            if props is not None:
955
                pre_version_id = props[self.SERIAL]
956
        if pre_version_id is not None:
957
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
958
        
959
        dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
960
        return pre_version_id, dest_version_id
961
    
962
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
963
        if src_version_id is not None:
964
            self.node.attribute_copy(src_version_id, dest_version_id)
965
        if not replace:
966
            self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
967
            self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
968
        else:
969
            self.node.attribute_del(dest_version_id, domain)
970
            self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
971
    
972
    def _put_metadata(self, user, node, domain, meta, replace=False):
973
        """Create a new version and store metadata."""
974
        
975
        src_version_id, dest_version_id = self._put_version_duplicate(user, node)
976
        self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
977
        return src_version_id, dest_version_id
978
    
979
    def _list_limits(self, listing, marker, limit):
980
        start = 0
981
        if marker:
982
            try:
983
                start = listing.index(marker) + 1
984
            except ValueError:
985
                pass
986
        if not limit or limit > 10000:
987
            limit = 10000
988
        return start, limit
989
    
990
    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
991
        cont_prefix = path + '/'
992
        prefix = cont_prefix + prefix
993
        start = cont_prefix + marker if marker else None
994
        before = until if until is not None else inf
995
        filterq = keys if domain else []
996
        sizeq = size_range
997
        
998
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
999
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1000
        objects.sort(key=lambda x: x[0])
1001
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1002
        
1003
        start, limit = self._list_limits([x[0] for x in objects], marker, limit)
1004
        return objects[start:start + limit]
1005
    
1006
    # Reporting functions.
1007
    
1008
    def _report_size_change(self, user, account, size, details={}):
1009
        logger.debug("_report_size_change: %s %s %s %s", user, account, size, details)
1010
        account_node = self._lookup_account(account, True)[1]
1011
        total = self._get_statistics(account_node)[1]
1012
        details.update({'user': user, 'total': total})
1013
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), account, 'diskspace', size, details))
1014
    
1015
    def _report_object_change(self, user, account, path, details={}):
1016
        logger.debug("_report_object_change: %s %s %s %s", user, account, path, details)
1017
        details.update({'user': user})
1018
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',), account, 'object', path, details))
1019
    
1020
    # Policy functions.
1021
    
1022
    def _check_policy(self, policy):
1023
        for k in policy.keys():
1024
            if policy[k] == '':
1025
                policy[k] = self.default_policy.get(k)
1026
        for k, v in policy.iteritems():
1027
            if k == 'quota':
1028
                q = int(v) # May raise ValueError.
1029
                if q < 0:
1030
                    raise ValueError
1031
            elif k == 'versioning':
1032
                if v not in ['auto', 'none']:
1033
                    raise ValueError
1034
            else:
1035
                raise ValueError
1036
    
1037
    def _put_policy(self, node, policy, replace):
1038
        if replace:
1039
            for k, v in self.default_policy.iteritems():
1040
                if k not in policy:
1041
                    policy[k] = v
1042
        self.node.policy_set(node, policy)
1043
    
1044
    def _get_policy(self, node):
1045
        policy = self.default_policy.copy()
1046
        policy.update(self.node.policy_get(node))
1047
        return policy
1048
    
1049
    def _apply_versioning(self, account, container, version_id):
1050
        """Delete the provided version if such is the policy.
1051
           Return size of object removed.
1052
        """
1053
        
1054
        if version_id is None:
1055
            return 0
1056
        path, node = self._lookup_container(account, container)
1057
        versioning = self._get_policy(node)['versioning']
1058
        if versioning != 'auto':
1059
            hash, size = self.node.version_remove(version_id)
1060
            self.store.map_delete(hash)
1061
            return size
1062
        return 0
1063
    
1064
    # Access control functions.
1065
    
1066
    def _check_groups(self, groups):
1067
        # raise ValueError('Bad characters in groups')
1068
        pass
1069
    
1070
    def _check_permissions(self, path, permissions):
1071
        # raise ValueError('Bad characters in permissions')
1072
        pass
1073
    
1074
    def _get_formatted_paths(self, paths):
1075
        formatted = []
1076
        for p in paths:
1077
            node = self.node.node_lookup(p)
1078
            if node is not None:
1079
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1080
            if props is not None:
1081
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1082
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1083
                formatted.append((p, self.MATCH_EXACT))
1084
        return formatted
1085
    
1086
    def _get_permissions_path(self, account, container, name):
1087
        path = '/'.join((account, container, name))
1088
        permission_paths = self.permissions.access_inherit(path)
1089
        permission_paths.sort()
1090
        permission_paths.reverse()
1091
        for p in permission_paths:
1092
            if p == path:
1093
                return p
1094
            else:
1095
                if p.count('/') < 2:
1096
                    continue
1097
                node = self.node.node_lookup(p)
1098
                if node is not None:
1099
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1100
                if props is not None:
1101
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1102
                        return p
1103
        return None
1104
    
1105
    def _can_read(self, user, account, container, name):
1106
        if user == account:
1107
            return True
1108
        path = '/'.join((account, container, name))
1109
        if self.permissions.public_get(path) is not None:
1110
            return True
1111
        path = self._get_permissions_path(account, container, name)
1112
        if not path:
1113
            raise NotAllowedError
1114
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1115
            raise NotAllowedError
1116
    
1117
    def _can_write(self, user, account, container, name):
1118
        if user == account:
1119
            return True
1120
        path = '/'.join((account, container, name))
1121
        path = self._get_permissions_path(account, container, name)
1122
        if not path:
1123
            raise NotAllowedError
1124
        if not self.permissions.access_check(path, self.WRITE, user):
1125
            raise NotAllowedError
1126
    
1127
    def _allowed_accounts(self, user):
1128
        allow = set()
1129
        for path in self.permissions.access_list_paths(user):
1130
            allow.add(path.split('/', 1)[0])
1131
        return sorted(allow)
1132
    
1133
    def _allowed_containers(self, user, account):
1134
        allow = set()
1135
        for path in self.permissions.access_list_paths(user, account):
1136
            allow.add(path.split('/', 2)[1])
1137
        return sorted(allow)