Statistics
| Branch: | Tag: | Revision:

root / pithos / backends / modular.py @ 2e662088

History | View | Annotate | Download (39.6 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import uuid as uuidlib
38
import logging
39
import binascii
40

    
41
from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend
42

    
43
from pithos.lib.hashmap import HashMap
44

    
45
# Default modules and settings.
46
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
47
DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
48
DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
49
DEFAULT_BLOCK_PATH = 'data/'
50

    
51
( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
52

    
53
inf = float('inf')
54

    
55
ULTIMATE_ANSWER = 42
56

    
57

    
58
logger = logging.getLogger(__name__)
59

    
60

    
61
def backend_method(func=None, autocommit=1):
62
    if func is None:
63
        def fn(func):
64
            return backend_method(func, autocommit)
65
        return fn
66

    
67
    if not autocommit:
68
        return func
69
    def fn(self, *args, **kw):
70
        self.wrapper.execute()
71
        try:
72
            ret = func(self, *args, **kw)
73
            self.wrapper.commit()
74
            return ret
75
        except:
76
            self.wrapper.rollback()
77
            raise
78
    return fn
79

    
80

    
81
class ModularBackend(BaseBackend):
82
    """A modular backend.
83
    
84
    Uses modules for SQL functions and storage.
85
    """
86
    
87
    def __init__(self, db_module=None, db_connection=None, block_module=None, block_path=None):
88
        db_module = db_module or DEFAULT_DB_MODULE
89
        db_connection = db_connection or DEFAULT_DB_CONNECTION
90
        block_module = block_module or DEFAULT_BLOCK_MODULE
91
        block_path = block_path or DEFAULT_BLOCK_PATH
92
        
93
        self.hash_algorithm = 'sha256'
94
        self.block_size = 4 * 1024 * 1024 # 4MB
95
        
96
        self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
97
        
98
        __import__(db_module)
99
        self.db_module = sys.modules[db_module]
100
        self.wrapper = self.db_module.DBWrapper(db_connection)
101
        
102
        params = {'wrapper': self.wrapper}
103
        self.permissions = self.db_module.Permissions(**params)
104
        for x in ['READ', 'WRITE']:
105
            setattr(self, x, getattr(self.db_module, x))
106
        self.node = self.db_module.Node(**params)
107
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'MTIME', 'MUSER', 'UUID', 'CLUSTER']:
108
            setattr(self, x, getattr(self.db_module, x))
109
        
110
        __import__(block_module)
111
        self.block_module = sys.modules[block_module]
112
        
113
        params = {'path': block_path,
114
                  'block_size': self.block_size,
115
                  'hash_algorithm': self.hash_algorithm}
116
        self.store = self.block_module.Store(**params)
117
    
118
    def close(self):
119
        self.wrapper.close()
120
    
121
    @backend_method
122
    def list_accounts(self, user, marker=None, limit=10000):
123
        """Return a list of accounts the user can access."""
124
        
125
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
126
        allowed = self._allowed_accounts(user)
127
        start, limit = self._list_limits(allowed, marker, limit)
128
        return allowed[start:start + limit]
129
    
130
    @backend_method
131
    def get_account_meta(self, user, account, domain, until=None):
132
        """Return a dictionary with the account metadata for the domain."""
133
        
134
        logger.debug("get_account_meta: %s %s %s", account, domain, until)
135
        path, node = self._lookup_account(account, user == account)
136
        if user != account:
137
            if until or node is None or account not in self._allowed_accounts(user):
138
                raise NotAllowedError
139
        try:
140
            props = self._get_properties(node, until)
141
            mtime = props[self.MTIME]
142
        except NameError:
143
            props = None
144
            mtime = until
145
        count, bytes, tstamp = self._get_statistics(node, until)
146
        tstamp = max(tstamp, mtime)
147
        if until is None:
148
            modified = tstamp
149
        else:
150
            modified = self._get_statistics(node)[2] # Overall last modification.
151
            modified = max(modified, mtime)
152
        
153
        if user != account:
154
            meta = {'name': account}
155
        else:
156
            meta = {}
157
            if props is not None:
158
                meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
159
            if until is not None:
160
                meta.update({'until_timestamp': tstamp})
161
            meta.update({'name': account, 'count': count, 'bytes': bytes})
162
        meta.update({'modified': modified})
163
        return meta
164
    
165
    @backend_method
166
    def update_account_meta(self, user, account, domain, meta, replace=False):
167
        """Update the metadata associated with the account for the domain."""
168
        
169
        logger.debug("update_account_meta: %s %s %s %s", account, domain, meta, replace)
170
        if user != account:
171
            raise NotAllowedError
172
        path, node = self._lookup_account(account, True)
173
        self._put_metadata(user, node, domain, meta, replace)
174
    
175
    @backend_method
176
    def get_account_groups(self, user, account):
177
        """Return a dictionary with the user groups defined for this account."""
178
        
179
        logger.debug("get_account_groups: %s", account)
180
        if user != account:
181
            if account not in self._allowed_accounts(user):
182
                raise NotAllowedError
183
            return {}
184
        self._lookup_account(account, True)
185
        return self.permissions.group_dict(account)
186
    
187
    @backend_method
188
    def update_account_groups(self, user, account, groups, replace=False):
189
        """Update the groups associated with the account."""
190
        
191
        logger.debug("update_account_groups: %s %s %s", account, groups, replace)
192
        if user != account:
193
            raise NotAllowedError
194
        self._lookup_account(account, True)
195
        self._check_groups(groups)
196
        if replace:
197
            self.permissions.group_destroy(account)
198
        for k, v in groups.iteritems():
199
            if not replace: # If not already deleted.
200
                self.permissions.group_delete(account, k)
201
            if v:
202
                self.permissions.group_addmany(account, k, v)
203
    
204
    @backend_method
205
    def get_account_policy(self, user, account):
206
        """Return a dictionary with the account policy."""
207
        
208
        logger.debug("get_account_policy: %s", account)
209
        if user != account:
210
            if account not in self._allowed_accounts(user):
211
                raise NotAllowedError
212
            return {}
213
        path, node = self._lookup_account(account, True)
214
        return self._get_policy(node)
215
    
216
    @backend_method
217
    def update_account_policy(self, user, account, policy, replace=False):
218
        """Update the policy associated with the account."""
219
        
220
        logger.debug("update_account_policy: %s %s %s", account, policy, replace)
221
        if user != account:
222
            raise NotAllowedError
223
        path, node = self._lookup_account(account, True)
224
        self._check_policy(policy)
225
        self._put_policy(node, policy, replace)
226
    
227
    @backend_method
228
    def put_account(self, user, account, policy={}):
229
        """Create a new account with the given name."""
230
        
231
        logger.debug("put_account: %s %s", account, policy)
232
        if user != account:
233
            raise NotAllowedError
234
        node = self.node.node_lookup(account)
235
        if node is not None:
236
            raise NameError('Account already exists')
237
        if policy:
238
            self._check_policy(policy)
239
        node = self._put_path(user, self.ROOTNODE, account)
240
        self._put_policy(node, policy, True)
241
    
242
    @backend_method
243
    def delete_account(self, user, account):
244
        """Delete the account with the given name."""
245
        
246
        logger.debug("delete_account: %s", account)
247
        if user != account:
248
            raise NotAllowedError
249
        node = self.node.node_lookup(account)
250
        if node is None:
251
            return
252
        if not self.node.node_remove(node):
253
            raise IndexError('Account is not empty')
254
        self.permissions.group_destroy(account)
255
    
256
    @backend_method
257
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
258
        """Return a list of containers existing under an account."""
259
        
260
        logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
261
        if user != account:
262
            if until or account not in self._allowed_accounts(user):
263
                raise NotAllowedError
264
            allowed = self._allowed_containers(user, account)
265
            start, limit = self._list_limits(allowed, marker, limit)
266
            return allowed[start:start + limit]
267
        if shared:
268
            allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
269
            allowed = list(set(allowed))
270
            start, limit = self._list_limits(allowed, marker, limit)
271
            return allowed[start:start + limit]
272
        node = self.node.node_lookup(account)
273
        return [x[0] for x in self._list_objects(node, account, '', '/', marker, limit, False, None, [], until)]
274
    
275
    @backend_method
276
    def get_container_meta(self, user, account, container, domain, until=None):
277
        """Return a dictionary with the container metadata for the domain."""
278
        
279
        logger.debug("get_container_meta: %s %s %s %s", account, container, domain, until)
280
        if user != account:
281
            if until or container not in self._allowed_containers(user, account):
282
                raise NotAllowedError
283
        path, node = self._lookup_container(account, container)
284
        props = self._get_properties(node, until)
285
        mtime = props[self.MTIME]
286
        count, bytes, tstamp = self._get_statistics(node, until)
287
        tstamp = max(tstamp, mtime)
288
        if until is None:
289
            modified = tstamp
290
        else:
291
            modified = self._get_statistics(node)[2] # Overall last modification.
292
            modified = max(modified, mtime)
293
        
294
        if user != account:
295
            meta = {'name': container}
296
        else:
297
            meta = dict(self.node.attribute_get(props[self.SERIAL], domain))
298
            if until is not None:
299
                meta.update({'until_timestamp': tstamp})
300
            meta.update({'name': container, 'count': count, 'bytes': bytes})
301
        meta.update({'modified': modified})
302
        return meta
303
    
304
    @backend_method
305
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
306
        """Update the metadata associated with the container for the domain."""
307
        
308
        logger.debug("update_container_meta: %s %s %s %s %s", account, container, domain, meta, replace)
309
        if user != account:
310
            raise NotAllowedError
311
        path, node = self._lookup_container(account, container)
312
        self._put_metadata(user, node, domain, meta, replace)
313
    
314
    @backend_method
315
    def get_container_policy(self, user, account, container):
316
        """Return a dictionary with the container policy."""
317
        
318
        logger.debug("get_container_policy: %s %s", account, container)
319
        if user != account:
320
            if container not in self._allowed_containers(user, account):
321
                raise NotAllowedError
322
            return {}
323
        path, node = self._lookup_container(account, container)
324
        return self._get_policy(node)
325
    
326
    @backend_method
327
    def update_container_policy(self, user, account, container, policy, replace=False):
328
        """Update the policy associated with the container."""
329
        
330
        logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
331
        if user != account:
332
            raise NotAllowedError
333
        path, node = self._lookup_container(account, container)
334
        self._check_policy(policy)
335
        self._put_policy(node, policy, replace)
336
    
337
    @backend_method
338
    def put_container(self, user, account, container, policy={}):
339
        """Create a new container with the given name."""
340
        
341
        logger.debug("put_container: %s %s %s", account, container, policy)
342
        if user != account:
343
            raise NotAllowedError
344
        try:
345
            path, node = self._lookup_container(account, container)
346
        except NameError:
347
            pass
348
        else:
349
            raise NameError('Container already exists')
350
        if policy:
351
            self._check_policy(policy)
352
        path = '/'.join((account, container))
353
        node = self._put_path(user, self._lookup_account(account, True)[1], path)
354
        self._put_policy(node, policy, True)
355
    
356
    @backend_method
357
    def delete_container(self, user, account, container, until=None):
358
        """Delete/purge the container with the given name."""
359
        
360
        logger.debug("delete_container: %s %s %s", account, container, until)
361
        if user != account:
362
            raise NotAllowedError
363
        path, node = self._lookup_container(account, container)
364
        
365
        if until is not None:
366
            hashes = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
367
            for h in hashes:
368
                self.store.map_delete(h)
369
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
370
            return
371
        
372
        if self._get_statistics(node)[0] > 0:
373
            raise IndexError('Container is not empty')
374
        hashes = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
375
        for h in hashes:
376
            self.store.map_delete(h)
377
        self.node.node_purge_children(node, inf, CLUSTER_DELETED)
378
        self.node.node_remove(node)
379
    
380
    @backend_method
381
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None):
382
        """Return a list of objects existing under a container."""
