Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ f3b65e8f

History | View | Annotate | Download (50.4 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import uuid as uuidlib
38
import logging
39
import hashlib
40
import binascii
41

    
42
from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend
43

    
44
# Stripped-down version of the HashMap class found in tools.
45
class HashMap(list):
46

    
47
    def __init__(self, blocksize, blockhash):
48
        super(HashMap, self).__init__()
49
        self.blocksize = blocksize
50
        self.blockhash = blockhash
51

    
52
    def _hash_raw(self, v):
53
        h = hashlib.new(self.blockhash)
54
        h.update(v)
55
        return h.digest()
56

    
57
    def hash(self):
58
        if len(self) == 0:
59
            return self._hash_raw('')
60
        if len(self) == 1:
61
            return self.__getitem__(0)
62

    
63
        h = list(self)
64
        s = 2
65
        while s < len(h):
66
            s = s * 2
67
        h += [('\x00' * len(h[0]))] * (s - len(h))
68
        while len(h) > 1:
69
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
70
        return h[0]
71

    
72
# Default modules and settings.
73
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
74
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
75
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
76
DEFAULT_BLOCK_PATH = 'data/'
77
DEFAULT_BLOCK_UMASK = 0o022
78
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
79
#DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
80

    
81
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
82
QUEUE_CLIENT_ID = 'pithos'
83
QUEUE_INSTANCE_ID = '1'
84

    
85
( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
86

    
87
inf = float('inf')
88

    
89
ULTIMATE_ANSWER = 42
90

    
91

    
92
logger = logging.getLogger(__name__)
93

    
94

    
95
def backend_method(func=None, autocommit=1):
96
    if func is None:
97
        def fn(func):
98
            return backend_method(func, autocommit)
99
        return fn
100

    
101
    if not autocommit:
102
        return func
103
    def fn(self, *args, **kw):
104
        self.wrapper.execute()
105
        try:
106
            self.messages = []
107
            ret = func(self, *args, **kw)
108
            for m in self.messages:
109
                self.queue.send(*m)
110
            self.wrapper.commit()
111
            return ret
112
        except:
113
            self.wrapper.rollback()
114
            raise
115
    return fn
116

    
117

    
118
class ModularBackend(BaseBackend):
119
    """A modular backend.
120
    
121
    Uses modules for SQL functions and storage.
122
    """
123
    
124
    def __init__(self, db_module=None, db_connection=None,
125
                 block_module=None, block_path=None, block_umask=None,
126
                 queue_module=None, queue_connection=None):
127
        db_module = db_module or DEFAULT_DB_MODULE
128
        db_connection = db_connection or DEFAULT_DB_CONNECTION
129
        block_module = block_module or DEFAULT_BLOCK_MODULE
130
        block_path = block_path or DEFAULT_BLOCK_PATH
131
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
132
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
133
        #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
134
        
135
        self.hash_algorithm = 'sha256'
136
        self.block_size = 4 * 1024 * 1024 # 4MB
137
        
138
        self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
139
        
140
        def load_module(m):
141
            __import__(m)
142
            return sys.modules[m]
143
        
144
        self.db_module = load_module(db_module)
145
        self.wrapper = self.db_module.DBWrapper(db_connection)
146
        params = {'wrapper': self.wrapper}
147
        self.permissions = self.db_module.Permissions(**params)
148
        for x in ['READ', 'WRITE']:
149
            setattr(self, x, getattr(self.db_module, x))
150
        self.node = self.db_module.Node(**params)
151
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
152
            setattr(self, x, getattr(self.db_module, x))
153
        
154
        self.block_module = load_module(block_module)
155
        params = {'path': block_path,
156
                  'block_size': self.block_size,
157
                  'hash_algorithm': self.hash_algorithm,
158
                  'umask': block_umask}
159
        self.store = self.block_module.Store(**params)
160

    
161
        if queue_module and queue_connection:
162
            self.queue_module = load_module(queue_module)
163
            params = {'exchange': queue_connection,
164
                      'client_id': QUEUE_CLIENT_ID}
165
            self.queue = self.queue_module.Queue(**params)
166
        else:
167
            class NoQueue:
168
                def send(self, *args):
169
                    pass
170
                
171
                def close(self):
172
                    pass
173
            
174
            self.queue = NoQueue()
175
    
176
    def close(self):
177
        self.wrapper.close()
178
        self.queue.close()
179
    
180
    @backend_method
181
    def list_accounts(self, user, marker=None, limit=10000):
182
        """Return a list of accounts the user can access."""
183
        
184
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
185
        allowed = self._allowed_accounts(user)
186
        start, limit = self._list_limits(allowed, marker, limit)
187
        return allowed[start:start + limit]
188
    
189
    @backend_method
190
    def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
191
        """Return a dictionary with the account metadata for the domain."""
192
        
193
        logger.debug("get_account_meta: %s %s %s", account, domain, until)
194
        path, node = self._lookup_account(account, user == account)
195
        if user != account:
196
            if until or node is None or account not in self._allowed_accounts(user):
197
                raise NotAllowedError
198
        try:
199
            props = self._get_properties(node, until)
200
            mtime = props[self.MTIME]
201
        except NameError:
202
            props = None
203
            mtime = until
204
        count, bytes, tstamp = self._get_statistics(node, until)
205
        tstamp = max(tstamp, mtime)
206
        if until is None:
207
            modified = tstamp
208
        else:
209
            modified = self._get_statistics(node)[2] # Overall last modification.
210
            modified = max(modified, mtime)
211
        
212
        if user != account:
213
            meta = {'name': account}
214
        else:
215
            meta = {}
216
            if props is not None and include_user_defined:
217
                meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
218
            if until is not None:
219
                meta.update({'until_timestamp': tstamp})
220
            meta.update({'name': account, 'count': count, 'bytes': bytes})
221
        meta.update({'modified': modified})
222
        return meta
223
    
224
    @backend_method
225
    def update_account_meta(self, user, account, domain, meta, replace=False):
226
        """Update the metadata associated with the account for the domain."""
227
        
228
        logger.debug("update_account_meta: %s %s %s %s", account, domain, meta, replace)
229
        if user != account:
230
            raise NotAllowedError
231
        path, node = self._lookup_account(account, True)
232
        self._put_metadata(user, node, domain, meta, replace)
233
    
234
    @backend_method
235
    def get_account_groups(self, user, account):
236
        """Return a dictionary with the user groups defined for this account."""
237
        
238
        logger.debug("get_account_groups: %s", account)
239
        if user != account:
240
            if account not in self._allowed_accounts(user):
241
                raise NotAllowedError
242
            return {}
243
        self._lookup_account(account, True)
244
        return self.permissions.group_dict(account)
245
    
246
    @backend_method
247
    def update_account_groups(self, user, account, groups, replace=False):
248
        """Update the groups associated with the account."""
249
        
250
        logger.debug("update_account_groups: %s %s %s", account, groups, replace)
251
        if user != account:
252
            raise NotAllowedError
253
        self._lookup_account(account, True)
254
        self._check_groups(groups)
255
        if replace:
256
            self.permissions.group_destroy(account)
257
        for k, v in groups.iteritems():
258
            if not replace: # If not already deleted.
259
                self.permissions.group_delete(account, k)
260
            if v:
261
                self.permissions.group_addmany(account, k, v)
262
    
263
    @backend_method
264
    def get_account_policy(self, user, account):
265
        """Return a dictionary with the account policy."""
266
        
267
        logger.debug("get_account_policy: %s", account)
268
        if user != account:
269
            if account not in self._allowed_accounts(user):
270
                raise NotAllowedError
271
            return {}
272
        path, node = self._lookup_account(account, True)
273
        return self._get_policy(node)
274
    
275
    @backend_method
276
    def update_account_policy(self, user, account, policy, replace=False):
277
        """Update the policy associated with the account."""
278
        
279
        logger.debug("update_account_policy: %s %s %s", account, policy, replace)
280
        if user != account:
281
            raise NotAllowedError
282
        path, node = self._lookup_account(account, True)
283
        self._check_policy(policy)
284
        self._put_policy(node, policy, replace)
285
    
286
    @backend_method
287
    def put_account(self, user, account, policy={}):
288
        """Create a new account with the given name."""
289
        
290
        logger.debug("put_account: %s %s", account, policy)
291
        if user != account:
292
            raise NotAllowedError
293
        node = self.node.node_lookup(account)
294
        if node is not None:
295
            raise NameError('Account already exists')
296
        if policy:
297
            self._check_policy(policy)
298
        node = self._put_path(user, self.ROOTNODE, account)
299
        self._put_policy(node, policy, True)
300
    
301
    @backend_method
302
    def delete_account(self, user, account):
303
        """Delete the account with the given name."""
304
        
305
        logger.debug("delete_account: %s", account)
306
        if user != account:
307
            raise NotAllowedError
308
        node = self.node.node_lookup(account)
309
        if node is None:
310
            return
311
        if not self.node.node_remove(node):
312
            raise IndexError('Account is not empty')
313
        self.permissions.group_destroy(account)
314
    
315
    @backend_method
316
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
317
        """Return a list of containers existing under an account."""
318
        
319
        logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
320
        if user != account:
321
            if until or account not in self._allowed_accounts(user):
322
                raise NotAllowedError
323
            allowed = self._allowed_containers(user, account)
324
            start, limit = self._list_limits(allowed, marker, limit)
325
            return allowed[start:start + limit]
326
        if shared:
327
            allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
328
            allowed = list(set(allowed))
329
            start, limit = self._list_limits(allowed, marker, limit)
330
            return allowed[start:start + limit]
331
        node = self.node.node_lookup(account)
332
        return [x[0] for x in self._list_object_properties(node, account, '', '/', marker, limit, False, None, [], until)]
333
    
334
    @backend_method
335
    def list_container_meta(self, user, account, container, domain, until=None):
336
        """Return a list with all the container's object meta keys for the domain."""
337
        
338
        logger.debug("list_container_meta: %s %s %s %s", account, container, domain, until)
339
        allowed = []
340
        if user != account:
341
            if until:
342
                raise NotAllowedError
343
            allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
344
            if not allowed:
345
                raise NotAllowedError
346
        path, node = self._lookup_container(account, container)
347
        before = until if until is not None else inf
348
        allowed = self._get_formatted_paths(allowed)
349
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
350
    
351
    @backend_method
352
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
353
        """Return a dictionary with the container metadata for the domain."""
354
        
355
        logger.debug("get_container_meta: %s %s %s %s", account, container, domain, until)
356
        if user != account:
357
            if until or container not in self._allowed_containers(user, account):
358
                raise NotAllowedError
359
        path, node = self._lookup_container(account, container)
360
        props = self._get_properties(node, until)
361
        mtime = props[self.MTIME]
362
        count, bytes, tstamp = self._get_statistics(node, until)
363
        tstamp = max(tstamp, mtime)
364
        if until is None:
365
            modified = tstamp
366
        else:
367
            modified = self._get_statistics(node)[2] # Overall last modification.
368
            modified = max(modified, mtime)
369
        
370
        if user != account:
371
            meta = {'name': container}
372
        else:
373
            meta = {}
374
            if include_user_defined:
375
                meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
376
            if until is not None:
377
                meta.update({'until_timestamp': tstamp})
378
            meta.update({'name': container, 'count': count, 'bytes': bytes})
379
        meta.update({'modified': modified})
380
        return meta
381
    
382
    @backend_method
383
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
384
        """Update the metadata associated with the container for the domain."""
385
        
386
        logger.debug("update_container_meta: %s %s %s %s %s", account, container, domain, meta, replace)
387
        if user != account:
388
            raise NotAllowedError
389
        path, node = self._lookup_container(account, container)
390
        src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
391
        if src_version_id is not None:
392
            versioning = self._get_policy(node)['versioning']
393
            if versioning != 'auto':
394
                self.node.version_remove(src_version_id)
395
    
396
    @backend_method
397
    def get_container_policy(self, user, account, container):
398
        """Return a dictionary with the container policy."""
399
        
400
        logger.debug("get_container_policy: %s %s", account, container)
401
        if user != account:
402
            if container not in self._allowed_containers(user, account):
403
                raise NotAllowedError
404
            return {}
405
        path, node = self._lookup_container(account, container)
406
        return self._get_policy(node)
407
    
408
    @backend_method
409
    def update_container_policy(self, user, account, container, policy, replace=False):
410
        """Update the policy associated with the container."""
411
        
412
        logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
413
        if user != account:
414
            raise NotAllowedError
415
        path, node = self._lookup_container(account, container)
416
        self._check_policy(policy)
417
        self._put_policy(node, policy, replace)
418
    
419
    @backend_method
420
    def put_container(self, user, account, container, policy={}):
421
        """Create a new container with the given name."""
422
        
423
        logger.debug("put_container: %s %s %s", account, container, policy)
424
        if user != account:
425
            raise NotAllowedError
426
        try:
427
            path, node = self._lookup_container(account, container)
428
        except NameError:
429
            pass
430
        else:
431
            raise NameError('Container already exists')
432
        if policy:
433
            self._check_policy(policy)
434
        path = '/'.join((account, container))
435
        node = self._put_path(user, self._lookup_account(account, True)[1], path)
436
        self._put_policy(node, policy, True)
437
    
438
    @backend_method
439
    def delete_container(self, user, account, container, until=None):
440
        """Delete/purge the container with the given name."""
441
        
442
        logger.debug("delete_container: %s %s %s", account, container, until)
443
        if user != account:
444
            raise NotAllowedError
445
        path, node = self._lookup_container(account, container)
446
        
447
        if until is not None:
448
            hashes, size = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
449
            for h in hashes:
450
                self.store.map_delete(h)
451
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
452
            self._report_size_change(user, account, -size, {'action': 'container purge'})
453
            return
454
        
455
        if self._get_statistics(node)[0] > 0:
456
            raise IndexError('Container is not empty')
457
        hashes, size = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
458
        for h in hashes:
459
            self.store.map_delete(h)
460
        self.node.node_purge_children(node, inf, CLUSTER_DELETED)
461
        self.node.node_remove(node)
462
        self._report_size_change(user, account, -size, {'action': 'container delete'})
463
    
464
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props):
465
        if user != account and until:
466
            raise NotAllowedError
467
        allowed = self._list_object_permissions(user, account, container, prefix, shared)
468
        if shared and not allowed:
469
            return []
470
        path, node = self._lookup_container(account, container)
471
        allowed = self._get_formatted_paths(allowed)
472
        return self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
473
    
474
    def _list_object_permissions(self, user, account, container, prefix, shared):
475
        allowed = []
476
        path = '/'.join((account, container, prefix)).rstrip('/')
477
        if user != account:
478
            allowed = self.permissions.access_list_paths(user, path)
479
            if not allowed:
480
                raise NotAllowedError
481
        else:
482
            if shared:
483
                allowed = self.permissions.access_list_shared(path)
484
                if not allowed:
485
                    return []
486
        return allowed
487
    
488
    @backend_method
489
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None):
490
        """Return a list of object (name, version_id) tuples existing under a container."""
