Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / modular.py @ a74ba506

History | View | Annotate | Download (50.2 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import uuid as uuidlib
38
import logging
39
import hashlib
40
import binascii
41

    
42
from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend
43

    
44
# Stripped-down version of the HashMap class found in tools.
45
class HashMap(list):
    """A list of raw block digests that can be folded into a single hash.

    Stripped-down version of the HashMap class found in tools.

    blocksize -- stored on the instance; not used by this class itself.
    blockhash -- name of the hashlib algorithm used to combine digests.
    """

    def __init__(self, blocksize, blockhash):
        super(HashMap, self).__init__()
        self.blocksize = blocksize
        self.blockhash = blockhash

    def _hash_raw(self, v):
        """Return the raw (binary) digest of v using the configured algorithm."""
        h = hashlib.new(self.blockhash)
        h.update(v)
        return h.digest()

    def hash(self):
        """Return the top-level hash of the stored digests.

        An empty map hashes the empty byte string, a single entry is
        returned as is, and longer maps are padded with zero-filled entries
        up to the next power of two and then reduced pairwise until one
        digest is left.
        """
        if not self:
            # Bytes literal: digests are binary data, not text.
            return self._hash_raw(b'')
        if len(self) == 1:
            return self[0]

        level = list(self)
        width = 2
        while width < len(level):
            width *= 2
        # Pad with all-zero entries of the same length as a real digest.
        level += [b'\x00' * len(level[0])] * (width - len(level))
        while len(level) > 1:
            level = [self._hash_raw(level[i] + level[i + 1])
                     for i in range(0, len(level), 2)]
        return level[0]
71

    
72
# Modules and settings used when the caller does not supply any.
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
DEFAULT_BLOCK_PATH = 'data/'
# No queue is configured by default; example values:
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
#DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'

# Message queue identification.
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
QUEUE_CLIENT_ID = 'pithos'
QUEUE_INSTANCE_ID = '1'

# Version cluster identifiers.
CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED = range(3)

# Used as the "no upper bound" timestamp.
inf = float('inf')

# Offset added to public ids before they are returned to callers.
ULTIMATE_ANSWER = 42

logger = logging.getLogger(__name__)
92

    
93

    
94
def backend_method(func=None, autocommit=1):
    """Decorator that runs a backend method inside a wrapper transaction.

    Usable both bare (``@backend_method``) and with arguments
    (``@backend_method(autocommit=0)``). With a falsy ``autocommit`` the
    function is returned undecorated.

    The decorated method calls ``self.wrapper.execute()``, resets
    ``self.messages``, runs the method, sends every collected message
    through ``self.queue.send``, commits and returns the method's result.
    On any exception the wrapper is rolled back and the exception is
    re-raised.
    """
    import functools

    if func is None:
        # Called with keyword arguments only: return the real decorator.
        def decorator(wrapped):
            return backend_method(wrapped, autocommit)
        return decorator

    if not autocommit:
        return func

    # Keep the wrapped method's name and docstring visible to introspection.
    @functools.wraps(func)
    def fn(self, *args, **kw):
        self.wrapper.execute()
        try:
            self.messages = []
            ret = func(self, *args, **kw)
            for m in self.messages:
                self.queue.send(*m)
            self.wrapper.commit()
            return ret
        except:
            # Deliberately bare: roll back on every kind of failure, then
            # let the original exception propagate.
            self.wrapper.rollback()
            raise
    return fn
115

    
116

    
117
class ModularBackend(BaseBackend):
118
    """A modular backend.
119
    
120
    Uses modules for SQL functions and storage.
121
    """
122
    
123
    def __init__(self, db_module=None, db_connection=None,
                 block_module=None, block_path=None,
                 queue_module=None, queue_connection=None):
        """Load the database, block storage and (optional) queue modules.

        Falsy database/block arguments fall back to the DEFAULT_* settings.
        A real queue is created only when both queue_module and
        queue_connection are given; otherwise a do-nothing stand-in is used.
        """

        def import_named(name):
            # __import__ returns the top-level package, so pick the leaf
            # module out of sys.modules instead.
            __import__(name)
            return sys.modules[name]

        if not db_module:
            db_module = DEFAULT_DB_MODULE
        if not db_connection:
            db_connection = DEFAULT_DB_CONNECTION
        if not block_module:
            block_module = DEFAULT_BLOCK_MODULE
        if not block_path:
            block_path = DEFAULT_BLOCK_PATH
        # There are no defaults for the queue module/connection.

        self.hash_algorithm = 'sha256'
        self.block_size = 4 * 1024 * 1024 # 4MB

        self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}

        # Database layer: wrapper, permissions, node and exported constants.
        self.db_module = import_named(db_module)
        self.wrapper = self.db_module.DBWrapper(db_connection)
        self.permissions = self.db_module.Permissions(wrapper=self.wrapper)
        for const in ('READ', 'WRITE'):
            setattr(self, const, getattr(self.db_module, const))
        self.node = self.db_module.Node(wrapper=self.wrapper)
        for const in ('ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME',
                      'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER',
                      'MATCH_PREFIX', 'MATCH_EXACT'):
            setattr(self, const, getattr(self.db_module, const))

        # Block storage layer.
        self.block_module = import_named(block_module)
        self.store = self.block_module.Store(path=block_path,
                                             block_size=self.block_size,
                                             hash_algorithm=self.hash_algorithm)

        # Message queue layer.
        if not (queue_module and queue_connection):
            class NoQueue:
                def send(self, *args):
                    pass

                def close(self):
                    pass

            self.queue = NoQueue()
        else:
            self.queue_module = import_named(queue_module)
            self.queue = self.queue_module.Queue(exchange=queue_connection,
                                                 client_id=QUEUE_CLIENT_ID)
172
    
173
    def close(self):
        """Close the database wrapper and the message queue."""
        try:
            self.wrapper.close()
        finally:
            # Close the queue even when closing the wrapper fails, so the
            # queue connection is not left open.
            self.queue.close()