383
        
384
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until)
385
        allowed = []
386
        if user != account:
387
            if until:
388
                raise NotAllowedError
389
            allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
390
            if not allowed:
391
                raise NotAllowedError
392
        else:
393
            if shared:
394
                allowed = self.permissions.access_list_shared('/'.join((account, container)))
395
                if not allowed:
396
                    return []
397
        path, node = self._lookup_container(account, container)
398
        return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed)
399
    
400
    @backend_method
401
    def list_object_meta(self, user, account, container, domain, until=None):
402
        """Return a list with all the container's object meta keys for the domain."""
403
        
404
        logger.debug("list_object_meta: %s %s %s %s", account, container, domain, until)
405
        allowed = []
406
        if user != account:
407
            if until:
408
                raise NotAllowedError
409
            allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
410
            if not allowed:
411
                raise NotAllowedError
412
        path, node = self._lookup_container(account, container)
413
        before = until if until is not None else inf
414
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
415
    
416
    @backend_method
417
    def get_object_meta(self, user, account, container, name, domain, version=None):
418
        """Return a dictionary with the object metadata for the domain."""
419
        
420
        logger.debug("get_object_meta: %s %s %s %s %s", account, container, name, domain, version)
421
        self._can_read(user, account, container, name)
422
        path, node = self._lookup_object(account, container, name)
