Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ f9ea264b

History | View | Annotate | Download (49 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import uuid as uuidlib
38
import logging
39
import hashlib
40
import binascii
41

    
42
from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend
43

    
44
# Stripped-down version of the HashMap class found in tools.
45
class HashMap(list):
46

    
47
    def __init__(self, blocksize, blockhash):
48
        super(HashMap, self).__init__()
49
        self.blocksize = blocksize
50
        self.blockhash = blockhash
51

    
52
    def _hash_raw(self, v):
53
        h = hashlib.new(self.blockhash)
54
        h.update(v)
55
        return h.digest()
56

    
57
    def hash(self):
58
        if len(self) == 0:
59
            return self._hash_raw('')
60
        if len(self) == 1:
61
            return self.__getitem__(0)
62

    
63
        h = list(self)
64
        s = 2
65
        while s < len(h):
66
            s = s * 2
67
        h += [('\x00' * len(h[0]))] * (s - len(h))
68
        while len(h) > 1:
69
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
70
        return h[0]
71

    
72
# Default modules and settings.
73
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
74
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
75
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
76
DEFAULT_BLOCK_PATH = 'data/'
77
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
78
#DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
79

    
80
QUEUE_MESSAGE_KEY = '#'
81
QUEUE_CLIENT_ID = 2 # Pithos.
82

    
83
( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
84

    
85
inf = float('inf')
86

    
87
ULTIMATE_ANSWER = 42
88

    
89

    
90
logger = logging.getLogger(__name__)
91

    
92

    
93
def backend_method(func=None, autocommit=1):
94
    if func is None:
95
        def fn(func):
96
            return backend_method(func, autocommit)
97
        return fn
98

    
99
    if not autocommit:
100
        return func
101
    def fn(self, *args, **kw):
102
        self.wrapper.execute()
103
        try:
104
            ret = func(self, *args, **kw)
105
            self.wrapper.commit()
106
            return ret
107
        except:
108
            self.wrapper.rollback()
109
            raise
110
    return fn
111

    
112

    
113
class ModularBackend(BaseBackend):
114
    """A modular backend.
115
    
116
    Uses modules for SQL functions and storage.
117
    """
118
    
119
    def __init__(self, db_module=None, db_connection=None,
120
                 block_module=None, block_path=None,
121
                 queue_module=None, queue_connection=None):
122
        db_module = db_module or DEFAULT_DB_MODULE
123
        db_connection = db_connection or DEFAULT_DB_CONNECTION
124
        block_module = block_module or DEFAULT_BLOCK_MODULE
125
        block_path = block_path or DEFAULT_BLOCK_PATH
126
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
127
        #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
128
        
129
        self.hash_algorithm = 'sha256'
130
        self.block_size = 4 * 1024 * 1024 # 4MB
131
        
132
        self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
133
        
134
        def load_module(m):
135
            __import__(m)
136
            return sys.modules[m]
137
        
138
        self.db_module = load_module(db_module)
139
        self.wrapper = self.db_module.DBWrapper(db_connection)
140
        params = {'wrapper': self.wrapper}
141
        self.permissions = self.db_module.Permissions(**params)
142
        for x in ['READ', 'WRITE']:
143
            setattr(self, x, getattr(self.db_module, x))
144
        self.node = self.db_module.Node(**params)
145
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
146
            setattr(self, x, getattr(self.db_module, x))
147
        
148
        self.block_module = load_module(block_module)
149
        params = {'path': block_path,
150
                  'block_size': self.block_size,
151
                  'hash_algorithm': self.hash_algorithm}
152
        self.store = self.block_module.Store(**params)
153

    
154
        if queue_module and queue_connection:
155
            self.queue_module = load_module(queue_module)
156
            params = {'exchange': queue_connection,
157
                      'message_key': QUEUE_MESSAGE_KEY,
158
                      'client_id': QUEUE_CLIENT_ID}
159
            self.queue = self.queue_module.Queue(**params)
160
        else:
161
            class NoQueue:
162
                def send(self, *args):
163
                    pass
164
                
165
                def close(self):
166
                    pass
167
            
168
            self.queue = NoQueue()
169
    
170
    def close(self):
171
        self.wrapper.close()
172
        self.queue.close()
173
    
174
    @backend_method
175
    def list_accounts(self, user, marker=None, limit=10000):
176
        """Return a list of accounts the user can access."""
177
        
178
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
179
        allowed = self._allowed_accounts(user)
180
        start, limit = self._list_limits(allowed, marker, limit)
181
        return allowed[start:start + limit]
182
    
183
    @backend_method
184
    def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
185
        """Return a dictionary with the account metadata for the domain."""
186
        
187
        logger.debug("get_account_meta: %s %s %s", account, domain, until)
188
        path, node = self._lookup_account(account, user == account)
189
        if user != account:
190
            if until or node is None or account not in self._allowed_accounts(user):
191
                raise NotAllowedError
192
        try:
193
            props = self._get_properties(node, until)
194
            mtime = props[self.MTIME]
195
        except NameError:
196
            props = None
197
            mtime = until
198
        count, bytes, tstamp = self._get_statistics(node, until)
199
        tstamp = max(tstamp, mtime)
200
        if until is None:
201
            modified = tstamp
202
        else:
203
            modified = self._get_statistics(node)[2] # Overall last modification.
204
            modified = max(modified, mtime)
205
        
206
        if user != account:
207
            meta = {'name': account}
208
        else:
209
            meta = {}
210
            if props is not None and include_user_defined:
211
                meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
212
            if until is not None:
213
                meta.update({'until_timestamp': tstamp})
214
            meta.update({'name': account, 'count': count, 'bytes': bytes})
215
        meta.update({'modified': modified})
216
        return meta
217
    
218
    @backend_method
219
    def update_account_meta(self, user, account, domain, meta, replace=False):
220
        """Update the metadata associated with the account for the domain."""
221
        
222
        logger.debug("update_account_meta: %s %s %s %s", account, domain, meta, replace)
223
        if user != account:
224
            raise NotAllowedError
225
        path, node = self._lookup_account(account, True)
226
        self._put_metadata(user, node, domain, meta, replace)
227
    
228
    @backend_method
229
    def get_account_groups(self, user, account):
230
        """Return a dictionary with the user groups defined for this account."""
231
        
232
        logger.debug("get_account_groups: %s", account)
233
        if user != account:
234
            if account not in self._allowed_accounts(user):
235
                raise NotAllowedError
236
            return {}
237
        self._lookup_account(account, True)
238
        return self.permissions.group_dict(account)
239
    
240
    @backend_method
241
    def update_account_groups(self, user, account, groups, replace=False):
242
        """Update the groups associated with the account."""
243
        
244
        logger.debug("update_account_groups: %s %s %s", account, groups, replace)
245
        if user != account:
246
            raise NotAllowedError
247
        self._lookup_account(account, True)
248
        self._check_groups(groups)
249
        if replace:
250
            self.permissions.group_destroy(account)
251
        for k, v in groups.iteritems():
252
            if not replace: # If not already deleted.
253
                self.permissions.group_delete(account, k)
254
            if v:
255
                self.permissions.group_addmany(account, k, v)
256
    
257
    @backend_method
258
    def get_account_policy(self, user, account):
259
        """Return a dictionary with the account policy."""
260
        
261
        logger.debug("get_account_policy: %s", account)
262
        if user != account:
263
            if account not in self._allowed_accounts(user):
264
                raise NotAllowedError
265
            return {}
266
        path, node = self._lookup_account(account, True)
267
        return self._get_policy(node)
268
    
269
    @backend_method
270
    def update_account_policy(self, user, account, policy, replace=False):
271
        """Update the policy associated with the account."""
272
        
273
        logger.debug("update_account_policy: %s %s %s", account, policy, replace)
274
        if user != account:
275
            raise NotAllowedError
276
        path, node = self._lookup_account(account, True)
277
        self._check_policy(policy)
278
        self._put_policy(node, policy, replace)
279
    
280
    @backend_method
281
    def put_account(self, user, account, policy={}):
282
        """Create a new account with the given name."""
283
        
284
        logger.debug("put_account: %s %s", account, policy)
285
        if user != account:
286
            raise NotAllowedError
287
        node = self.node.node_lookup(account)
288
        if node is not None:
289
            raise NameError('Account already exists')
290
        if policy:
291
            self._check_policy(policy)
292
        node = self._put_path(user, self.ROOTNODE, account)
293
        self._put_policy(node, policy, True)
294
    
295
    @backend_method
296
    def delete_account(self, user, account):
297
        """Delete the account with the given name."""
298
        
299
        logger.debug("delete_account: %s", account)
300
        if user != account:
301
            raise NotAllowedError
302
        node = self.node.node_lookup(account)
303
        if node is None:
304
            return
305
        if not self.node.node_remove(node):
306
            raise IndexError('Account is not empty')
307
        self.permissions.group_destroy(account)
308
    
309
    @backend_method
310
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
311
        """Return a list of containers existing under an account."""
312
        
313
        logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
314
        if user != account:
315
            if until or account not in self._allowed_accounts(user):
316
                raise NotAllowedError
317
            allowed = self._allowed_containers(user, account)
318
            start, limit = self._list_limits(allowed, marker, limit)
319
            return allowed[start:start + limit]
320
        if shared:
321
            allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
322
            allowed = list(set(allowed))
323
            start, limit = self._list_limits(allowed, marker, limit)
324
            return allowed[start:start + limit]
325
        node = self.node.node_lookup(account)
326
        return [x[0] for x in self._list_object_properties(node, account, '', '/', marker, limit, False, None, [], until)]
327
    
328
    @backend_method
329
    def list_container_meta(self, user, account, container, domain, until=None):
330
        """Return a list with all the container's object meta keys for the domain."""
331
        
332
        logger.debug("list_container_meta: %s %s %s %s", account, container, domain, until)
333
        allowed = []
334
        if user != account:
335
            if until:
336
                raise NotAllowedError
337
            allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
338
            if not allowed:
339
                raise NotAllowedError
340
        path, node = self._lookup_container(account, container)
341
        before = until if until is not None else inf
342
        allowed = self._get_formatted_paths(allowed)
343
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
344
    
345
    @backend_method
346
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
347
        """Return a dictionary with the container metadata for the domain."""
348
        
349
        logger.debug("get_container_meta: %s %s %s %s", account, container, domain, until)
350
        if user != account:
351
            if until or container not in self._allowed_containers(user, account):
352
                raise NotAllowedError
353
        path, node = self._lookup_container(account, container)
354
        props = self._get_properties(node, until)
355
        mtime = props[self.MTIME]
356
        count, bytes, tstamp = self._get_statistics(node, until)
357
        tstamp = max(tstamp, mtime)
358
        if until is None:
359
            modified = tstamp
360
        else:
361
            modified = self._get_statistics(node)[2] # Overall last modification.
362
            modified = max(modified, mtime)
363
        
364
        if user != account:
365
            meta = {'name': container}
366
        else:
367
            meta = {}
368
            if include_user_defined:
369
                meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
370
            if until is not None:
371
                meta.update({'until_timestamp': tstamp})
372
            meta.update({'name': container, 'count': count, 'bytes': bytes})
373
        meta.update({'modified': modified})
374
        return meta
375
    
376
    @backend_method
377
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
378
        """Update the metadata associated with the container for the domain."""
379
        
380
        logger.debug("update_container_meta: %s %s %s %s %s", account, container, domain, meta, replace)
381
        if user != account:
382
            raise NotAllowedError
383
        path, node = self._lookup_container(account, container)
384
        src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
385
        if src_version_id is not None:
386
            versioning = self._get_policy(node)['versioning']
387
            if versioning != 'auto':
388
                self.node.version_remove(src_version_id)
389
    
390
    @backend_method
391
    def get_container_policy(self, user, account, container):
392
        """Return a dictionary with the container policy."""
393
        
394
        logger.debug("get_container_policy: %s %s", account, container)
395
        if user != account:
396
            if container not in self._allowed_containers(user, account):
397
                raise NotAllowedError
398
            return {}
399
        path, node = self._lookup_container(account, container)
400
        return self._get_policy(node)
401
    
402
    @backend_method
403
    def update_container_policy(self, user, account, container, policy, replace=False):
404
        """Update the policy associated with the container."""
405
        
406
        logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
407
        if user != account:
408
            raise NotAllowedError
409
        path, node = self._lookup_container(account, container)
410
        self._check_policy(policy)
411
        self._put_policy(node, policy, replace)
412
    
413
    @backend_method
414
    def put_container(self, user, account, container, policy={}):
415
        """Create a new container with the given name."""
416
        
417
        logger.debug("put_container: %s %s %s", account, container, policy)
418
        if user != account:
419
            raise NotAllowedError
420
        try:
421
            path, node = self._lookup_container(account, container)
422
        except NameError:
423
            pass
424
        else:
425
            raise NameError('Container already exists')
426
        if policy:
427
            self._check_policy(policy)
428
        path = '/'.join((account, container))
429
        node = self._put_path(user, self._lookup_account(account, True)[1], path)
430
        self._put_policy(node, policy, True)
431
    
432
    @backend_method
433
    def delete_container(self, user, account, container, until=None):
434
        """Delete/purge the container with the given name."""
435
        
436
        logger.debug("delete_container: %s %s %s", account, container, until)
437
        if user != account:
438
            raise NotAllowedError
439
        path, node = self._lookup_container(account, container)
440
        
441
        if until is not None:
442
            hashes, size = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
443
            for h in hashes:
444
                self.store.map_delete(h)
445
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
446
            self._report_size_change(user, account, -size, {'action': 'container purge'})
447
            return
448
        
449
        if self._get_statistics(node)[0] > 0:
450
            raise IndexError('Container is not empty')
451
        hashes, size = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
452
        for h in hashes:
453
            self.store.map_delete(h)
454
        self.node.node_purge_children(node, inf, CLUSTER_DELETED)
455
        self.node.node_remove(node)
456
        self._report_size_change(user, account, -size, {'action': 'container delete'})
457
    
458
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props):
459
        if user != account and until:
460
            raise NotAllowedError
461
        allowed = self._list_object_permissions(user, account, container, prefix, shared)
462
        if shared and not allowed:
463
            return []
464
        path, node = self._lookup_container(account, container)
465
        allowed = self._get_formatted_paths(allowed)
466
        return self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
467
    
468
    def _list_object_permissions(self, user, account, container, prefix, shared):
469
        allowed = []
470
        path = '/'.join((account, container, prefix)).rstrip('/')
471
        if user != account:
472
            allowed = self.permissions.access_list_paths(user, path)
473
            if not allowed:
474
                raise NotAllowedError
475
        else:
476
            if shared:
477
                allowed = self.permissions.access_list_shared(path)
478
                if not allowed:
479
                    return []
480
        return allowed
481
    
482
    @backend_method
483
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None):
484
        """Return a list of object (name, version_id) tuples existing under a container."""