176
    
177
    @backend_method
    def list_accounts(self, user, marker=None, limit=10000):
        """Return a list of accounts the user can access."""

        logger.debug("list_accounts: %s %s %s", user, marker, limit)
        accounts = self._allowed_accounts(user)
        first, count = self._list_limits(accounts, marker, limit)
        last = first + count
        return accounts[first:last]
185
    
186
    @backend_method
    def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
        """Return a dictionary with the account metadata for the domain.

        Users other than the account owner only get 'name' and 'modified',
        and may not ask for a past state (until).
        """

        logger.debug("get_account_meta: %s %s %s", account, domain, until)
        path, node = self._lookup_account(account, user == account)
        if user != account:
            denied = until or node is None or account not in self._allowed_accounts(user)
            if denied:
                raise NotAllowedError

        # The account node may have no properties for the requested time.
        try:
            props = self._get_properties(node, until)
            mtime = props[self.MTIME]
        except NameError:
            props = None
            mtime = until

        count, bytes, tstamp = self._get_statistics(node, until)
        tstamp = max(tstamp, mtime)
        if until is not None:
            # Overall last modification.
            modified = max(self._get_statistics(node)[2], mtime)
        else:
            modified = tstamp

        if user != account:
            meta = {'name': account}
        else:
            meta = {}
            if props is not None and include_user_defined:
                user_meta = self.node.attribute_get(props[self.SERIAL], domain)
                meta.update(dict(user_meta))
            if until is not None:
                meta['until_timestamp'] = tstamp
            meta['name'] = account
            meta['count'] = count
            meta['bytes'] = bytes
        meta['modified'] = modified
        return meta
220
    
221
    @backend_method
    def update_account_meta(self, user, account, domain, meta, replace=False):
        """Update the metadata associated with the account for the domain.

        Only the account owner is allowed to do this.
        """

        logger.debug("update_account_meta: %s %s %s %s", account, domain, meta, replace)
        if user != account:
            raise NotAllowedError
        account_node = self._lookup_account(account, True)[1]
        self._put_metadata(user, account_node, domain, meta, replace)
230
    
231
    @backend_method
    def get_account_groups(self, user, account):
        """Return a dictionary with the user groups defined for this account.

        Other users that are allowed on the account get an empty dictionary.
        """

        logger.debug("get_account_groups: %s", account)
        if user != account:
            if account in self._allowed_accounts(user):
                return {}
            raise NotAllowedError
        self._lookup_account(account, True)
        return self.permissions.group_dict(account)
242
    
243
    @backend_method
    def update_account_groups(self, user, account, groups, replace=False):
        """Update the groups associated with the account.

        With replace, all existing groups are destroyed first; otherwise
        only the groups named in the given dictionary are rewritten.
        """

        logger.debug("update_account_groups: %s %s %s", account, groups, replace)
        if user != account:
            raise NotAllowedError
        self._lookup_account(account, True)
        self._check_groups(groups)
        if replace:
            self.permissions.group_destroy(account)
        for group, members in groups.iteritems():
            if not replace:
                # Not wiped above, so clear this group individually.
                self.permissions.group_delete(account, group)
            if members:
                self.permissions.group_addmany(account, group, members)
259
    
260
    @backend_method
    def get_account_policy(self, user, account):
        """Return a dictionary with the account policy.

        Other users that are allowed on the account get an empty dictionary.
        """

        logger.debug("get_account_policy: %s", account)
        if user != account:
            if account in self._allowed_accounts(user):
                return {}
            raise NotAllowedError
        account_node = self._lookup_account(account, True)[1]
        return self._get_policy(account_node)
271
    
272
    @backend_method
    def update_account_policy(self, user, account, policy, replace=False):
        """Update the policy associated with the account.

        Only the account owner is allowed to do this.
        """

        logger.debug("update_account_policy: %s %s %s", account, policy, replace)
        if user != account:
            raise NotAllowedError
        account_node = self._lookup_account(account, True)[1]
        self._check_policy(policy)
        self._put_policy(account_node, policy, replace)
282
    
283
    @backend_method
    def put_account(self, user, account, policy=None):
        """Create a new account with the given name.

        policy -- optional policy dictionary; None (the default) means an
                  empty policy.

        Raises NotAllowedError when user is not the account itself and
        NameError when the account already exists.
        """

        # A fresh dict per call: a shared mutable default would be handed to
        # _put_policy on every call.
        # NOTE(review): _put_policy is not visible here; if it fills the
        # dict in place, a shared default would leak state between calls.
        if policy is None:
            policy = {}
        logger.debug("put_account: %s %s", account, policy)
        if user != account:
            raise NotAllowedError
        node = self.node.node_lookup(account)
        if node is not None:
            raise NameError('Account already exists')
        if policy:
            self._check_policy(policy)
        node = self._put_path(user, self.ROOTNODE, account)
        self._put_policy(node, policy, True)
297
    
298
    @backend_method
    def delete_account(self, user, account):
        """Delete the account with the given name.

        Does nothing when the account does not exist; raises IndexError
        when the account node cannot be removed.
        """

        logger.debug("delete_account: %s", account)
        if user != account:
            raise NotAllowedError
        account_node = self.node.node_lookup(account)
        if account_node is not None:
            removed = self.node.node_remove(account_node)
            if not removed:
                raise IndexError('Account is not empty')
            self.permissions.group_destroy(account)
311
    
312
    @backend_method
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
        """Return a list of containers existing under an account.

        Other users only see the containers they are allowed on; with
        shared, the owner only gets containers that appear in shared paths.
        """

        logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
        if user != account:
            if until or account not in self._allowed_accounts(user):
                raise NotAllowedError
            visible = self._allowed_containers(user, account)
            first, count = self._list_limits(visible, marker, limit)
            return visible[first:first + count]
        if shared:
            # Shared paths look like account/container/...; keep each
            # container name once.
            names = [p.split('/', 2)[1] for p in self.permissions.access_list_shared(account)]
            visible = list(set(names))
            first, count = self._list_limits(visible, marker, limit)
            return visible[first:first + count]
        account_node = self.node.node_lookup(account)
        entries = self._list_object_properties(account_node, account, '', '/', marker, limit, False, None, [], until)
        return [entry[0] for entry in entries]