423
        props = self._get_version(node, version)
424
        if version is None:
425
            modified = props[self.MTIME]
426
        else:
427
            try:
428
                modified = self._get_version(node)[self.MTIME] # Overall last modification.
429
            except NameError: # Object may be deleted.
430
                del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
431
                if del_props is None:
432
                    raise NameError('Object does not exist')
433
                modified = del_props[self.MTIME]
434
        
435
        meta = dict(self.node.attribute_get(props[self.SERIAL], domain))
436
        meta.update({'name': name, 'bytes': props[self.SIZE], 'hash':props[self.HASH]})
437
        meta.update({'version': props[self.SERIAL], 'version_timestamp': props[self.MTIME]})
438
        meta.update({'modified': modified, 'modified_by': props[self.MUSER], 'uuid': props[self.UUID]})
439
        return meta
440
    
441
    @backend_method
442
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
443
        """Update the metadata associated with the object for the domain and return the new version."""
444
        
445
        logger.debug("update_object_meta: %s %s %s %s %s %s", account, container, name, domain, meta, replace)
446
        self._can_write(user, account, container, name)
447
        path, node = self._lookup_object(account, container, name)
448
        src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
449
        self._apply_versioning(account, container, src_version_id)
450
        return dest_version_id
451
    
452
    @backend_method