485
        
486
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range)
487
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False)
488
    
489
    @backend_method
490
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None):
491
        """Return a list of object metadata dicts existing under a container."""
492
        
493
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range)
494
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True)
495
        objects = []
496
        for p in props:
497
            if len(p) == 2:
498
                objects.append({'subdir': p[0]})
499
            else:
500
                objects.append({'name': p[0],
501
                                'bytes': p[self.SIZE + 1],
502
                                'type': p[self.TYPE + 1],
503
                                'hash': p[self.HASH + 1],
504
                                'version': p[self.SERIAL + 1],
505
                                'version_timestamp': p[self.MTIME + 1],
506
                                'modified': p[self.MTIME + 1] if until is None else None,
507
                                'modified_by': p[self.MUSER + 1],
508
                                'uuid': p[self.UUID + 1],
509
                                'checksum': p[self.CHECKSUM + 1]})
510
        return objects
511
    
512
    @backend_method
513
    def list_object_permissions(self, user, account, container, prefix=''):
514
        """Return a list of paths that enforce permissions under a container."""
515
        
516
        logger.debug("list_object_permissions: %s %s %s", account, container, prefix)
517
        return self._list_object_permissions(user, account, container, prefix, True)
518
    
519
    @backend_method
520
    def list_object_public(self, user, account, container, prefix=''):