330
    
331
    @backend_method
    def list_container_meta(self, user, account, container, domain, until=None):
        """Return a list with all the container's object meta keys for the domain.

        Other users may not ask for a past state and must have access to at
        least one path under the container.
        """

        logger.debug("list_container_meta: %s %s %s %s", account, container, domain, until)
        allowed_paths = []
        if user != account:
            if until:
                raise NotAllowedError
            container_path = '/'.join((account, container))
            allowed_paths = self.permissions.access_list_paths(user, container_path)
            if not allowed_paths:
                raise NotAllowedError
        path, node = self._lookup_container(account, container)
        before = inf if until is None else until
        allowed_paths = self._get_formatted_paths(allowed_paths)
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed_paths)
347
    
348
    @backend_method
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
        """Return a dictionary with the container metadata for the domain.

        Users other than the account owner only get 'name' and 'modified',
        and may not ask for a past state (until).
        """

        logger.debug("get_container_meta: %s %s %s %s", account, container, domain, until)
        if user != account:
            denied = until or container not in self._allowed_containers(user, account)
            if denied:
                raise NotAllowedError
        path, node = self._lookup_container(account, container)
        props = self._get_properties(node, until)
        mtime = props[self.MTIME]
        count, bytes, tstamp = self._get_statistics(node, until)
        tstamp = max(tstamp, mtime)
        if until is not None:
            # Overall last modification.
            modified = max(self._get_statistics(node)[2], mtime)
        else:
            modified = tstamp

        if user != account:
            meta = {'name': container}
        else:
            meta = {}
            if include_user_defined:
                user_meta = self.node.attribute_get(props[self.SERIAL], domain)
                meta.update(dict(user_meta))
            if until is not None:
                meta['until_timestamp'] = tstamp
            meta['name'] = container
            meta['count'] = count
            meta['bytes'] = bytes
        meta['modified'] = modified
        return meta
378
    
379
    @backend_method
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
        """Update the metadata associated with the container for the domain.

        The previous version is removed unless the container's versioning
        policy is 'auto'.
        """

        logger.debug("update_container_meta: %s %s %s %s %s", account, container, domain, meta, replace)
        if user != account:
            raise NotAllowedError
        path, node = self._lookup_container(account, container)
        versions = self._put_metadata(user, node, domain, meta, replace)
        src_version_id, dest_version_id = versions
        if src_version_id is None:
            return
        if self._get_policy(node)['versioning'] != 'auto':
            self.node.version_remove(src_version_id)
392
    
393
    @backend_method
    def get_container_policy(self, user, account, container):
        """Return a dictionary with the container policy.

        Other users that are allowed on the container get an empty dictionary.
        """

        logger.debug("get_container_policy: %s %s", account, container)
        if user != account:
            if container in self._allowed_containers(user, account):
                return {}
            raise NotAllowedError
        container_node = self._lookup_container(account, container)[1]
        return self._get_policy(container_node)
404
    
405
    @backend_method
    def update_container_policy(self, user, account, container, policy, replace=False):
        """Update the policy associated with the container.

        Only the account owner is allowed to do this.
        """

        logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
        if user != account:
            raise NotAllowedError
        container_node = self._lookup_container(account, container)[1]
        self._check_policy(policy)
        self._put_policy(container_node, policy, replace)
415
    
416
    @backend_method
    def put_container(self, user, account, container, policy=None):
        """Create a new container with the given name.

        policy -- optional policy dictionary; None (the default) means an
                  empty policy.

        Raises NotAllowedError when user is not the account owner and
        NameError when the container already exists.
        """

        # A fresh dict per call: a shared mutable default would be handed to
        # _put_policy on every call.
        # NOTE(review): _put_policy is not visible here; if it fills the
        # dict in place, a shared default would leak state between calls.
        if policy is None:
            policy = {}
        logger.debug("put_container: %s %s %s", account, container, policy)
        if user != account:
            raise NotAllowedError
        try:
            path, node = self._lookup_container(account, container)
        except NameError:
            # Lookup failed, so the name is free.
            pass
        else:
            raise NameError('Container already exists')
        if policy:
            self._check_policy(policy)
        path = '/'.join((account, container))
        node = self._put_path(user, self._lookup_account(account, True)[1], path)
        self._put_policy(node, policy, True)
434
    
435
    @backend_method
    def delete_container(self, user, account, container, until=None):
        """Delete/purge the container with the given name.

        With until, only history and deleted versions up to that time are
        purged and the container itself is kept. Without it, the container
        must hold no objects (IndexError otherwise); everything is purged
        and the container node is removed.
        """

        logger.debug("delete_container: %s %s %s", account, container, until)
        if user != account:
            raise NotAllowedError
        path, node = self._lookup_container(account, container)

        purge_only = until is not None
        if not purge_only and self._get_statistics(node)[0] > 0:
            raise IndexError('Container is not empty')

        before = until if purge_only else inf
        hashes, size = self.node.node_purge_children(node, before, CLUSTER_HISTORY)
        for h in hashes:
            self.store.map_delete(h)
        self.node.node_purge_children(node, before, CLUSTER_DELETED)

        if purge_only:
            action = 'container purge'
        else:
            self.node.node_remove(node)
            action = 'container delete'
        self._report_size_change(user, account, -size, {'action': action})
460
    
461
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props):
        """Check access and list object properties under a container.

        Shared helper behind list_objects and list_object_meta.
        """
        if user != account:
            if until:
                raise NotAllowedError
        permitted = self._list_object_permissions(user, account, container, prefix, shared)
        if shared and not permitted:
            return []
        path, node = self._lookup_container(account, container)
        permitted = self._get_formatted_paths(permitted)
        return self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, permitted, all_props)