491
        
492
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range)
493
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False)
494
    
495
    @backend_method
496
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None):
497
        """Return a list of object metadata dicts existing under a container."""
498
        
499
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range)
500
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True)
501
        objects = []
502
        for p in props:
503
            if len(p) == 2:
504
                objects.append({'subdir': p[0]})
505
            else:
506
                objects.append({'name': p[0],
507
                                'bytes': p[self.SIZE + 1],
508
                                'type': p[self.TYPE + 1],
509
                                'hash': p[self.HASH + 1],
510
                                'version': p[self.SERIAL + 1],
511
                                'version_timestamp': p[self.MTIME + 1],
512
                                'modified': p[self.MTIME + 1] if until is None else None,
513
                                'modified_by': p[self.MUSER + 1],
514
                                'uuid': p[self.UUID + 1],
515
                                'checksum': p[self.CHECKSUM + 1]})
516
        return objects
517
    
518
    @backend_method
519
    def list_object_permissions(self, user, account, container, prefix=''):
520
        """Return a list of paths that enforce permissions under a container."""
521
        
522
        logger.debug("list_object_permissions: %s %s %s", account, container, prefix)
523
        return self._list_object_permissions(user, account, container, prefix, True)
524
    
525
    @backend_method
526
    def list_object_public(self, user, account, container, prefix=''):