521
        """Return a dict mapping paths to public ids for objects that are public under a container."""
522
        
523
        logger.debug("list_object_public: %s %s %s", account, container, prefix)
524
        public = {}
525
        for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
526
            public[path] = p + ULTIMATE_ANSWER
527
        return public
528
    
529
    @backend_method
530
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
531
        """Return a dictionary with the object metadata for the domain."""
532
        
533
        logger.debug("get_object_meta: %s %s %s %s %s", account, container, name, domain, version)
534
        self._can_read(user, account, container, name)
535
        path, node = self._lookup_object(account, container, name)
536
        props = self._get_version(node, version)
537
        if version is None:
538
            modified = props[self.MTIME]
539
        else:
540
            try:
541
                modified = self._get_version(node)[self.MTIME] # Overall last modification.
542
            except NameError: # Object may be deleted.
543
                del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
544
                if del_props is None:
545
                    raise NameError('Object does not exist')
546
                modified = del_props[self.MTIME]
547
        
548
        meta = {}
549
        if include_user_defined:
550
            meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
551
        meta.update({'name': name,
552
                     'bytes': props[self.SIZE],
553
                     'type': props[self.TYPE],
554
                     'hash': props[self.HASH],
555
                     'version': props[self.SERIAL],
556
                     'version_timestamp': props[self.MTIME],
557
                     'modified': modified,
558
                     'modified_by': props[self.MUSER],
559
                     'uuid': props[self.UUID],
560
                     'checksum': props[self.CHECKSUM]})