470
    
471
    def _list_object_permissions(self, user, account, container, prefix, shared):
        """Return the paths that grant access under account/container/prefix.

        For another user's account the result is what that user may access
        (NotAllowedError when nothing). For the owner it is the shared paths
        when shared is set, and an empty list otherwise.
        """
        path = '/'.join((account, container, prefix)).rstrip('/')
        if user != account:
            granted = self.permissions.access_list_paths(user, path)
            if not granted:
                raise NotAllowedError
            return granted
        if not shared:
            return []
        granted = self.permissions.access_list_shared(path)
        if not granted:
            return []
        return granted
484
    
485
    @backend_method
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=None, shared=False, until=None, size_range=None):
        """Return a list of object (name, version_id) tuples existing under a container.

        keys -- optional list of meta keys passed on to the listing helper;
                None (the default) means an empty list.
        """

        # A fresh list per call instead of one shared mutable default.
        if keys is None:
            keys = []
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range)
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False)
491
    
492
    @backend_method
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=None, shared=False, until=None, size_range=None):
        """Return a list of object metadata dicts existing under a container.

        keys -- optional list of meta keys passed on to the listing helper;
                None (the default) means an empty list.

        Two-element entries from the listing are reported as
        {'subdir': name}; all others become full metadata dictionaries.
        """

        # A fresh list per call instead of one shared mutable default.
        if keys is None:
            keys = []
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range)
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True)
        objects = []
        for p in props:
            if len(p) == 2:
                objects.append({'subdir': p[0]})
            else:
                # p[0] is the name, so the property columns are shifted by one.
                objects.append({'name': p[0],
                                'bytes': p[self.SIZE + 1],
                                'type': p[self.TYPE + 1],
                                'hash': p[self.HASH + 1],
                                'version': p[self.SERIAL + 1],
                                'version_timestamp': p[self.MTIME + 1],
                                'modified': p[self.MTIME + 1] if until is None else None,
                                'modified_by': p[self.MUSER + 1],
                                'uuid': p[self.UUID + 1],
                                'checksum': p[self.CHECKSUM + 1]})
        return objects
514
    
515
    @backend_method
    def list_object_permissions(self, user, account, container, prefix=''):
        """Return a list of paths that enforce permissions under a container."""

        logger.debug("list_object_permissions: %s %s %s", account, container, prefix)
        shared_only = True
        return self._list_object_permissions(user, account, container, prefix, shared_only)
521
    
522
    @backend_method
    def list_object_public(self, user, account, container, prefix=''):
        """Return a dict mapping paths to public ids for objects that are public under a container.

        The stored ids are offset by ULTIMATE_ANSWER before being returned.
        This method itself performs no access check on user.
        """

        logger.debug("list_object_public: %s %s %s", account, container, prefix)
        base_path = '/'.join((account, container, prefix))
        listing = self.permissions.public_list(base_path)
        return dict((path, stored_id + ULTIMATE_ANSWER) for path, stored_id in listing)
531
    
532
    @backend_method
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
        """Return a dictionary with the object metadata for the domain.

        'modified' always refers to the latest state of the object, even
        when a specific version is requested.
        """

        logger.debug("get_object_meta: %s %s %s %s %s", account, container, name, domain, version)
        self._can_read(user, account, container, name)
        path, node = self._lookup_object(account, container, name)
        props = self._get_version(node, version)
        if version is None:
            modified = props[self.MTIME]
        else:
            try:
                # Overall last modification.
                latest = self._get_version(node)
                modified = latest[self.MTIME]
            except NameError:
                # Object may be deleted.
                deleted = self.node.version_lookup(node, inf, CLUSTER_DELETED)
                if deleted is None:
                    raise NameError('Object does not exist')
                modified = deleted[self.MTIME]

        if include_user_defined:
            meta = dict(self.node.attribute_get(props[self.SERIAL], domain))
        else:
            meta = {}
        meta['name'] = name
        meta['bytes'] = props[self.SIZE]
        meta['type'] = props[self.TYPE]
        meta['hash'] = props[self.HASH]
        meta['version'] = props[self.SERIAL]
        meta['version_timestamp'] = props[self.MTIME]
        meta['modified'] = modified
        meta['modified_by'] = props[self.MUSER]
        meta['uuid'] = props[self.UUID]
        meta['checksum'] = props[self.CHECKSUM]
        return meta
565
    
566
    @backend_method
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
        """Update the metadata associated with the object for the domain and return the new version."""

        logger.debug("update_object_meta: %s %s %s %s %s %s", account, container, name, domain, meta, replace)
        self._can_write(user, account, container, name)
        object_node = self._lookup_object(account, container, name)[1]
        versions = self._put_metadata(user, object_node, domain, meta, replace)
        old_version_id, new_version_id = versions
        self._apply_versioning(account, container, old_version_id)
        return new_version_id
576
    
577
    @backend_method
    def get_object_permissions(self, user, account, container, name):
        """Return the action allowed on the object, the path
        from which the object gets its permissions from,
        along with a dictionary containing the permissions."""

        logger.debug("get_object_permissions: %s %s %s", account, container, name)
        perm_path = self._get_permissions_path(account, container, name)
        # The owner can always write; other users get the strongest access
        # that was granted to them.
        action = 'write'
        if user != account:
            if not self.permissions.access_check(perm_path, self.WRITE, user):
                if not self.permissions.access_check(perm_path, self.READ, user):
                    raise NotAllowedError
                action = 'read'
        self._lookup_object(account, container, name)
        granted = self.permissions.access_get(perm_path)
        return (action, perm_path, granted)
595
    
596
    @backend_method
    def update_object_permissions(self, user, account, container, name, permissions):
        """Update the permissions associated with the object.

        Only the account owner is allowed to do this; a sharing change is
        reported afterwards.
        """

        logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
        if user != account:
            raise NotAllowedError
        object_path, _node = self._lookup_object(account, container, name)
        self._check_permissions(object_path, permissions)
        self.permissions.access_set(object_path, permissions)
        members = self.permissions.access_members(object_path)
        self._report_sharing_change(user, account, object_path, {'members': members})