453
    def get_object_permissions(self, user, account, container, name):
454
        """Return the action allowed on the object, the path
455
        from which the object gets its permissions from,
456
        along with a dictionary containing the permissions."""
457
        
458
        logger.debug("get_object_permissions: %s %s %s", account, container, name)
459
        allowed = 'write'
460
        if user != account:
461
            path = '/'.join((account, container, name))
462
            if self.permissions.access_check(path, self.WRITE, user):
463
                allowed = 'write'
464
            elif self.permissions.access_check(path, self.READ, user):
465
                allowed = 'read'
466
            else:
467
                raise NotAllowedError
468
        path = self._lookup_object(account, container, name)[0]
469
        return (allowed,) + self.permissions.access_inherit(path)
470
    
471
    @backend_method
472
    def update_object_permissions(self, user, account, container, name, permissions):
473
        """Update the permissions associated with the object."""
474
        
475
        logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
476
        if user != account:
477
            raise NotAllowedError
478
        path = self._lookup_object(account, container, name)[0]
479
        self._check_permissions(path, permissions)
480
        self.permissions.access_set(path, permissions)
481
    
482
    @backend_method
483
    def get_object_public(self, user, account, container, name):
484
        """Return the public id of the object if applicable."""
485
        
486
        logger.debug("get_object_public: %s %s %s", account, container, name)