561
        return meta
562
    
563
    @backend_method
564
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
565
        """Update the metadata associated with the object for the domain and return the new version."""
566
        
567
        logger.debug("update_object_meta: %s %s %s %s %s %s", account, container, name, domain, meta, replace)
568
        self._can_write(user, account, container, name)
569
        path, node = self._lookup_object(account, container, name)
570
        src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
571
        self._apply_versioning(account, container, src_version_id)
572
        return dest_version_id
573
    
574
    @backend_method
575
    def get_object_permissions(self, user, account, container, name):
576
        """Return the action allowed on the object, the path
577
        from which the object gets its permissions from,
578
        along with a dictionary containing the permissions."""
579
        
580
        logger.debug("get_object_permissions: %s %s %s", account, container, name)
581
        allowed = 'write'
582
        permissions_path = self._get_permissions_path(account, container, name)
583
        if user != account:
584
            if self.permissions.access_check(permissions_path, self.WRITE, user):
585
                allowed = 'write'
586
            elif self.permissions.access_check(permissions_path, self.READ, user):
587
                allowed = 'read'
588
            else:
589
                raise NotAllowedError
590
        self._lookup_object(account, container, name)
591
        return (allowed, permissions_path, self.permissions.access_get(permissions_path))
592
    
593
    @backend_method