607
    
608
    @backend_method
    def get_object_public(self, user, account, container, name):
        """Return the public id of the object if applicable.

        The stored id is offset by ULTIMATE_ANSWER; None is returned when
        the object has no public id.
        """

        logger.debug("get_object_public: %s %s %s", account, container, name)
        self._can_read(user, account, container, name)
        object_path = self._lookup_object(account, container, name)[0]
        stored_id = self.permissions.public_get(object_path)
        if stored_id is None:
            return stored_id
        return stored_id + ULTIMATE_ANSWER
619
    
620
    @backend_method
    def update_object_public(self, user, account, container, name, public):
        """Update the public status of the object."""

        logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
        self._can_write(user, account, container, name)
        object_path = self._lookup_object(account, container, name)[0]
        if public:
            self.permissions.public_set(object_path)
        else:
            self.permissions.public_unset(object_path)
631
    
632
    @backend_method
    def get_object_hashmap(self, user, account, container, name, version=None):
        """Return the object's size and a list with partial hashes.

        The hashes are returned hex-encoded.
        """

        logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
        self._can_read(user, account, container, name)
        object_node = self._lookup_object(account, container, name)[1]
        props = self._get_version(object_node, version)
        # The stored hash is hex text; the store is keyed by the raw digest.
        raw_hash = binascii.unhexlify(props[self.HASH])
        block_hashes = self.store.map_get(raw_hash)
        return props[self.SIZE], [binascii.hexlify(h) for h in block_hashes]
642
    
643
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
        """Create a new object version with the given hash and return its id.

        Copies/updates metadata, applies the versioning policy, enforces
        account and container quotas (QuotaError) and reports the size,
        sharing and object changes. Permissions may only be set by the
        account owner (NotAllowedError otherwise).
        """
        sets_permissions = permissions is not None
        if sets_permissions and user != account:
            raise NotAllowedError
        self._can_write(user, account, container, name)
        if sets_permissions:
            path = '/'.join((account, container, name))
            self._check_permissions(path, permissions)

        account_path, account_node = self._lookup_account(account, True)
        container_path, container_node = self._lookup_container(account, container)
        path, node = self._put_object_node(container_path, container_node, name)
        pre_version_id, dest_version_id = self._put_version_duplicate(
            user, node, src_node=src_node, size=size, type=type, hash=hash,
            checksum=checksum, is_copy=is_copy)

        # Handle meta: by default copy from the version being replaced.
        if src_version_id is None:
            src_version_id = pre_version_id
        self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace_meta)

        # Check quota. A quota that is not positive is not enforced.
        del_size = self._apply_versioning(account, container, pre_version_id)
        size_delta = size - del_size
        if size_delta > 0:
            account_quota = long(self._get_policy(account_node)['quota'])
            container_quota = long(self._get_policy(container_node)['quota'])
            # This must be executed in a transaction, so the version is never created if it fails.
            if account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota:
                raise QuotaError
            if container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota:
                raise QuotaError
        self._report_size_change(user, account, size_delta, {'action': 'object update'})

        if sets_permissions:
            self.permissions.access_set(path, permissions)
            members = self.permissions.access_members(path)
            self._report_sharing_change(user, account, path, {'members': members})

        change = {'version': dest_version_id, 'action': 'object update'}
        self._report_object_change(user, account, path, details=change)
        return dest_version_id
679
    
680
    @backend_method
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
        """Create/update an object with the specified size and partial hashes.

        Raises an IndexError whose ``data`` attribute lists the hex-encoded
        hashes of the blocks that the store reports as missing.
        Returns the id of the version that was created.
        """

        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s", account, container, name, size, type, hashmap, checksum)
        if size == 0:
            # No such thing as an empty hashmap.
            hashmap = [self.put_block('')]
        block_map = HashMap(self.block_size, self.hash_algorithm)
        block_map.extend([binascii.unhexlify(hex_hash) for hex_hash in hashmap])

        missing = self.store.block_search(block_map)
        if missing:
            error = IndexError()
            error.data = [binascii.hexlify(raw_hash) for raw_hash in missing]
            raise error

        map_hash = block_map.hash()
        dest_version_id = self._update_object_hash(
            user, account, container, name, size, type,
            binascii.hexlify(map_hash), checksum, domain, meta,
            replace_meta, permissions)
        # The map is stored only after the version has been created.
        self.store.map_put(map_hash, block_map)
        return dest_version_id
699
    
700
    @backend_method
    def update_object_checksum(self, user, account, container, name, version, checksum):
        """Update an object's checksum.

        The checksum is written to every version of the object whose
        serial is not lower than ``version`` and whose hash and size match
        those of ``version``.
        """

        logger.debug("update_object_checksum: %s %s %s %s %s", account, container, name, version, checksum)
        # Update objects with greater version and same hashmap and size (fix metadata updates).
        self._can_write(user, account, container, name)
        node = self._lookup_object(account, container, name)[1]
        reference = self._get_version(node, version)
        for candidate in self.node.node_get_versions(node):
            if not candidate[self.SERIAL] >= int(version):
                continue
            same_content = (candidate[self.HASH] == reference[self.HASH] and
                            candidate[self.SIZE] == reference[self.SIZE])
            if same_content:
                self.node.version_put_property(candidate[self.SERIAL], 'checksum', checksum)
713
    
714
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False):
715
        self._can_read(user, src_account, src_container, src_name)
716
        path, node = self._lookup_object(src_account, src_container, src_name)
717
        # TODO: Will do another fetch of the properties in duplicate version...
718
        props = self._get_version(node, src_version) # Check to see if source exists.
719
        src_version_id = props[self.SERIAL]
720
        hash = props[self.HASH]
721
        size = props[self.SIZE]
722
        
723
        is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
724
        dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy)
725
        return dest_version_id
726
    