487
        self._can_read(user, account, container, name)
488
        path = self._lookup_object(account, container, name)[0]
489
        p = self.permissions.public_get(path)
490
        if p is not None:
491
            p += ULTIMATE_ANSWER
492
        return p
493
    
494
    @backend_method
495
    def update_object_public(self, user, account, container, name, public):
496
        """Update the public status of the object."""
497
        
498
        logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
499
        self._can_write(user, account, container, name)
500
        path = self._lookup_object(account, container, name)[0]
501
        if not public:
502
            self.permissions.public_unset(path)
503
        else:
504
            self.permissions.public_set(path)
505
    
506
    @backend_method
507
    def get_object_hashmap(self, user, account, container, name, version=None):
508
        """Return the object's size and a list with partial hashes."""
509
        
510
        logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
511
        self._can_read(user, account, container, name)
512
        path, node = self._lookup_object(account, container, name)
513
        props = self._get_version(node, version)
514
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
515
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
516
    
517
    def _update_object_hash(self, user, account, container, name, size, hash, permissions, src_node=None, is_copy=False):
518
        if permissions is not None and user != account:
519
            raise NotAllowedError
520
        self._can_write(user, account, container, name)
521
        if permissions is not None:
522
            path = '/'.join((account, container, name))
523
            self._check_permissions(path, permissions)
524
        
525
        account_path, account_node = self._lookup_account(account, True)
526
        container_path, container_node = self._lookup_container(account, container)
527
        path, node = self._put_object_node(container_path, container_node, name)
528
        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, hash=hash, is_copy=is_copy)
529
        
530
        # Check quota.
531
        versioning = self._get_policy(container_node)['versioning']
532
        if versioning != 'auto':
533
            size_delta = size - 0 # TODO: Get previous size.
534
        else:
535
            size_delta = size
536
        if size_delta > 0:
537
            account_quota = long(self._get_policy(account_node)['quota'])
538
            container_quota = long(self._get_policy(container_node)['quota'])
539
            if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
540
               (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
541
                # This must be executed in a transaction, so the version is never created if it fails.
542
                raise QuotaError
543
        
544
        if permissions is not None:
545
            self.permissions.access_set(path, permissions)
546
        self._apply_versioning(account, container, pre_version_id)
547
        return pre_version_id, dest_version_id
548
    
549
    @backend_method
550
    def update_object_hashmap(self, user, account, container, name, size, hashmap, domain, meta={}, replace_meta=False, permissions=None):
551
        """Create/update an object with the specified size and partial hashes."""
552
        
553
        logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
554
        if size == 0: # No such thing as an empty hashmap.
555
            hashmap = [self.put_block('')]
556
        map = HashMap(self.block_size, self.hash_algorithm)
557
        map.extend([binascii.unhexlify(x) for x in hashmap])
558
        missing = self.store.block_search(map)
559
        if missing:
560
            ie = IndexError()
561
            ie.data = [binascii.hexlify(x) for x in missing]
562
            raise ie
563
        
564
        hash = map.hash()
565
        pre_version_id, dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(hash), permissions)
566
        self._put_metadata_duplicate(pre_version_id, dest_version_id, domain, meta, replace_meta)
567
        self.store.map_put(hash, map)
568
        return dest_version_id
569
    
570
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False):
571
        self._can_read(user, src_account, src_container, src_name)
572
        path, node = self._lookup_object(src_account, src_container, src_name)
573
        # TODO: Will do another fetch of the properties in duplicate version...
574
        props = self._get_version(node, src_version) # Check to see if source exists.
575
        src_version_id = props[self.SERIAL]
576
        hash = props[self.HASH]
577
        size = props[self.SIZE]