594
    def update_object_permissions(self, user, account, container, name, permissions):
595
        """Update the permissions associated with the object."""
596
        
597
        logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
598
        if user != account:
599
            raise NotAllowedError
600
        path = self._lookup_object(account, container, name)[0]
601
        self._check_permissions(path, permissions)
602
        self.permissions.access_set(path, permissions)
603
    
604
    @backend_method
605
    def get_object_public(self, user, account, container, name):
606
        """Return the public id of the object if applicable."""
607
        
608
        logger.debug("get_object_public: %s %s %s", account, container, name)
609
        self._can_read(user, account, container, name)
610
        path = self._lookup_object(account, container, name)[0]
611
        p = self.permissions.public_get(path)
612
        if p is not None:
613
            p += ULTIMATE_ANSWER
614
        return p
615
    
616
    @backend_method
617
    def update_object_public(self, user, account, container, name, public):
618
        """Update the public status of the object."""
619
        
620
        logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
621
        self._can_write(user, account, container, name)
622
        path = self._lookup_object(account, container, name)[0]
623
        if not public:
624
            self.permissions.public_unset(path)
625
        else:
626
            self.permissions.public_set(path)
627
    
628
    @backend_method
629
    def get_object_hashmap(self, user, account, container, name, version=None):
630
        """Return the object's size and a list with partial hashes."""
631
        
632
        logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
633
        self._can_read(user, account, container, name)
634
        path, node = self._lookup_object(account, container, name)
635
        props = self._get_version(node, version)
636
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
637
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
638
    
639
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
640
        if permissions is not None and user != account:
641
            raise NotAllowedError
642
        self._can_write(user, account, container, name)
643
        if permissions is not None:
644
            path = '/'.join((account, container, name))
645
            self._check_permissions(path, permissions)
646
        
647
        account_path, account_node = self._lookup_account(account, True)
648
        container_path, container_node = self._lookup_container(account, container)
649
        path, node = self._put_object_node(container_path, container_node, name)
650
        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
651
        
652
        # Handle meta.
653
        if src_version_id is None:
654
            src_version_id = pre_version_id
655
        self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace_meta)
656
        
657
        # Check quota.
658
        del_size = self._apply_versioning(account, container, pre_version_id)
659
        size_delta = size - del_size
660
        if size_delta > 0:
661
            account_quota = long(self._get_policy(account_node)['quota'])
662
            container_quota = long(self._get_policy(container_node)['quota'])
663
            if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
664
               (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
665
                # This must be executed in a transaction, so the version is never created if it fails.
666
                raise QuotaError
667
        self._report_size_change(user, account, size_delta, {'action': 'object update'})
668
        
669
        if permissions is not None:
670
            self.permissions.access_set(path, permissions)
671
        return dest_version_id
672
    
673
    @backend_method
674
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
675
        """Create/update an object with the specified size and partial hashes."""
676
        
677
        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s", account, container, name, size, type, hashmap, checksum)
678
        if size == 0: # No such thing as an empty hashmap.
679
            hashmap = [self.put_block('')]
680
        map = HashMap(self.block_size, self.hash_algorithm)
681
        map.extend([binascii.unhexlify(x) for x in hashmap])
682
        missing = self.store.block_search(map)
683
        if missing:
684
            ie = IndexError()
685
            ie.data = [binascii.hexlify(x) for x in missing]
686
            raise ie
687
        
688
        hash = map.hash()
689
        dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
690
        self.store.map_put(hash, map)
691
        return dest_version_id
692
    
693
    @backend_method
694
    def update_object_checksum(self, user, account, container, name, version, checksum):
695
        """Update an object's checksum."""
696
        
697
        logger.debug("update_object_checksum: %s %s %s %s %s", account, container, name, version, checksum)
698
        # Update objects with greater version and same hashmap and size (fix metadata updates).
699
        self._can_write(user, account, container, name)
700
        path, node = self._lookup_object(account, container, name)
701
        props = self._get_version(node, version)
702
        versions = self.node.node_get_versions(node)
703
        for x in versions:
704
            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
705
                self.node.version_put_property(x[self.SERIAL], 'checksum', checksum)
706
    
707
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False):
708
        self._can_read(user, src_account, src_container, src_name)