727
    @backend_method
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None):
        """Copy an object's data and metadata.

        Returns the id of the version created at the destination.
        """

        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version)
        return self._copy_object(
            user, src_account, src_container, src_name,
            dest_account, dest_container, dest_name, type,
            dest_domain=domain, dest_meta=meta, replace_meta=replace_meta,
            permissions=permissions, src_version=src_version, is_move=False)
734
    
735
    @backend_method
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None):
        """Move an object's data and metadata.

        Only the owner of the source account may move an object. The
        source is deleted afterwards unless source and destination are the
        same location. Returns the id of the version created.
        """

        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions)
        if user != src_account:
            raise NotAllowedError
        source = (src_account, src_container, src_name)
        destination = (dest_account, dest_container, dest_name)
        dest_version_id = self._copy_object(
            user, src_account, src_container, src_name,
            dest_account, dest_container, dest_name, type,
            dest_domain=domain, dest_meta=meta, replace_meta=replace_meta,
            permissions=permissions, src_version=None, is_move=True)
        if source != destination:
            self._delete_object(user, src_account, src_container, src_name)
        return dest_version_id
746
    
747
    def _delete_object(self, user, account, container, name, until=None):
748
        if user != account:
749
            raise NotAllowedError
750
        
751
        if until is not None:
752
            path = '/'.join((account, container, name))
753
            node = self.node.node_lookup(path)
754
            if node is None:
755
                return
756
            hashes = []
757
            size = 0
758
            h, s = self.node.node_purge(node, until, CLUSTER_NORMAL)
759
            hashes += h
760
            size += s
761
            h, s = self.node.node_purge(node, until, CLUSTER_HISTORY)
762
            hashes += h
763
            size += s
764
            for h in hashes:
765
                self.store.map_delete(h)
766
            self.node.node_purge(node, until, CLUSTER_DELETED)
767
            try:
768
                props = self._get_version(node)
769
            except NameError:
770
                self.permissions.access_clear(path)
771
            self._report_size_change(user, account, -size, {'action': 'object purge'})
772
            return
773
        
774
        path, node = self._lookup_object(account, container, name)
775
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
776
        del_size = self._apply_versioning(account, container, src_version_id)
777
        if del_size:
778
            self._report_size_change(user, account, -del_size, {'action': 'object delete'})
779
        self._report_object_change(user, account, path, details={'action': 'object delete'})
780
        self.permissions.access_clear(path)
781
    
782
    @backend_method
    def delete_object(self, user, account, container, name, until=None):
        """Delete/purge an object.

        Logs the request and delegates to ``_delete_object``.
        """

        logger.debug("delete_object: %s %s %s %s", account, container, name, until)
        self._delete_object(user, account, container, name, until=until)
788
    
789
    @backend_method
    def list_versions(self, user, account, container, name):
        """Return a list of all (version, version_timestamp) tuples for an object.

        Versions that belong to the deleted cluster are left out. Each
        entry is a two-item list ``[serial, mtime]``.
        """

        logger.debug("list_versions: %s %s %s", account, container, name)
        self._can_read(user, account, container, name)
        node = self._lookup_object(account, container, name)[1]
        listing = []
        for version in self.node.node_get_versions(node):
            if version[self.CLUSTER] != CLUSTER_DELETED:
                listing.append([version[self.SERIAL], version[self.MTIME]])
        return listing
798
    
799
    @backend_method
    def get_uuid(self, user, uuid):
        """Return the (account, container, name) for the UUID given.

        Raises NameError when the UUID is unknown; read access on the
        resolved object is checked before returning.
        """

        logger.debug("get_uuid: %s", uuid)
        latest = self.node.latest_uuid(uuid)
        if latest is None:
            raise NameError
        object_path, _serial = latest
        account, container, name = object_path.split('/', 2)
        self._can_read(user, account, container, name)
        return (account, container, name)
811
    
812
    @backend_method
    def get_public(self, user, public):
        """Return the (account, container, name) for the public id given.

        Raises NameError when the id is None, is lower than
        ULTIMATE_ANSWER, or does not resolve to a path; read access on the
        resolved object is checked before returning.
        """

        logger.debug("get_public: %s", public)
        if public is None or public < ULTIMATE_ANSWER:
            raise NameError
        # The offset is removed before the permissions lookup.
        internal_id = public - ULTIMATE_ANSWER
        object_path = self.permissions.public_path(internal_id)
        if object_path is None:
            raise NameError
        account, container, name = object_path.split('/', 2)
        self._can_read(user, account, container, name)
        return (account, container, name)
825
    
826
    @backend_method(autocommit=0)
    def get_block(self, hash):
        """Return a block's data.

        ``hash`` is the hex-encoded block hash. Raises NameError when the
        store returns nothing for it.
        """

        logger.debug("get_block: %s", hash)
        raw_hash = binascii.unhexlify(hash)
        data = self.store.block_get(raw_hash)
        if data:
            return data
        raise NameError('Block does not exist')
835
    
836
    @backend_method(autocommit=0)
    def put_block(self, data):
        """Store a block and return the hash.

        The hash returned by the store is hex-encoded before it is
        returned.
        """

        logger.debug("put_block: %s", len(data))
        raw_hash = self.store.block_put(data)
        return binascii.hexlify(raw_hash)
842
    
843
    @backend_method(autocommit=0)
    def update_block(self, hash, data, offset=0):
        """Update a known block and return the hash.

        When ``data`` starts at offset 0 and is exactly one block long it
        is stored as a new block; otherwise the existing block is updated
        through the store. The returned hash is hex-encoded.
        """

        logger.debug("update_block: %s %s %s", hash, len(data), offset)
        replaces_whole_block = offset == 0 and len(data) == self.block_size
        if replaces_whole_block:
            return self.put_block(data)
        updated_hash = self.store.block_update(binascii.unhexlify(hash), offset, data)
        return binascii.hexlify(updated_hash)
852
    
853
    # Path functions.
854
    
855
    def _generate_uuid(self):