578
        
579
        is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
580
        pre_version_id, dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, hash, permissions, src_node=node, is_copy=is_copy)
581
        self._put_metadata_duplicate(src_version_id, dest_version_id, dest_domain, dest_meta, replace_meta)
582
        return dest_version_id
583
    
584
    @backend_method
585
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta={}, replace_meta=False, permissions=None, src_version=None):
586
        """Copy an object's data and metadata."""
587
        
588
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version)
589
        return self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version, False)
590
    
591
    @backend_method
592
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta={}, replace_meta=False, permissions=None):
593
        """Move an object's data and metadata."""
594
        
595
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions)
596
        if user != src_account:
597
            raise NotAllowedError
598
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, None, True)
599
        if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
600
            self._delete_object(user, src_account, src_container, src_name)
601
        return dest_version_id
602
    
603
    def _delete_object(self, user, account, container, name, until=None):
604
        if user != account:
605
            raise NotAllowedError
606
        
607
        if until is not None:
608
            path = '/'.join((account, container, name))
609
            node = self.node.node_lookup(path)
610
            if node is None:
611
                return
612
            hashes = self.node.node_purge(node, until, CLUSTER_NORMAL)
613
            hashes += self.node.node_purge(node, until, CLUSTER_HISTORY)
614
            for h in hashes:
615
                self.store.map_delete(h)
616
            self.node.node_purge(node, until, CLUSTER_DELETED)
617
            try:
618
                props = self._get_version(node)
619
            except NameError:
620
                self.permissions.access_clear(path)
621
            return
622
        
623
        path, node = self._lookup_object(account, container, name)
624
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, hash=None, cluster=CLUSTER_DELETED)
625
        self._apply_versioning(account, container, src_version_id)
626
        self.permissions.access_clear(path)
627
    
628
    @backend_method
629
    def delete_object(self, user, account, container, name, until=None):
630
        """Delete/purge an object."""
631
        
632
        logger.debug("delete_object: %s %s %s %s", account, container, name, until)
633
        self._delete_object(user, account, container, name, until)
634
    
635
    @backend_method
636
    def list_versions(self, user, account, container, name):
637
        """Return a list of all (version, version_timestamp) tuples for an object."""
638
        
639
        logger.debug("list_versions: %s %s %s", account, container, name)
640
        self._can_read(user, account, container, name)
641
        path, node = self._lookup_object(account, container, name)
642
        versions = self.node.node_get_versions(node)
643
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
644
    
645
    @backend_method
646
    def get_uuid(self, user, uuid):
647
        """Return the (account, container, name) for the UUID given."""
648
        
649
        logger.debug("get_uuid: %s", uuid)
650
        info = self.node.latest_uuid(uuid)
651
        if info is None:
652
            raise NameError
653
        path, serial = info
654
        account, container, name = path.split('/', 2)
655
        self._can_read(user, account, container, name)
656
        return (account, container, name)
657
    
658
    @backend_method
659
    def get_public(self, user, public):
660
        """Return the (account, container, name) for the public id given."""
661
        
662
        logger.debug("get_public: %s", public)
663
        if public is None or public < ULTIMATE_ANSWER:
664
            raise NameError
665
        path = self.permissions.public_path(public - ULTIMATE_ANSWER)
666
        if path is None:
667
            raise NameError
668
        account, container, name = path.split('/', 2)
669
        self._can_read(user, account, container, name)
670
        return (account, container, name)
671
    
672
    @backend_method(autocommit=0)
673
    def get_block(self, hash):
674
        """Return a block's data."""
675
        
676
        logger.debug("get_block: %s", hash)
677
        block = self.store.block_get(binascii.unhexlify(hash))
678
        if not block:
679
            raise NameError('Block does not exist')
680
        return block
681
    
682
    @backend_method(autocommit=0)
683
    def put_block(self, data):
684
        """Store a block and return the hash."""
685
        
686
        logger.debug("put_block: %s", len(data))