709
        path, node = self._lookup_object(src_account, src_container, src_name)
710
        # TODO: Will do another fetch of the properties in duplicate version...
711
        props = self._get_version(node, src_version) # Check to see if source exists.
712
        src_version_id = props[self.SERIAL]
713
        hash = props[self.HASH]
714
        size = props[self.SIZE]
715
        
716
        is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
717
        dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy)
718
        return dest_version_id
719
    
720
    @backend_method
721
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None):
722
        """Copy an object's data and metadata."""
723
        
724
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version)
725
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False)
726
        return dest_version_id
727
    
728
    @backend_method
729
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None):
730
        """Move an object's data and metadata."""
731
        
732
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions)
733
        if user != src_account:
734
            raise NotAllowedError
735
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True)
736
        if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
737
            self._delete_object(user, src_account, src_container, src_name)
738
        return dest_version_id
739
    
740
    def _delete_object(self, user, account, container, name, until=None):
741
        if user != account:
742
            raise NotAllowedError
743
        
744
        if until is not None:
745
            path = '/'.join((account, container, name))
746
            node = self.node.node_lookup(path)
747
            if node is None:
748
                return
749
            hashes = []
750
            size = 0
751
            h, s = self.node.node_purge(node, until, CLUSTER_NORMAL)
752
            hashes += h
753
            size += s
754
            h, s = self.node.node_purge(node, until, CLUSTER_HISTORY)
755
            hashes += h
756
            size += s
757
            for h in hashes:
758
                self.store.map_delete(h)
759
            self.node.node_purge(node, until, CLUSTER_DELETED)
760
            try:
761
                props = self._get_version(node)
762
            except NameError:
763
                self.permissions.access_clear(path)
764
            self._report_size_change(user, account, -size, {'action': 'object purge'})
765
            return
766
        
767
        path, node = self._lookup_object(account, container, name)
768
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
769
        del_size = self._apply_versioning(account, container, src_version_id)
770
        if del_size:
771
            self._report_size_change(user, account, -del_size, {'action': 'object delete'})
772
        self.permissions.access_clear(path)
773
    
774
    @backend_method
775
    def delete_object(self, user, account, container, name, until=None):
776
        """Delete/purge an object."""
777
        
778
        logger.debug("delete_object: %s %s %s %s", account, container, name, until)
779
        self._delete_object(user, account, container, name, until)
780
    
781
    @backend_method
782
    def list_versions(self, user, account, container, name):
783
        """Return a list of all (version, version_timestamp) tuples for an object."""
784
        
785
        logger.debug("list_versions: %s %s %s", account, container, name)
786
        self._can_read(user, account, container, name)
787
        path, node = self._lookup_object(account, container, name)
788
        versions = self.node.node_get_versions(node)
789
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
790
    
791
    @backend_method
792
    def get_uuid(self, user, uuid):
793
        """Return the (account, container, name) for the UUID given."""
794
        
795
        logger.debug("get_uuid: %s", uuid)
796
        info = self.node.latest_uuid(uuid)
797
        if info is None:
798
            raise NameError
799
        path, serial = info
800
        account, container, name = path.split('/', 2)
801
        self._can_read(user, account, container, name)
802
        return (account, container, name)
803
    
804
    @backend_method
805
    def get_public(self, user, public):
806
        """Return the (account, container, name) for the public id given."""
807
        
808
        logger.debug("get_public: %s", public)
809
        if public is None or public < ULTIMATE_ANSWER:
810
            raise NameError
811
        path = self.permissions.public_path(public - ULTIMATE_ANSWER)
812
        if path is None:
813
            raise NameError
814
        account, container, name = path.split('/', 2)
815
        self._can_read(user, account, container, name)
816
        return (account, container, name)
817
    
818
    @backend_method(autocommit=0)
819
    def get_block(self, hash):
820
        """Return a block's data."""
821
        
822
        logger.debug("get_block: %s", hash)
823
        block = self.store.block_get(binascii.unhexlify(hash))
824
        if not block:
825
            raise NameError('Block does not exist')
826
        return block
827
    
828
    @backend_method(autocommit=0)
829
    def put_block(self, data):
830
        """Store a block and return the hash."""
831
        
832
        logger.debug("put_block: %s", len(data))
833
        return binascii.hexlify(self.store.block_put(data))
834
    
835
    @backend_method(autocommit=0)
836
    def update_block(self, hash, data, offset=0):
837
        """Update a known block and return the hash."""
838
        
839
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
840
        if offset == 0 and len(data) == self.block_size:
841
            return self.put_block(data)