527
        """Return a dict mapping paths to public ids for objects that are public under a container."""
528
        
529
        logger.debug("list_object_public: %s %s %s", account, container, prefix)
530
        public = {}
531
        for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
532
            public[path] = p + ULTIMATE_ANSWER
533
        return public
534
    
535
    @backend_method
536
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
537
        """Return a dictionary with the object metadata for the domain."""
538
        
539
        logger.debug("get_object_meta: %s %s %s %s %s", account, container, name, domain, version)
540
        self._can_read(user, account, container, name)
541
        path, node = self._lookup_object(account, container, name)
542
        props = self._get_version(node, version)
543
        if version is None:
544
            modified = props[self.MTIME]
545
        else:
546
            try:
547
                modified = self._get_version(node)[self.MTIME] # Overall last modification.
548
            except NameError: # Object may be deleted.
549
                del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
550
                if del_props is None:
551
                    raise NameError('Object does not exist')
552
                modified = del_props[self.MTIME]
553
        
554
        meta = {}
555
        if include_user_defined:
556
            meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
557
        meta.update({'name': name,
558
                     'bytes': props[self.SIZE],
559
                     'type': props[self.TYPE],
560
                     'hash': props[self.HASH],
561
                     'version': props[self.SERIAL],
562
                     'version_timestamp': props[self.MTIME],
563
                     'modified': modified,
564
                     'modified_by': props[self.MUSER],
565
                     'uuid': props[self.UUID],
566
                     'checksum': props[self.CHECKSUM]})