856
        return str(uuidlib.uuid4())
857
    
858
    def _put_object_node(self, path, parent, name):
859
        path = '/'.join((path, name))
860
        node = self.node.node_lookup(path)
861
        if node is None:
862
            node = self.node.node_create(parent, path)
863
        return path, node
864
    
865
    def _put_path(self, user, parent, path):
        """Create a node for ``path`` with an initial version and return the node.

        The initial version has no hash, size 0, empty type and checksum,
        a new uuid, and is placed in the normal cluster.
        """

        new_node = self.node.node_create(parent, path)
        initial_uuid = self._generate_uuid()
        self.node.version_create(new_node, None, 0, '', None, user, initial_uuid, '', CLUSTER_NORMAL)
        return new_node
869
    
870
    def _lookup_account(self, account, create=True):
871
        node = self.node.node_lookup(account)
872
        if node is None and create:
873
            node = self._put_path(account, self.ROOTNODE, account) # User is account.
874
        return account, node
875
    
876
    def _lookup_container(self, account, container):
877
        path = '/'.join((account, container))
878
        node = self.node.node_lookup(path)
879
        if node is None:
880
            raise NameError('Container does not exist')
881
        return path, node
882
    
883
    def _lookup_object(self, account, container, name):
884
        path = '/'.join((account, container, name))
885
        node = self.node.node_lookup(path)
886
        if node is None:
887
            raise NameError('Object does not exist')
888
        return path, node
889
    
890
    def _get_properties(self, node, until=None):
        """Return properties until the timestamp given.

        The normal cluster is searched first; the history cluster is only
        consulted when ``until`` is given. Raises NameError when nothing
        is found.
        """

        before = inf if until is None else until
        found = self.node.version_lookup(node, before, CLUSTER_NORMAL)
        if found is None:
            if until is not None:
                found = self.node.version_lookup(node, before, CLUSTER_HISTORY)
            if found is None:
                raise NameError('Path does not exist')
        return found
900
    
901
    def _get_statistics(self, node, until=None):
        """Return count, sum of size and latest timestamp of everything under node.

        Falls back to ``(0, 0, 0)`` when the node backend returns None.
        """

        if until is not None:
            found = self.node.statistics_latest(node, until, CLUSTER_DELETED)
        else:
            found = self.node.statistics_get(node, CLUSTER_NORMAL)
        return (0, 0, 0) if found is None else found
911
    
912
    def _get_version(self, node, version=None):
913
        if version is None:
914
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
915
            if props is None:
916
                raise NameError('Object does not exist')
917
        else:
918
            try:
919
                version = int(version)
920
            except ValueError:
921
                raise IndexError('Version does not exist')
922
            props = self.node.version_get_properties(version)
923
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
924
                raise IndexError('Version does not exist')
925
        return props
926
    
927
    def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
        """Create a new version of the node.

        Properties not supplied are taken from the latest normal-cluster
        version of ``src_node`` (or of ``node`` when ``src_node`` is None).
        The version of ``node`` that was current before the call, if any,
        is moved to the history cluster.

        Returns ``(pre_version_id, dest_version_id)``.
        """

        lookup_node = node if src_node is None else src_node
        src_props = self.node.version_lookup(lookup_node, inf, CLUSTER_NORMAL)
        if src_props is None:
            src_version_id, src_hash, src_size = None, None, 0
            src_type, src_checksum = '', ''
        else:
            src_version_id = src_props[self.SERIAL]
            src_hash = src_props[self.HASH]
            src_size = src_props[self.SIZE]
            src_type = src_props[self.TYPE]
            src_checksum = src_props[self.CHECKSUM]

        if size is None:
            # Set metadata.
            # This way hash can be set to None (account or container).
            hash, size = src_hash, src_size
        if type is None:
            type = src_type
        if checksum is None:
            checksum = src_checksum

        if is_copy or src_version_id is None:
            uuid = self._generate_uuid()
        else:
            uuid = src_props[self.UUID]

        if src_node is None:
            pre_version_id = src_version_id
        else:
            # The source is another node, so look up the destination's own
            # current version.
            dest_props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
            pre_version_id = None if dest_props is None else dest_props[self.SERIAL]
        if pre_version_id is not None:
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)

        created = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
        dest_version_id, _mtime = created
        return pre_version_id, dest_version_id
964
    
965
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
966
        if src_version_id is not None:
967
            self.node.attribute_copy(src_version_id, dest_version_id)
968
        if not replace:
969
            self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
970
            self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
971
        else:
972
            self.node.attribute_del(dest_version_id, domain)
973
            self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
974
    
975
    def _put_metadata(self, user, node, domain, meta, replace=False):
976
        """Create a new version and store metadata."""
977
        
978
        src_version_id, dest_version_id = self._put_version_duplicate(user, node)
979
        self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
980
        return src_version_id, dest_version_id
981
    
982
    def _list_limits(self, listing, marker, limit):
983
        start = 0
984
        if marker:
985
            try:
986
                start = listing.index(marker) + 1
987
            except ValueError:
988
                pass
989
        if not limit or limit > 10000:
990
            limit = 10000
991
        return start, limit
992
    
993
    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
        """List entries under ``path`` with names relative to it.

        The node backend is queried for objects and common prefixes;
        prefixes are added as ``(prefix, None)`` entries when ``virtual``
        is true. The result is sorted by name, stripped of the container
        path and cut according to ``marker`` and ``limit``.
        """

        cont_prefix = path + '/'
        strip_length = len(cont_prefix)
        full_prefix = cont_prefix + prefix
        start_path = cont_prefix + marker if marker else None
        before = inf if until is None else until
        # Key filters only apply when a domain is given.
        filterq = keys if domain else []

        objects, prefixes = self.node.latest_version_list(
            parent, full_prefix, delimiter, start_path, limit, before,
            CLUSTER_DELETED, allowed, domain, filterq, size_range, all_props)
        virtual_entries = [(p, None) for p in prefixes] if virtual else []
        objects.extend(virtual_entries)
        objects.sort(key=lambda entry: entry[0])
        relative = [(entry[0][strip_length:],) + entry[1:] for entry in objects]

        names = [entry[0] for entry in relative]
        start, limit = self._list_limits(names, marker, limit)
        return relative[start:start + limit]