842
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
843
        return binascii.hexlify(h)
844
    
845
    # Path functions.
846
    
847
    def _generate_uuid(self):
848
        return str(uuidlib.uuid4())
849
    
850
    def _put_object_node(self, path, parent, name):
851
        path = '/'.join((path, name))
852
        node = self.node.node_lookup(path)
853
        if node is None:
854
            node = self.node.node_create(parent, path)
855
        return path, node
856
    
857
    def _put_path(self, user, parent, path):
858
        node = self.node.node_create(parent, path)
859
        self.node.version_create(node, None, 0, '', None, user, self._generate_uuid(), '', CLUSTER_NORMAL)
860
        return node
861
    
862
    def _lookup_account(self, account, create=True):
863
        node = self.node.node_lookup(account)
864
        if node is None and create:
865
            node = self._put_path(account, self.ROOTNODE, account) # User is account.
866
        return account, node
867
    
868
    def _lookup_container(self, account, container):
869
        path = '/'.join((account, container))
870
        node = self.node.node_lookup(path)
871
        if node is None:
872
            raise NameError('Container does not exist')
873
        return path, node
874
    
875
    def _lookup_object(self, account, container, name):
876
        path = '/'.join((account, container, name))
877
        node = self.node.node_lookup(path)
878
        if node is None:
879
            raise NameError('Object does not exist')
880
        return path, node
881
    
882
    def _get_properties(self, node, until=None):
883
        """Return properties until the timestamp given."""
884
        
885
        before = until if until is not None else inf
886
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
887
        if props is None and until is not None:
888
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
889
        if props is None:
890
            raise NameError('Path does not exist')
891
        return props
892
    
893
    def _get_statistics(self, node, until=None):
894
        """Return count, sum of size and latest timestamp of everything under node."""
895
        
896
        if until is None:
897
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
898
        else:
899
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
900
        if stats is None:
901
            stats = (0, 0, 0)
902
        return stats
903
    
904
    def _get_version(self, node, version=None):
905
        if version is None:
906
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
907
            if props is None:
908
                raise NameError('Object does not exist')
909
        else:
910
            try:
911
                version = int(version)
912
            except ValueError:
913
                raise IndexError('Version does not exist')
914
            props = self.node.version_get_properties(version)
915
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
916
                raise IndexError('Version does not exist')
917
        return props
918
    
919
    def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
920
        """Create a new version of the node."""
921
        
922
        props = self.node.version_lookup(node if src_node is None else src_node, inf, CLUSTER_NORMAL)
923
        if props is not None:
924
            src_version_id = props[self.SERIAL]
925
            src_hash = props[self.HASH]
926
            src_size = props[self.SIZE]
927
            src_type = props[self.TYPE]
928
            src_checksum = props[self.CHECKSUM]
929
        else:
930
            src_version_id = None
931
            src_hash = None
932
            src_size = 0
933
            src_type = ''
934
            src_checksum = ''
935
        if size is None: # Set metadata.
936
            hash = src_hash # This way hash can be set to None (account or container).
937
            size = src_size
938
        if type is None:
939
            type = src_type
940
        if checksum is None:
941
            checksum = src_checksum
942
        uuid = self._generate_uuid() if (is_copy or src_version_id is None) else props[self.UUID]
943
        
944
        if src_node is None:
945
            pre_version_id = src_version_id
946
        else:
947
            pre_version_id = None
948
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
949
            if props is not None:
950
                pre_version_id = props[self.SERIAL]
951
        if pre_version_id is not None:
952
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
953
        
954
        dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
955
        return pre_version_id, dest_version_id
956
    
957
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
958
        if src_version_id is not None:
959
            self.node.attribute_copy(src_version_id, dest_version_id)
960
        if not replace:
961
            self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
962
            self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
963
        else:
964
            self.node.attribute_del(dest_version_id, domain)
965
            self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
966
    
967
    def _put_metadata(self, user, node, domain, meta, replace=False):
968
        """Create a new version and store metadata."""
969
        
970
        src_version_id, dest_version_id = self._put_version_duplicate(user, node)
971
        self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
972
        return src_version_id, dest_version_id
973
    
974
    def _list_limits(self, listing, marker, limit):
975
        start = 0
976
        if marker:
977
            try:
978
                start = listing.index(marker) + 1
979
            except ValueError:
980
                pass
981
        if not limit or limit > 10000:
982
            limit = 10000
983
        return start, limit
984
    
985
    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
986
        cont_prefix = path + '/'
987
        prefix = cont_prefix + prefix
988
        start = cont_prefix + marker if marker else None
989
        before = until if until is not None else inf
990
        filterq = keys if domain else []
991
        sizeq = size_range
992
        