567
        return meta
568
    
569
    @backend_method
570
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
571
        """Update the metadata associated with the object for the domain and return the new version."""
572
        
573
        logger.debug("update_object_meta: %s %s %s %s %s %s", account, container, name, domain, meta, replace)
574
        self._can_write(user, account, container, name)
575
        path, node = self._lookup_object(account, container, name)
576
        src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
577
        self._apply_versioning(account, container, src_version_id)
578
        return dest_version_id
579
    
580
    @backend_method
581
    def get_object_permissions(self, user, account, container, name):
582
        """Return the action allowed on the object, the path
583
        from which the object gets its permissions from,
584
        along with a dictionary containing the permissions."""
585
        
586
        logger.debug("get_object_permissions: %s %s %s", account, container, name)
587
        allowed = 'write'
588
        permissions_path = self._get_permissions_path(account, container, name)
589
        if user != account:
590
            if self.permissions.access_check(permissions_path, self.WRITE, user):
591
                allowed = 'write'
592
            elif self.permissions.access_check(permissions_path, self.READ, user):
593
                allowed = 'read'
594
            else:
595
                raise NotAllowedError
596
        self._lookup_object(account, container, name)
597
        return (allowed, permissions_path, self.permissions.access_get(permissions_path))
598
    
599
    @backend_method