687
        return binascii.hexlify(self.store.block_put(data))
688
    
689
    @backend_method(autocommit=0)
690
    def update_block(self, hash, data, offset=0):
691
        """Update a known block and return the hash."""
692
        
693
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
694
        if offset == 0 and len(data) == self.block_size:
695
            return self.put_block(data)
696
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
697
        return binascii.hexlify(h)
698
    
699
    # Path functions.
700
    
701
    def _generate_uuid(self):
702
        return str(uuidlib.uuid4())
703
    
704
    def _put_object_node(self, path, parent, name):
705
        path = '/'.join((path, name))
706
        node = self.node.node_lookup(path)
707
        if node is None:
708
            node = self.node.node_create(parent, path)
709
        return path, node
710
    
711
    def _put_path(self, user, parent, path):
712
        node = self.node.node_create(parent, path)
713
        self.node.version_create(node, None, 0, None, user, self._generate_uuid(), CLUSTER_NORMAL)
714
        return node
715
    
716
    def _lookup_account(self, account, create=True):
717
        node = self.node.node_lookup(account)
718
        if node is None and create:
719
            node = self._put_path(account, self.ROOTNODE, account) # User is account.
720
        return account, node
721
    
722
    def _lookup_container(self, account, container):
723
        path = '/'.join((account, container))
724
        node = self.node.node_lookup(path)
725
        if node is None:
726
            raise NameError('Container does not exist')
727
        return path, node
728
    
729
    def _lookup_object(self, account, container, name):
730
        path = '/'.join((account, container, name))
731
        node = self.node.node_lookup(path)
732
        if node is None:
733
            raise NameError('Object does not exist')
734
        return path, node
735
    
736
    def _get_properties(self, node, until=None):
737
        """Return properties until the timestamp given."""
738
        
739
        before = until if until is not None else inf
740
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
741
        if props is None and until is not None:
742
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
743
        if props is None:
744
            raise NameError('Path does not exist')
745
        return props
746
    
747
    def _get_statistics(self, node, until=None):
748
        """Return count, sum of size and latest timestamp of everything under node."""
749
        
750
        if until is None:
751
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
752
        else:
753
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
754
        if stats is None:
755
            stats = (0, 0, 0)
756
        return stats
757
    
758
    def _get_version(self, node, version=None):
759
        if version is None:
760
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
761
            if props is None:
762
                raise NameError('Object does not exist')
763
        else:
764
            try:
765
                version = int(version)
766
            except ValueError:
767
                raise IndexError('Version does not exist')
768
            props = self.node.version_get_properties(version)
769
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
770
                raise IndexError('Version does not exist')
771
        return props
772
    
773
    def _put_version_duplicate(self, user, node, src_node=None, size=None, hash=None, cluster=CLUSTER_NORMAL, is_copy=False):
774
        """Create a new version of the node."""
775
        
776
        props = self.node.version_lookup(node if src_node is None else src_node, inf, CLUSTER_NORMAL)
777
        if props is not None:
778
            src_version_id = props[self.SERIAL]
779
            src_hash = props[self.HASH]
780
            src_size = props[self.SIZE]
781
        else:
782
            src_version_id = None
783
            src_hash = None
784
            src_size = 0
785
        if size is None:
786
            hash = src_hash # This way hash can be set to None.
787
            size = src_size
788
        uuid = self._generate_uuid() if (is_copy or src_version_id is None) else props[self.UUID]
789
        
790
        if src_node is None:
791
            pre_version_id = src_version_id
792
        else:
793
            pre_version_id = None
794
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
795
            if props is not None:
796
                pre_version_id = props[self.SERIAL]
797
        if pre_version_id is not None:
798
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
799
        
800
        dest_version_id, mtime = self.node.version_create(node, hash, size, src_version_id, user, uuid, cluster)
801
        return pre_version_id, dest_version_id
802
    
803
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
804
        if src_version_id is not None:
805
            self.node.attribute_copy(src_version_id, dest_version_id)