993
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
994
        objects.extend([(p, None) for p in prefixes] if virtual else [])
995
        objects.sort(key=lambda x: x[0])
996
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
997
        
998
        start, limit = self._list_limits([x[0] for x in objects], marker, limit)
999
        return objects[start:start + limit]
1000
    
1001
    # Reporting functions.
1002
    
1003
    def _report_size_change(self, user, account, size, details={}):
1004
        logger.debug("_report_size_change: %s %s %s %s", user, account, size, details)
1005
        account_node = self._lookup_account(account, True)[1]
1006
        total = self._get_statistics(account_node)[1]
1007
        details.update({'user': user, 'total': total})
1008
        self.queue.send(account, 'diskspace', size, details)
1009
    
1010
    # Policy functions.
1011
    
1012
    def _check_policy(self, policy):
1013
        for k in policy.keys():
1014
            if policy[k] == '':
1015
                policy[k] = self.default_policy.get(k)
1016
        for k, v in policy.iteritems():
1017
            if k == 'quota':
1018
                q = int(v) # May raise ValueError.
1019
                if q < 0:
1020
                    raise ValueError
1021
            elif k == 'versioning':
1022
                if v not in ['auto', 'none']:
1023
                    raise ValueError
1024
            else:
1025
                raise ValueError
1026
    
1027
    def _put_policy(self, node, policy, replace):
1028
        if replace:
1029
            for k, v in self.default_policy.iteritems():
1030
                if k not in policy:
1031
                    policy[k] = v
1032
        self.node.policy_set(node, policy)
1033
    
1034
    def _get_policy(self, node):
1035
        policy = self.default_policy.copy()
1036
        policy.update(self.node.policy_get(node))
1037
        return policy
1038
    
1039
    def _apply_versioning(self, account, container, version_id):
1040
        """Delete the provided version if such is the policy.
1041
           Return size of object removed.
1042
        """
1043
        
1044
        if version_id is None:
1045
            return 0
1046
        path, node = self._lookup_container(account, container)
1047
        versioning = self._get_policy(node)['versioning']
1048
        if versioning != 'auto':
1049
            hash, size = self.node.version_remove(version_id)
1050
            self.store.map_delete(hash)
1051
            return size
1052
        return 0
1053
    
1054
    # Access control functions.
1055
    
1056
    def _check_groups(self, groups):
1057
        # raise ValueError('Bad characters in groups')
1058
        pass
1059
    
1060
    def _check_permissions(self, path, permissions):
1061
        # raise ValueError('Bad characters in permissions')
1062
        pass
1063
    
1064
    def _get_formatted_paths(self, paths):
1065
        formatted = []
1066
        for p in paths:
1067
            node = self.node.node_lookup(p)
1068
            if node is not None:
1069
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1070
            if props is not None:
1071
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1072
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1073
                formatted.append((p, self.MATCH_EXACT))
1074
        return formatted
1075
    
1076
    def _get_permissions_path(self, account, container, name):
1077
        path = '/'.join((account, container, name))
1078
        permission_paths = self.permissions.access_inherit(path)
1079
        permission_paths.sort()
1080
        permission_paths.reverse()
1081
        for p in permission_paths:
1082
            if p == path:
1083
                return p
1084
            else:
1085
                if p.count('/') < 2:
1086
                    continue
1087
                node = self.node.node_lookup(p)
1088
                if node is not None:
1089
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1090
                if props is not None:
1091
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1092
                        return p
1093
        return None
1094
    
1095
    def _can_read(self, user, account, container, name):
1096
        if user == account:
1097
            return True
1098
        path = '/'.join((account, container, name))
1099
        if self.permissions.public_get(path) is not None:
1100
            return True
1101
        path = self._get_permissions_path(account, container, name)
1102
        if not path:
1103
            raise NotAllowedError
1104
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1105
            raise NotAllowedError
1106
    
1107
    def _can_write(self, user, account, container, name):
1108
        if user == account:
1109
            return True
1110
        path = '/'.join((account, container, name))
1111
        path = self._get_permissions_path(account, container, name)
1112
        if not path:
1113
            raise NotAllowedError
1114
        if not self.permissions.access_check(path, self.WRITE, user):
1115
            raise NotAllowedError
1116
    
1117
    def _allowed_accounts(self, user):
1118
        allow = set()
1119
        for path in self.permissions.access_list_paths(user):
1120
            allow.add(path.split('/', 1)[0])
1121
        return sorted(allow)
1122
    
1123
    def _allowed_containers(self, user, account):
1124
        allow = set()
1125
        for path in self.permissions.access_list_paths(user, account):
1126
            allow.add(path.split('/', 2)[1])
1127
        return sorted(allow)