600
    def update_object_permissions(self, user, account, container, name, permissions):
601
        """Update the permissions associated with the object."""
602
        
603
        logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
604
        if user != account:
605
            raise NotAllowedError
606
        path = self._lookup_object(account, container, name)[0]
607
        self._check_permissions(path, permissions)
608
        self.permissions.access_set(path, permissions)
609
        self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
610
    
611
    @backend_method
612
    def get_object_public(self, user, account, container, name):
613
        """Return the public id of the object if applicable."""
614
        
615
        logger.debug("get_object_public: %s %s %s", account, container, name)
616
        self._can_read(user, account, container, name)
617
        path = self._lookup_object(account, container, name)[0]
618
        p = self.permissions.public_get(path)
619
        if p is not None:
620
            p += ULTIMATE_ANSWER
621
        return p
622
    
623
    @backend_method
624
    def update_object_public(self, user, account, container, name, public):
625
        """Update the public status of the object."""
626
        
627
        logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
628
        self._can_write(user, account, container, name)
629
        path = self._lookup_object(account, container, name)[0]
630
        if not public:
631
            self.permissions.public_unset(path)
632
        else:
633
            self.permissions.public_set(path)
634
    
635
    @backend_method
636
    def get_object_hashmap(self, user, account, container, name, version=None):
637
        """Return the object's size and a list with partial hashes."""
638
        
639
        logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
640
        self._can_read(user, account, container, name)
641
        path, node = self._lookup_object(account, container, name)
642
        props = self._get_version(node, version)
643
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
644
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
645
    
646
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
647
        if permissions is not None and user != account:
648
            raise NotAllowedError
649
        self._can_write(user, account, container, name)
650
        if permissions is not None:
651
            path = '/'.join((account, container, name))
652
            self._check_permissions(path, permissions)
653
        
654
        account_path, account_node = self._lookup_account(account, True)
655
        container_path, container_node = self._lookup_container(account, container)
656
        path, node = self._put_object_node(container_path, container_node, name)
657
        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
658
        
659
        # Handle meta.
660
        if src_version_id is None:
661
            src_version_id = pre_version_id
662
        self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace_meta)
663
        
664
        # Check quota.
665
        del_size = self._apply_versioning(account, container, pre_version_id)
666
        size_delta = size - del_size
667
        if size_delta > 0:
668
            account_quota = long(self._get_policy(account_node)['quota'])
669
            container_quota = long(self._get_policy(container_node)['quota'])
670
            if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
671
               (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
672
                # This must be executed in a transaction, so the version is never created if it fails.
673
                raise QuotaError
674
        self._report_size_change(user, account, size_delta, {'action': 'object update'})
675
        
676
        if permissions is not None:
677
            self.permissions.access_set(path, permissions)
678
            self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
679
        
680
        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
681
        return dest_version_id
682
    
683
    @backend_method
684
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
685
        """Create/update an object with the specified size and partial hashes."""
686
        
687
        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s", account, container, name, size, type, hashmap, checksum)
688
        if size == 0: # No such thing as an empty hashmap.
689
            hashmap = [self.put_block('')]
690
        map = HashMap(self.block_size, self.hash_algorithm)
691
        map.extend([binascii.unhexlify(x) for x in hashmap])
692
        missing = self.store.block_search(map)
693
        if missing:
694
            ie = IndexError()
695
            ie.data = [binascii.hexlify(x) for x in missing]
696
            raise ie
697
        
698
        hash = map.hash()
699
        dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
700
        self.store.map_put(hash, map)
701
        return dest_version_id
702
    
703
    @backend_method
704
    def update_object_checksum(self, user, account, container, name, version, checksum):
705
        """Update an object's checksum."""
706
        
707
        logger.debug("update_object_checksum: %s %s %s %s %s", account, container, name, version, checksum)
708
        # Update objects with greater version and same hashmap and size (fix metadata updates).
709
        self._can_write(user, account, container, name)
710
        path, node = self._lookup_object(account, container, name)
711
        props = self._get_version(node, version)
712
        versions = self.node.node_get_versions(node)
713
        for x in versions:
714
            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
715
                self.node.version_put_property(x[self.SERIAL], 'checksum', checksum)
716
    
717
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False):
718
        self._can_read(user, src_account, src_container, src_name)
719
        path, node = self._lookup_object(src_account, src_container, src_name)