806
        if not replace:
807
            self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
808
            self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
809
        else:
810
            self.node.attribute_del(dest_version_id, domain)
811
            self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
812
    
813
    def _put_metadata(self, user, node, domain, meta, replace=False):
814
        """Create a new version and store metadata."""
815
        
816
        src_version_id, dest_version_id = self._put_version_duplicate(user, node)
817
        self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
818
        return src_version_id, dest_version_id
819
    
820
    def _list_limits(self, listing, marker, limit):
821
        start = 0
822
        if marker:
823
            try:
824
                start = listing.index(marker) + 1
825
            except ValueError:
826
                pass
827
        if not limit or limit > 10000:
828
            limit = 10000
829
        return start, limit
830
    
831
    def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[]):
832
        cont_prefix = path + '/'
833
        prefix = cont_prefix + prefix
834
        start = cont_prefix + marker if marker else None
835
        before = until if until is not None else inf
836
        filterq = keys if domain else []
837
        sizeq = size_range
838
        
839
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq)
840
        objects.extend([(p, None) for p in prefixes] if virtual else [])
841
        objects.sort(key=lambda x: x[0])
842
        objects = [(x[0][len(cont_prefix):], x[1]) for x in objects]
843
        
844
        start, limit = self._list_limits([x[0] for x in objects], marker, limit)
845
        return objects[start:start + limit]
846
    
847
    # Policy functions.
848
    
849
    def _check_policy(self, policy):
850
        for k in policy.keys():
851
            if policy[k] == '':
852
                policy[k] = self.default_policy.get(k)
853
        for k, v in policy.iteritems():
854
            if k == 'quota':
855
                q = int(v) # May raise ValueError.
856
                if q < 0:
857
                    raise ValueError
858
            elif k == 'versioning':
859
                if v not in ['auto', 'none']:
860
                    raise ValueError
861
            else:
862
                raise ValueError
863
    
864
    def _put_policy(self, node, policy, replace):
865
        if replace:
866
            for k, v in self.default_policy.iteritems():
867
                if k not in policy:
868
                    policy[k] = v
869
        self.node.policy_set(node, policy)
870
    
871
    def _get_policy(self, node):
872
        policy = self.default_policy.copy()
873
        policy.update(self.node.policy_get(node))
874
        return policy
875
    
876
    def _apply_versioning(self, account, container, version_id):
877
        if version_id is None:
878
            return
879
        path, node = self._lookup_container(account, container)
880
        versioning = self._get_policy(node)['versioning']
881
        if versioning != 'auto':
882
            hash = self.node.version_remove(version_id)
883
            self.store.map_delete(hash)
884
    
885
    # Access control functions.
886
    
887
    def _check_groups(self, groups):
888
        # raise ValueError('Bad characters in groups')
889
        pass
890
    
891
    def _check_permissions(self, path, permissions):
892
        # raise ValueError('Bad characters in permissions')
893
        
894
        # Check for existing permissions.
895
        paths = self.permissions.access_list(path)
896
        if paths:
897
            ae = AttributeError()
898
            ae.data = paths
899
            raise ae
900
    
901
    def _can_read(self, user, account, container, name):
902
        if user == account:
903
            return True
904
        path = '/'.join((account, container, name))
905
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
906
            raise NotAllowedError
907
    
908
    def _can_write(self, user, account, container, name):
909
        if user == account:
910
            return True
911
        path = '/'.join((account, container, name))
912
        if not self.permissions.access_check(path, self.WRITE, user):
913
            raise NotAllowedError
914
    
915
    def _allowed_accounts(self, user):
916
        allow = set()
917
        for path in self.permissions.access_list_paths(user):
918
            allow.add(path.split('/', 1)[0])
919
        return sorted(allow)
920
    
921
    def _allowed_containers(self, user, account):
922
        allow = set()
923
        for path in self.permissions.access_list_paths(user, account):
924
            allow.add(path.split('/', 2)[1])
925
        return sorted(allow)