1008
    
1009
    # Reporting functions.
1010
    
1011
    def _report_size_change(self, user, account, size, details=None):
        """Queue a 'diskspace' message for a change in the account's usage.

        ``details`` is copied before 'user' and 'total' are added, so that
        neither the caller's mapping nor a shared default is modified and
        every queued message owns its own details dict. 'total' is the
        size sum taken from the account's statistics.
        """

        details = dict(details) if details else {}
        logger.debug("_report_size_change: %s %s %s %s", user, account, size, details)
        account_node = self._lookup_account(account, True)[1]
        total = self._get_statistics(account_node)[1]
        details.update({'user': user, 'total': total})
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))
1017
    
1018
    def _report_object_change(self, user, account, path, details=None):
        """Queue an 'object' message for a change to the object at ``path``.

        ``details`` is copied before 'user' is added, so that neither the
        caller's mapping nor a shared default is modified and every queued
        message owns its own details dict.
        """

        details = dict(details) if details else {}
        logger.debug("_report_object_change: %s %s %s %s", user, account, path, details)
        details.update({'user': user})
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',), account, QUEUE_INSTANCE_ID, 'object', path, details))
1022
    
1023
    def _report_sharing_change(self, user, account, path, details=None):
        """Queue a 'sharing' message for a permissions change at ``path``.

        ``details`` is copied before 'user' is added, so that neither the
        caller's mapping nor a shared default is modified and every queued
        message owns its own details dict.
        """

        details = dict(details) if details else {}
        # The debug label does not match the method name; it is kept
        # unchanged so existing log output stays the same.
        logger.debug("_report_permissions_change: %s %s %s %s", user, account, path, details)
        details.update({'user': user})
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',), account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1027
    
1028
    # Policy functions.
1029
    
1030
    def _check_policy(self, policy):
1031
        for k in policy.keys():
1032
            if policy[k] == '':
1033
                policy[k] = self.default_policy.get(k)
1034
        for k, v in policy.iteritems():
1035
            if k == 'quota':
1036
                q = int(v) # May raise ValueError.
1037
                if q < 0:
1038
                    raise ValueError
1039
            elif k == 'versioning':
1040
                if v not in ['auto', 'none']:
1041
                    raise ValueError
1042
            else:
1043
                raise ValueError
1044
    
1045
    def _put_policy(self, node, policy, replace):
1046
        if replace:
1047
            for k, v in self.default_policy.iteritems():
1048
                if k not in policy:
1049
                    policy[k] = v
1050
        self.node.policy_set(node, policy)
1051
    
1052
    def _get_policy(self, node):
1053
        policy = self.default_policy.copy()
1054
        policy.update(self.node.policy_get(node))
1055
        return policy
1056
    
1057
    def _apply_versioning(self, account, container, version_id):
1058
        """Delete the provided version if such is the policy.
1059
           Return size of object removed.
1060
        """
1061
        
1062
        if version_id is None:
1063
            return 0
1064
        path, node = self._lookup_container(account, container)
1065
        versioning = self._get_policy(node)['versioning']
1066
        if versioning != 'auto':
1067
            hash, size = self.node.version_remove(version_id)
1068
            self.store.map_delete(hash)
1069
            return size
1070
        return 0
1071
    
1072
    # Access control functions.
1073
    
1074
    def _check_groups(self, groups):
1075
        # raise ValueError('Bad characters in groups')
1076
        pass
1077
    
1078
    def _check_permissions(self, path, permissions):
1079
        # raise ValueError('Bad characters in permissions')
1080
        pass
1081
    
1082
    def _get_formatted_paths(self, paths):
1083
        formatted = []
1084
        for p in paths:
1085
            node = self.node.node_lookup(p)
1086
            if node is not None:
1087
                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1088
            if props is not None:
1089
                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1090
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1091
                formatted.append((p, self.MATCH_EXACT))
1092
        return formatted
1093
    
1094
    def _get_permissions_path(self, account, container, name):
1095
        path = '/'.join((account, container, name))
1096
        permission_paths = self.permissions.access_inherit(path)
1097
        permission_paths.sort()
1098
        permission_paths.reverse()
1099
        for p in permission_paths:
1100
            if p == path:
1101
                return p
1102
            else:
1103
                if p.count('/') < 2:
1104
                    continue
1105
                node = self.node.node_lookup(p)
1106
                if node is not None:
1107
                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1108
                if props is not None:
1109
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1110
                        return p
1111
        return None
1112
    
1113
    def _can_read(self, user, account, container, name):
1114
        if user == account:
1115
            return True
1116
        path = '/'.join((account, container, name))
1117
        if self.permissions.public_get(path) is not None:
1118
            return True
1119
        path = self._get_permissions_path(account, container, name)
1120
        if not path:
1121
            raise NotAllowedError
1122
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1123
            raise NotAllowedError
1124
    
1125
    def _can_write(self, user, account, container, name):
1126
        if user == account:
1127
            return True
1128
        path = '/'.join((account, container, name))
1129
        path = self._get_permissions_path(account, container, name)
1130
        if not path:
1131
            raise NotAllowedError
1132
        if not self.permissions.access_check(path, self.WRITE, user):
1133
            raise NotAllowedError
1134
    
1135
    def _allowed_accounts(self, user):
1136
        allow = set()
1137
        for path in self.permissions.access_list_paths(user):
1138
            allow.add(path.split('/', 1)[0])
1139
        return sorted(allow)
1140
    
1141
    def _allowed_containers(self, user, account):
1142
        allow = set()
1143
        for path in self.permissions.access_list_paths(user, account):
1144
            allow.add(path.split('/', 2)[1])
1145
        return sorted(allow)