720
        # TODO: Will do another fetch of the properties in duplicate version...
721
        props = self._get_version(node, src_version) # Check to see if source exists.
722
        src_version_id = props[self.SERIAL]
723
        hash = props[self.HASH]
724
        size = props[self.SIZE]
725
        
726
        is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
727
        dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy)
728
        return dest_version_id
729
    
730
    @backend_method
731
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None):
732
        """Copy an object's data and metadata."""
733
        
734
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version)
735
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False)
736
        return dest_version_id
737
    
738
    @backend_method
739
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None):
740
        """Move an object's data and metadata."""
741
        
742
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions)
743
        if user != src_account:
744
            raise NotAllowedError
745
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True)
746
        if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
747
            self._delete_object(user, src_account, src_container, src_name)
748
        return dest_version_id
749
    
750
    def _delete_object(self, user, account, container, name, until=None):
751
        if user != account:
752
            raise NotAllowedError
753
        
754
        if until is not None:
755
            path = '/'.join((account, container, name))
756
            node = self.node.node_lookup(path)
757
            if node is None:
758
                return
759
            hashes = []
760
            size = 0
761
            h, s = self.node.node_purge(node, until, CLUSTER_NORMAL)
762
            hashes += h
763
            size += s
764
            h, s = self.node.node_purge(node, until, CLUSTER_HISTORY)
765
            hashes += h
766
            size += s
767
            for h in hashes:
768
                self.store.map_delete(h)
769
            self.node.node_purge(node, until, CLUSTER_DELETED)
770
            try:
771
                props = self._get_version(node)
772
            except NameError:
773
                self.permissions.access_clear(path)
774
            self._report_size_change(user, account, -size, {'action': 'object purge'})
775
            return
776
        
777
        path, node = self._lookup_object(account, container, name)
778
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
779
        del_size = self._apply_versioning(account, container, src_version_id)
780
        if del_size:
781
            self._report_size_change(user, account, -del_size, {'action': 'object delete'})
782
        self._report_object_change(user, account, path, details={'action': 'object delete'})
783
        self.permissions.access_clear(path)
784
    
785
    @backend_method
786
    def delete_object(self, user, account, container, name, until=None):
787
        """Delete/purge an object."""
788
        
789
        logger.debug("delete_object: %s %s %s %s", account, container, name, until)
790
        self._delete_object(user, account, container, name, until)
791
    
792
    @backend_method
793
    def list_versions(self, user, account, container, name):
794
        """Return a list of all (version, version_timestamp) tuples for an object."""
795
        
796
        logger.debug("list_versions: %s %s %s", account, container, name)
797
        self._can_read(user, account, container, name)
798
        path, node = self._lookup_object(account, container, name)
799
        versions = self.node.node_get_versions(node)
800
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
801
    
802
    @backend_method
803
    def get_uuid(self, user, uuid):
804
        """Return the (account, container, name) for the UUID given."""
805
        
806
        logger.debug("get_uuid: %s", uuid)
807
        info = self.node.latest_uuid(uuid)
808
        if info is None:
809
            raise NameError
810
        path, serial = info
811
        account, container, name = path.split('/', 2)
812
        self._can_read(user, account, container, name)
813
        return (account, container, name)
814
    
815
    @backend_method
816
    def get_public(self, user, public):
817
        """Return the (account, container, name) for the public id given."""
818
        
819
        logger.debug("get_public: %s", public)
820
        if public is None or public < ULTIMATE_ANSWER:
821
            raise NameError
822
        path = self.permissions.public_path(public - ULTIMATE_ANSWER)
823
        if path is None:
824
            raise NameError
825
        account, container, name = path.split('/', 2)
826
        self._can_read(user, account, container, name)
827
        return (account, container, name)
828
    
829
    @backend_method(autocommit=0)
830
    def get_block(self, hash):
831
        """Return a block's data."""
832
        
833
        logger.debug("get_block: %s", hash)
834
        block = self.store.block_get(binascii.unhexlify(hash))
835
        if not block:
836
            raise NameError('Block does not exist')
837
        return block
838
    
839
    @backend_method(autocommit=0)
840
    def put_block(self, data):
841
        """Store a block and return the hash."""
842
        
843
        logger.debug("put_block: %s", len(data))
844
        return binascii.hexlify(self.store.block_put(data))
845
    
846
    @backend_method(autocommit=0)
847
    def update_block(self, hash, data, offset=0):
848
        """Update a known block and return the hash."""
849
        
850
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
851
        if offset == 0 and len(data) == self.block_size:
852
            return self.put_block(data)
853
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
854
        return binascii.hexlify(h)
855
    
856
    # Path functions.
857
    
858
    def _generate_uuid(self):
859
        return str(uuidlib.uuid4())
860
    
861
    def _put_object_node(self, path, parent, name):
862
        path = '/'.join((path, name))
863
        node = self.node.node_lookup(path)
864
        if node is None:
865
            node = self.node.node_create(parent, path)
866
        return path, node
867
    
868
    def _put_path(self, user, parent, path):
869
        node = self.node.node_create(parent, path)
870
        self.node.version_create(node, None, 0, '', None, user, self._generate_uuid(), '', CLUSTER_NORMAL)
871
        return node
872
    
873
    def _lookup_account(self, account, create=True):
874
        node = self.node.node_lookup(account)
875
        if node is None and create:
876
            node = self._put_path(account, self.ROOTNODE, account) # User is account.
877
        return account, node
878
    
879
    def _lookup_container(self, account, container):
880
        path = '/'.join((account, container))
881
        node = self.node.node_lookup(path)
882
        if node is None:
883
            raise NameError('Container does not exist')
884
        return path, node
885
    
886
    def _lookup_object(self, account, container, name):
887
        path = '/'.join((account, container, name))
888
        node = self.node.node_lookup(path)
889
        if node is None:
890
            raise NameError('Object does not exist')
891
        return path, node
892
    
893
    def _get_properties(self, node, until=None):
894
        """Return properties until the timestamp given."""
895
        
896
        before = until if until is not None else inf
897
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
898
        if props is None and until is not None:
899
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
900
        if props is None:
901
            raise NameError('Path does not exist')
902
        return props
903
    
904
    def _get_statistics(self, node, until=None):
905
        """Return count, sum of size and latest timestamp of everything under node."""
906
        
907
        if until is None:
908
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
909
        else:
910
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
911
        if stats is None:
912
            stats = (0, 0, 0)
913
        return stats
914
    
915
    def _get_version(self, node, version=None):
916
        if version is None:
917
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
918
            if props is None:
919
                raise NameError('Object does not exist')
920
        else:
921
            try:
922
                version = int(version)
923
            except ValueError:
924
                raise IndexError('Version does not exist')
925
            props = self.node.version_get_properties(version)
926
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
927
                raise IndexError('Version does not exist')
928
        return props
929
    
930
    def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
931
        """Create a new version of the node."""
932
        
933
        props = self.node.version_lookup(node if src_node is None else src_node, inf, CLUSTER_NORMAL)
934
        if props is not None:
935
            src_version_id = props[self.SERIAL]
936
            src_hash = props[self.HASH]
937
            src_size = props[self.SIZE]
938
            src_type = props[self.TYPE]
939
            src_checksum = props[self.CHECKSUM]
940
        else:
941
            src_version_id = None
942
            src_hash = None
943
            src_size = 0
944
            src_type = ''
945
            src_checksum = ''
946
        if size is None: # Set metadata.
947
            hash = src_hash # This way hash can be set to None (account or container).
948
            size = src_size
949
        if type is None:
950
            type = src_type
951
        if checksum is None:
952
            checksum = src_checksum
953
        uuid = self._generate_uuid() if (is_copy or src_version_id is None) else props[self.UUID]
954
        
955
        if src_node is None:
956
            pre_version_id = src_version_id
957
        else:
958
            pre_version_id = None
959
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
960
            if props is not None:
961
                pre_version_id = props[self.SERIAL]
962
        if pre_version_id is not None:
963
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
964
        
965
        dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
966
        return pre_version_id, dest_version_id
967
    
968
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
969
        if src_version_id is not None:
970
            self.node.attribute_copy(src_version_id, dest_version_id)
971
        if not replace:
972
            self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
973
            self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
974
        else:
975
            self.node.attribute_del(dest_version_id, domain)
976
            self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
977
    
978
    def _put_metadata(self, user, node, domain, meta, replace=False):
979
        """Create a new version and store metadata."""
980
        
981
        src_version_id, dest_version_id = self._put_version_duplicate(user, node)
982
        self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
983
        return src_version_id, dest_version_id
984
    
985
    def _list_limits(self, listing, marker, limit):
986
        start = 0
987
        if marker:
988
            try:
989
                start = listing.index(marker) + 1
990
            except ValueError:
991
                pass
992
        if not limit or limit > 10000:
993
            limit = 10000
994
        return start, limit
995
    
996
    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
997
        cont_prefix = path + '/'
998
        prefix = cont_prefix + prefix
999
        start = cont_prefix + marker if marker else None
1000
        before = until if until is not None else inf
1001
        filterq = keys if domain else []
1002
        sizeq = size_range
1003
        
1004
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1005
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1006
        objects.sort(key=lambda x: x[0])
1007
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1008
        
1009
        start, limit = self._list_limits([x[0] for x in objects], marker, limit)
1010
        return objects[start:start + limit]
1011
    
1012
    # Reporting functions.
1013
    
1014
    def _report_size_change(self, user, account, size, details={}):
1015
        logger.debug("_report_size_change: %s %s %s %s", user, account, size, details)
1016
        account_node = self._lookup_account(account, True)[1]
1017
        total = self._get_statistics(account_node)[1]
1018
        details.update({'user': user, 'total': total})
1019
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))
1020
    
1021
    def _report_object_change(self, user, account, path, details={}):
1022
        logger.debug("_report_object_change: %s %s %s %s", user, account, path, details)
1023
        details.update({'user': user})
1024
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',), account, QUEUE_INSTANCE_ID, 'object', path, details))
1025
    
1026
    def _report_sharing_change(self, user, account, path, details={}):
1027
        logger.debug("_report_permissions_change: %s %s %s %s", user, account, path, details)
1028
        details.update({'user': user})
1029
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',), account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1030
    
1031
    # Policy functions.
1032
    
1033
    def _check_policy(self, policy):
1034
        for k in policy.keys():
1035
            if policy[k] == '':
1036
                policy[k] = self.default_policy.get(k)
1037
        for k, v in policy.iteritems():
1038
            if k == 'quota':
1039
                q = int(v) # May raise ValueError.
1040
                if q < 0:
1041
                    raise ValueError
1042
            elif k == 'versioning':
1043
                if v not in ['auto', 'none']:
1044
                    raise ValueError
1045
            else:
1046
                raise ValueError
1047
    
1048
    def _put_policy(self, node, policy, replace):
1049
        if replace:
1050
            for k, v in self.default_policy.iteritems():
1051
                if k not in policy:
1052
                    policy[k] = v
1053
        self.node.policy_set(node, policy)
1054
    
1055
    def _get_policy(self, node):
1056
        policy = self.default_policy.copy()
1057
        policy.update(self.node.policy_get(node))
1058
        return policy
1059
    
1060
    def _apply_versioning(self, account, container, version_id):
1061
        """Delete the provided version if such is the policy.
1062
           Return size of object removed.
1063
        """
1064
        
1065
        if version_id is None:
1066
            return 0
1067
        path, node = self._lookup_container(account, container)
1068
        versioning = self._get_policy(node)['versioning']
1069
        if versioning != 'auto':
1070
            hash, size = self.node.version_remove(version_id)
1071
            self.store.map_delete(hash)
1072
            return size
1073
        return 0
1074
    
1075
    # Access control functions.
1076
    
1077
    def _check_groups(self, groups):
1078
        # raise ValueError('Bad characters in groups')
1079
        pass
1080
    
1081
    def _check_permissions(self, path, permissions):
1082
        # raise ValueError('Bad characters in permissions')
1083
        pass
1084
    
1085
    def _get_formatted_paths(self, paths):
1086
        formatted = []
1087
        for p in paths:
1088
            node = self.node.node_lookup(p)
1089
            if node is not None:
1090
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1091
            if props is not None:
1092
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1093
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1094
                formatted.append((p, self.MATCH_EXACT))
1095
        return formatted
1096
    
1097
    def _get_permissions_path(self, account, container, name):
1098
        path = '/'.join((account, container, name))
1099
        permission_paths = self.permissions.access_inherit(path)
1100
        permission_paths.sort()
1101
        permission_paths.reverse()
1102
        for p in permission_paths:
1103
            if p == path:
1104
                return p
1105
            else:
1106
                if p.count('/') < 2:
1107
                    continue
1108
                node = self.node.node_lookup(p)
1109
                if node is not None:
1110
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1111
                if props is not None:
1112
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1113
                        return p
1114
        return None
1115
    
1116
    def _can_read(self, user, account, container, name):
1117
        if user == account:
1118
            return True
1119
        path = '/'.join((account, container, name))
1120
        if self.permissions.public_get(path) is not None:
1121
            return True
1122
        path = self._get_permissions_path(account, container, name)
1123
        if not path:
1124
            raise NotAllowedError
1125
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1126
            raise NotAllowedError
1127
    
1128
    def _can_write(self, user, account, container, name):
1129
        if user == account:
1130
            return True
1131
        path = '/'.join((account, container, name))
1132
        path = self._get_permissions_path(account, container, name)
1133
        if not path:
1134
            raise NotAllowedError
1135
        if not self.permissions.access_check(path, self.WRITE, user):
1136
            raise NotAllowedError
1137
    
1138
    def _allowed_accounts(self, user):
1139
        allow = set()
1140
        for path in self.permissions.access_list_paths(user):
1141
            allow.add(path.split('/', 1)[0])
1142
        return sorted(allow)
1143
    
1144
    def _allowed_containers(self, user, account):
1145
        allow = set()
1146
        for path in self.permissions.access_list_paths(user, account):
1147
            allow.add(path.split('/', 2)[1])
1148
        return sorted(allow)