Statistics
| Branch: | Tag: | Revision:

root / pithos / backends / modular.py @ f897bea9

History | View | Annotate | Download (38.4 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import uuid as uuidlib
38
import logging
39
import binascii
40

    
41
from base import NotAllowedError, QuotaError, BaseBackend
42

    
43
from pithos.lib.hashmap import HashMap
44

    
45
( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
46

    
47
inf = float('inf')
48

    
49
ULTIMATE_ANSWER = 42
50

    
51

    
52
logger = logging.getLogger(__name__)
53

    
54

    
55
def backend_method(func=None, autocommit=1):
56
    if func is None:
57
        def fn(func):
58
            return backend_method(func, autocommit)
59
        return fn
60

    
61
    if not autocommit:
62
        return func
63
    def fn(self, *args, **kw):
64
        self.wrapper.execute()
65
        try:
66
            ret = func(self, *args, **kw)
67
            self.wrapper.commit()
68
            return ret
69
        except:
70
            self.wrapper.rollback()
71
            raise
72
    return fn
73

    
74

    
75
class ModularBackend(BaseBackend):
76
    """A modular backend.
77
    
78
    Uses modules for SQL functions and storage.
79
    """
80
    
81
    def __init__(self, db_module, db_connection, block_module, block_path):
82
        db_module = db_module or 'pithos.backends.lib.sqlalchemy'
83
        block_module = block_module or 'pithos.backends.lib.hashfiler'
84
        
85
        self.hash_algorithm = 'sha256'
86
        self.block_size = 4 * 1024 * 1024 # 4MB
87
        
88
        self.default_policy = {'quota': 0, 'versioning': 'auto'}
89
        
90
        __import__(db_module)
91
        self.db_module = sys.modules[db_module]
92
        self.wrapper = self.db_module.DBWrapper(db_connection)
93
        
94
        params = {'wrapper': self.wrapper}
95
        self.permissions = self.db_module.Permissions(**params)
96
        for x in ['READ', 'WRITE']:
97
            setattr(self, x, getattr(self.db_module, x))
98
        self.node = self.db_module.Node(**params)
99
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'MTIME', 'MUSER', 'UUID', 'CLUSTER']:
100
            setattr(self, x, getattr(self.db_module, x))
101
        
102
        __import__(block_module)
103
        self.block_module = sys.modules[block_module]
104
        
105
        params = {'path': block_path,
106
                  'block_size': self.block_size,
107
                  'hash_algorithm': self.hash_algorithm}
108
        self.store = self.block_module.Store(**params)
109
    
110
    def close(self):
111
        self.wrapper.close()
112
    
113
    @backend_method
114
    def list_accounts(self, user, marker=None, limit=10000):
115
        """Return a list of accounts the user can access."""
116
        
117
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
118
        allowed = self._allowed_accounts(user)
119
        start, limit = self._list_limits(allowed, marker, limit)
120
        return allowed[start:start + limit]
121
    
122
    @backend_method
123
    def get_account_meta(self, user, account, domain, until=None):
124
        """Return a dictionary with the account metadata for the domain."""
125
        
126
        logger.debug("get_account_meta: %s %s %s", account, domain, until)
127
        path, node = self._lookup_account(account, user == account)
128
        if user != account:
129
            if until or node is None or account not in self._allowed_accounts(user):
130
                raise NotAllowedError
131
        try:
132
            props = self._get_properties(node, until)
133
            mtime = props[self.MTIME]
134
        except NameError:
135
            props = None
136
            mtime = until
137
        count, bytes, tstamp = self._get_statistics(node, until)
138
        tstamp = max(tstamp, mtime)
139
        if until is None:
140
            modified = tstamp
141
        else:
142
            modified = self._get_statistics(node)[2] # Overall last modification.
143
            modified = max(modified, mtime)
144
        
145
        if user != account:
146
            meta = {'name': account}
147
        else:
148
            meta = {}
149
            if props is not None:
150
                meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
151
            if until is not None:
152
                meta.update({'until_timestamp': tstamp})
153
            meta.update({'name': account, 'count': count, 'bytes': bytes})
154
        meta.update({'modified': modified})
155
        return meta
156
    
157
    @backend_method
158
    def update_account_meta(self, user, account, domain, meta, replace=False):
159
        """Update the metadata associated with the account for the domain."""
160
        
161
        logger.debug("update_account_meta: %s %s %s %s", account, domain, meta, replace)
162
        if user != account:
163
            raise NotAllowedError
164
        path, node = self._lookup_account(account, True)
165
        self._put_metadata(user, node, domain, meta, replace)
166
    
167
    @backend_method
168
    def get_account_groups(self, user, account):
169
        """Return a dictionary with the user groups defined for this account."""
170
        
171
        logger.debug("get_account_groups: %s", account)
172
        if user != account:
173
            if account not in self._allowed_accounts(user):
174
                raise NotAllowedError
175
            return {}
176
        self._lookup_account(account, True)
177
        return self.permissions.group_dict(account)
178
    
179
    @backend_method
180
    def update_account_groups(self, user, account, groups, replace=False):
181
        """Update the groups associated with the account."""
182
        
183
        logger.debug("update_account_groups: %s %s %s", account, groups, replace)
184
        if user != account:
185
            raise NotAllowedError
186
        self._lookup_account(account, True)
187
        self._check_groups(groups)
188
        if replace:
189
            self.permissions.group_destroy(account)
190
        for k, v in groups.iteritems():
191
            if not replace: # If not already deleted.
192
                self.permissions.group_delete(account, k)
193
            if v:
194
                self.permissions.group_addmany(account, k, v)
195
    
196
    @backend_method
197
    def get_account_policy(self, user, account):
198
        """Return a dictionary with the account policy."""
199
        
200
        logger.debug("get_account_policy: %s", account)
201
        if user != account:
202
            if account not in self._allowed_accounts(user):
203
                raise NotAllowedError
204
            return {}
205
        path, node = self._lookup_account(account, True)
206
        return self._get_policy(node)
207
    
208
    @backend_method
209
    def update_account_policy(self, user, account, policy, replace=False):
210
        """Update the policy associated with the account."""
211
        
212
        logger.debug("update_account_policy: %s %s %s", account, policy, replace)
213
        if user != account:
214
            raise NotAllowedError
215
        path, node = self._lookup_account(account, True)
216
        self._check_policy(policy)
217
        self._put_policy(node, policy, replace)
218
    
219
    @backend_method
220
    def put_account(self, user, account, policy={}):
221
        """Create a new account with the given name."""
222
        
223
        logger.debug("put_account: %s %s", account, policy)
224
        if user != account:
225
            raise NotAllowedError
226
        node = self.node.node_lookup(account)
227
        if node is not None:
228
            raise NameError('Account already exists')
229
        if policy:
230
            self._check_policy(policy)
231
        node = self._put_path(user, self.ROOTNODE, account)
232
        self._put_policy(node, policy, True)
233
    
234
    @backend_method
235
    def delete_account(self, user, account):
236
        """Delete the account with the given name."""
237
        
238
        logger.debug("delete_account: %s", account)
239
        if user != account:
240
            raise NotAllowedError
241
        node = self.node.node_lookup(account)
242
        if node is None:
243
            return
244
        if not self.node.node_remove(node):
245
            raise IndexError('Account is not empty')
246
        self.permissions.group_destroy(account)
247
    
248
    @backend_method
249
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
250
        """Return a list of containers existing under an account."""
251
        
252
        logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
253
        if user != account:
254
            if until or account not in self._allowed_accounts(user):
255
                raise NotAllowedError
256
            allowed = self._allowed_containers(user, account)
257
            start, limit = self._list_limits(allowed, marker, limit)
258
            return allowed[start:start + limit]
259
        if shared:
260
            allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
261
            allowed = list(set(allowed))
262
            start, limit = self._list_limits(allowed, marker, limit)
263
            return allowed[start:start + limit]
264
        node = self.node.node_lookup(account)
265
        return [x[0] for x in self._list_objects(node, account, '', '/', marker, limit, False, None, [], until)]
266
    
267
    @backend_method
268
    def get_container_meta(self, user, account, container, domain, until=None):
269
        """Return a dictionary with the container metadata for the domain."""
270
        
271
        logger.debug("get_container_meta: %s %s %s %s", account, container, domain, until)
272
        if user != account:
273
            if until or container not in self._allowed_containers(user, account):
274
                raise NotAllowedError
275
        path, node = self._lookup_container(account, container)
276
        props = self._get_properties(node, until)
277
        mtime = props[self.MTIME]
278
        count, bytes, tstamp = self._get_statistics(node, until)
279
        tstamp = max(tstamp, mtime)
280
        if until is None:
281
            modified = tstamp
282
        else:
283
            modified = self._get_statistics(node)[2] # Overall last modification.
284
            modified = max(modified, mtime)
285
        
286
        if user != account:
287
            meta = {'name': container}
288
        else:
289
            meta = dict(self.node.attribute_get(props[self.SERIAL], domain))
290
            if until is not None:
291
                meta.update({'until_timestamp': tstamp})
292
            meta.update({'name': container, 'count': count, 'bytes': bytes})
293
        meta.update({'modified': modified})
294
        return meta
295
    
296
    @backend_method
297
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
298
        """Update the metadata associated with the container for the domain."""
299
        
300
        logger.debug("update_container_meta: %s %s %s %s %s", account, container, domain, meta, replace)
301
        if user != account:
302
            raise NotAllowedError
303
        path, node = self._lookup_container(account, container)
304
        self._put_metadata(user, node, domain, meta, replace)
305
    
306
    @backend_method
307
    def get_container_policy(self, user, account, container):
308
        """Return a dictionary with the container policy."""
309
        
310
        logger.debug("get_container_policy: %s %s", account, container)
311
        if user != account:
312
            if container not in self._allowed_containers(user, account):
313
                raise NotAllowedError
314
            return {}
315
        path, node = self._lookup_container(account, container)
316
        return self._get_policy(node)
317
    
318
    @backend_method
319
    def update_container_policy(self, user, account, container, policy, replace=False):
320
        """Update the policy associated with the container."""
321
        
322
        logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
323
        if user != account:
324
            raise NotAllowedError
325
        path, node = self._lookup_container(account, container)
326
        self._check_policy(policy)
327
        self._put_policy(node, policy, replace)
328
    
329
    @backend_method
330
    def put_container(self, user, account, container, policy={}):
331
        """Create a new container with the given name."""
332
        
333
        logger.debug("put_container: %s %s %s", account, container, policy)
334
        if user != account:
335
            raise NotAllowedError
336
        try:
337
            path, node = self._lookup_container(account, container)
338
        except NameError:
339
            pass
340
        else:
341
            raise NameError('Container already exists')
342
        if policy:
343
            self._check_policy(policy)
344
        path = '/'.join((account, container))
345
        node = self._put_path(user, self._lookup_account(account, True)[1], path)
346
        self._put_policy(node, policy, True)
347
    
348
    @backend_method
349
    def delete_container(self, user, account, container, until=None):
350
        """Delete/purge the container with the given name."""
351
        
352
        logger.debug("delete_container: %s %s %s", account, container, until)
353
        if user != account:
354
            raise NotAllowedError
355
        path, node = self._lookup_container(account, container)
356
        
357
        if until is not None:
358
            hashes = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
359
            for h in hashes:
360
                self.store.map_delete(h)
361
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
362
            return
363
        
364
        if self._get_statistics(node)[0] > 0:
365
            raise IndexError('Container is not empty')
366
        hashes = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
367
        for h in hashes:
368
            self.store.map_delete(h)
369
        self.node.node_purge_children(node, inf, CLUSTER_DELETED)
370
        self.node.node_remove(node)
371
    
372
    @backend_method
373
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None):
374
        """Return a list of objects existing under a container."""
375
        
376
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until)
377
        allowed = []
378
        if user != account:
379
            if until:
380
                raise NotAllowedError
381
            allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
382
            if not allowed:
383
                raise NotAllowedError
384
        else:
385
            if shared:
386
                allowed = self.permissions.access_list_shared('/'.join((account, container)))
387
                if not allowed:
388
                    return []
389
        path, node = self._lookup_container(account, container)
390
        return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, allowed)
391
    
392
    @backend_method
393
    def list_object_meta(self, user, account, container, domain, until=None):
394
        """Return a list with all the container's object meta keys for the domain."""
395
        
396
        logger.debug("list_object_meta: %s %s %s %s", account, container, domain, until)
397
        allowed = []
398
        if user != account:
399
            if until:
400
                raise NotAllowedError
401
            allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
402
            if not allowed:
403
                raise NotAllowedError
404
        path, node = self._lookup_container(account, container)
405
        before = until if until is not None else inf
406
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
407
    
408
    @backend_method
409
    def get_object_meta(self, user, account, container, name, domain, version=None):
410
        """Return a dictionary with the object metadata for the domain."""
411
        
412
        logger.debug("get_object_meta: %s %s %s %s %s", account, container, name, domain, version)
413
        self._can_read(user, account, container, name)
414
        path, node = self._lookup_object(account, container, name)
415
        props = self._get_version(node, version)
416
        if version is None:
417
            modified = props[self.MTIME]
418
        else:
419
            try:
420
                modified = self._get_version(node)[self.MTIME] # Overall last modification.
421
            except NameError: # Object may be deleted.
422
                del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
423
                if del_props is None:
424
                    raise NameError('Object does not exist')
425
                modified = del_props[self.MTIME]
426
        
427
        meta = dict(self.node.attribute_get(props[self.SERIAL], domain))
428
        meta.update({'name': name, 'bytes': props[self.SIZE], 'hash':props[self.HASH]})
429
        meta.update({'version': props[self.SERIAL], 'version_timestamp': props[self.MTIME]})
430
        meta.update({'modified': modified, 'modified_by': props[self.MUSER], 'uuid': props[self.UUID]})
431
        return meta
432
    
433
    @backend_method
434
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
435
        """Update the metadata associated with the object for the domain and return the new version."""
436
        
437
        logger.debug("update_object_meta: %s %s %s %s %s %s", account, container, name, domain, meta, replace)
438
        self._can_write(user, account, container, name)
439
        path, node = self._lookup_object(account, container, name)
440
        src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
441
        self._apply_versioning(account, container, src_version_id)
442
        return dest_version_id
443
    
444
    @backend_method
445
    def get_object_permissions(self, user, account, container, name):
446
        """Return the action allowed on the object, the path
447
        from which the object gets its permissions from,
448
        along with a dictionary containing the permissions."""
449
        
450
        logger.debug("get_object_permissions: %s %s %s", account, container, name)
451
        allowed = 'write'
452
        if user != account:
453
            path = '/'.join((account, container, name))
454
            if self.permissions.access_check(path, self.WRITE, user):
455
                allowed = 'write'
456
            elif self.permissions.access_check(path, self.READ, user):
457
                allowed = 'read'
458
            else:
459
                raise NotAllowedError
460
        path = self._lookup_object(account, container, name)[0]
461
        return (allowed,) + self.permissions.access_inherit(path)
462
    
463
    @backend_method
464
    def update_object_permissions(self, user, account, container, name, permissions):
465
        """Update the permissions associated with the object."""
466
        
467
        logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
468
        if user != account:
469
            raise NotAllowedError
470
        path = self._lookup_object(account, container, name)[0]
471
        self._check_permissions(path, permissions)
472
        self.permissions.access_set(path, permissions)
473
    
474
    @backend_method
475
    def get_object_public(self, user, account, container, name):
476
        """Return the public id of the object if applicable."""
477
        
478
        logger.debug("get_object_public: %s %s %s", account, container, name)
479
        self._can_read(user, account, container, name)
480
        path = self._lookup_object(account, container, name)[0]
481
        p = self.permissions.public_get(path)
482
        if p is not None:
483
            p += ULTIMATE_ANSWER
484
        return p
485
    
486
    @backend_method
487
    def update_object_public(self, user, account, container, name, public):
488
        """Update the public status of the object."""
489
        
490
        logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
491
        self._can_write(user, account, container, name)
492
        path = self._lookup_object(account, container, name)[0]
493
        if not public:
494
            self.permissions.public_unset(path)
495
        else:
496
            self.permissions.public_set(path)
497
    
498
    @backend_method
499
    def get_object_hashmap(self, user, account, container, name, version=None):
500
        """Return the object's size and a list with partial hashes."""
501
        
502
        logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
503
        self._can_read(user, account, container, name)
504
        path, node = self._lookup_object(account, container, name)
505
        props = self._get_version(node, version)
506
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
507
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
508
    
509
    def _update_object_hash(self, user, account, container, name, size, hash, permissions, is_copy=False):
510
        if permissions is not None and user != account:
511
            raise NotAllowedError
512
        self._can_write(user, account, container, name)
513
        if permissions is not None:
514
            path = '/'.join((account, container, name))
515
            self._check_permissions(path, permissions)
516
        
517
        account_path, account_node = self._lookup_account(account, True)
518
        container_path, container_node = self._lookup_container(account, container)
519
        path, node = self._put_object_node(container_path, container_node, name)
520
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=size, hash=hash, is_copy=is_copy)
521
        
522
        # Check quota.
523
        size_delta = size # Change with versioning.
524
        if size_delta > 0:
525
            account_quota = long(self._get_policy(account_node)['quota'])
526
            container_quota = long(self._get_policy(container_node)['quota'])
527
            if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
528
               (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
529
                # This must be executed in a transaction, so the version is never created if it fails.
530
                raise QuotaError
531
        
532
        if permissions is not None:
533
            self.permissions.access_set(path, permissions)
534
        self._apply_versioning(account, container, src_version_id)
535
        return src_version_id, dest_version_id
536
    
537
    @backend_method
538
    def update_object_hashmap(self, user, account, container, name, size, hashmap, domain, meta={}, replace_meta=False, permissions=None):
539
        """Create/update an object with the specified size and partial hashes."""
540
        
541
        logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
542
        if size == 0: # No such thing as an empty hashmap.
543
            hashmap = [self.put_block('')]
544
        map = HashMap(self.block_size, self.hash_algorithm)
545
        map.extend([binascii.unhexlify(x) for x in hashmap])
546
        missing = self.store.block_search(map)
547
        if missing:
548
            ie = IndexError()
549
            ie.data = [binascii.hexlify(x) for x in missing]
550
            raise ie
551
        
552
        hash = map.hash()
553
        src_version_id, dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(hash), permissions)
554
        self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace_meta)
555
        self.store.map_put(hash, map)
556
        return dest_version_id
557
    
558
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False):
559
        self._can_read(user, src_account, src_container, src_name)
560
        path, node = self._lookup_object(src_account, src_container, src_name)
561
        props = self._get_version(node, src_version)
562
        src_version_id = props[self.SERIAL]
563
        hash = props[self.HASH]
564
        size = props[self.SIZE]
565
        
566
        is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
567
        src_v_id, dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, hash, permissions, is_copy=is_copy)
568
        self._put_metadata_duplicate(src_version_id, dest_version_id, dest_domain, dest_meta, replace_meta)
569
        return dest_version_id
570
    
571
    @backend_method
572
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta={}, replace_meta=False, permissions=None, src_version=None):
573
        """Copy an object's data and metadata."""
574
        
575
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version)
576
        return self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version, False)
577
    
578
    @backend_method
579
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta={}, replace_meta=False, permissions=None):
580
        """Move an object's data and metadata."""
581
        
582
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions)
583
        if user != src_account:
584
            raise NotAllowedError
585
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, None, True)
586
        if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
587
            self._delete_object(user, src_account, src_container, src_name)
588
        return dest_version_id
589
    
590
    def _delete_object(self, user, account, container, name, until=None):
591
        if user != account:
592
            raise NotAllowedError
593
        
594
        if until is not None:
595
            path = '/'.join((account, container, name))
596
            node = self.node.node_lookup(path)
597
            if node is None:
598
                return
599
            hashes = self.node.node_purge(node, until, CLUSTER_NORMAL)
600
            hashes += self.node.node_purge(node, until, CLUSTER_HISTORY)
601
            for h in hashes:
602
                self.store.map_delete(h)
603
            self.node.node_purge(node, until, CLUSTER_DELETED)
604
            try:
605
                props = self._get_version(node)
606
            except NameError:
607
                self.permissions.access_clear(path)
608
            return
609
        
610
        path, node = self._lookup_object(account, container, name)
611
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, hash=None, cluster=CLUSTER_DELETED)
612
        self._apply_versioning(account, container, src_version_id)
613
        self.permissions.access_clear(path)
614
    
615
    @backend_method
616
    def delete_object(self, user, account, container, name, until=None):
617
        """Delete/purge an object."""
618
        
619
        logger.debug("delete_object: %s %s %s %s", account, container, name, until)
620
        self._delete_object(user, account, container, name, until)
621
    
622
    @backend_method
623
    def list_versions(self, user, account, container, name):
624
        """Return a list of all (version, version_timestamp) tuples for an object."""
625
        
626
        logger.debug("list_versions: %s %s %s", account, container, name)
627
        self._can_read(user, account, container, name)
628
        path, node = self._lookup_object(account, container, name)
629
        versions = self.node.node_get_versions(node)
630
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
631
    
632
    @backend_method
633
    def get_uuid(self, user, uuid):
634
        """Return the (account, container, name) for the UUID given."""
635
        logger.debug("get_uuid: %s", uuid)
636
        info = self.node.latest_uuid(uuid)
637
        if info is None:
638
            raise NameError
639
        path, serial = info
640
        account, container, name = path.split('/', 2)
641
        self._can_read(user, account, container, name)
642
        return (account, container, name)
643
    
644
    @backend_method
645
    def get_public(self, user, public):
646
        """Return the (account, container, name) for the public id given."""
647
        logger.debug("get_public: %s", public)
648
        if public is None or public < ULTIMATE_ANSWER:
649
            raise NameError
650
        path = self.permissions.public_path(public - ULTIMATE_ANSWER)
651
        if path is None:
652
            raise NameError
653
        account, container, name = path.split('/', 2)
654
        self._can_read(user, account, container, name)
655
        return (account, container, name)
656
    
657
    @backend_method(autocommit=0)
658
    def get_block(self, hash):
659
        """Return a block's data."""
660
        
661
        logger.debug("get_block: %s", hash)
662
        block = self.store.block_get(binascii.unhexlify(hash))
663
        if not block:
664
            raise NameError('Block does not exist')
665
        return block
666
    
667
    @backend_method(autocommit=0)
668
    def put_block(self, data):
669
        """Store a block and return the hash."""
670
        
671
        logger.debug("put_block: %s", len(data))
672
        return binascii.hexlify(self.store.block_put(data))
673
    
674
    @backend_method(autocommit=0)
675
    def update_block(self, hash, data, offset=0):
676
        """Update a known block and return the hash."""
677
        
678
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
679
        if offset == 0 and len(data) == self.block_size:
680
            return self.put_block(data)
681
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
682
        return binascii.hexlify(h)
683
    
684
    # Path functions.
685
    
686
    def _generate_uuid(self):
687
        return str(uuidlib.uuid4())
688
    
689
    def _put_object_node(self, path, parent, name):
690
        path = '/'.join((path, name))
691
        node = self.node.node_lookup(path)
692
        if node is None:
693
            node = self.node.node_create(parent, path)
694
        return path, node
695
    
696
    def _put_path(self, user, parent, path):
697
        node = self.node.node_create(parent, path)
698
        self.node.version_create(node, None, 0, None, user, self._generate_uuid(), CLUSTER_NORMAL)
699
        return node
700
    
701
    def _lookup_account(self, account, create=True):
702
        node = self.node.node_lookup(account)
703
        if node is None and create:
704
            node = self._put_path(account, self.ROOTNODE, account) # User is account.
705
        return account, node
706
    
707
    def _lookup_container(self, account, container):
708
        path = '/'.join((account, container))
709
        node = self.node.node_lookup(path)
710
        if node is None:
711
            raise NameError('Container does not exist')
712
        return path, node
713
    
714
    def _lookup_object(self, account, container, name):
715
        path = '/'.join((account, container, name))
716
        node = self.node.node_lookup(path)
717
        if node is None:
718
            raise NameError('Object does not exist')
719
        return path, node
720
    
721
    def _get_properties(self, node, until=None):
722
        """Return properties until the timestamp given."""
723
        
724
        before = until if until is not None else inf
725
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
726
        if props is None and until is not None:
727
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
728
        if props is None:
729
            raise NameError('Path does not exist')
730
        return props
731
    
732
    def _get_statistics(self, node, until=None):
733
        """Return count, sum of size and latest timestamp of everything under node."""
734
        
735
        if until is None:
736
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
737
        else:
738
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
739
        if stats is None:
740
            stats = (0, 0, 0)
741
        return stats
742
    
743
    def _get_version(self, node, version=None):
744
        if version is None:
745
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
746
            if props is None:
747
                raise NameError('Object does not exist')
748
        else:
749
            try:
750
                version = int(version)
751
            except ValueError:
752
                raise IndexError('Version does not exist')
753
            props = self.node.version_get_properties(version)
754
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
755
                raise IndexError('Version does not exist')
756
        return props
757
    
758
    def _put_version_duplicate(self, user, node, size=None, hash=None, cluster=CLUSTER_NORMAL, is_copy=False):
759
        """Create a new version of the node."""
760
        
761
        props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
762
        if props is not None:
763
            src_version_id = props[self.SERIAL]
764
            src_hash = props[self.HASH]
765
            src_size = props[self.SIZE]
766
        else:
767
            src_version_id = None
768
            src_hash = None
769
            src_size = 0
770
        if size is None:
771
            hash = src_hash # This way hash can be set to None.
772
            size = src_size
773
        uuid = self._generate_uuid() if is_copy or src_version_id is None else props[self.UUID]
774
        
775
        if src_version_id is not None:
776
            self.node.version_recluster(src_version_id, CLUSTER_HISTORY)
777
        dest_version_id, mtime = self.node.version_create(node, hash, size, src_version_id, user, uuid, cluster)
778
        return src_version_id, dest_version_id
779
    
780
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
781
        if src_version_id is not None:
782
            self.node.attribute_copy(src_version_id, dest_version_id)
783
        if not replace:
784
            self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
785
            self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
786
        else:
787
            self.node.attribute_del(dest_version_id, domain)
788
            self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
789
    
790
    def _put_metadata(self, user, node, domain, meta, replace=False):
791
        """Create a new version and store metadata."""
792
        
793
        src_version_id, dest_version_id = self._put_version_duplicate(user, node)
794
        self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
795
        return src_version_id, dest_version_id
796
    
797
    def _list_limits(self, listing, marker, limit):
798
        start = 0
799
        if marker:
800
            try:
801
                start = listing.index(marker) + 1
802
            except ValueError:
803
                pass
804
        if not limit or limit > 10000:
805
            limit = 10000
806
        return start, limit
807
    
808
    def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, allowed=[]):
809
        cont_prefix = path + '/'
810
        prefix = cont_prefix + prefix
811
        start = cont_prefix + marker if marker else None
812
        before = until if until is not None else inf
813
        filterq = keys if domain else []
814
        
815
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq)
816
        objects.extend([(p, None) for p in prefixes] if virtual else [])
817
        objects.sort(key=lambda x: x[0])
818
        objects = [(x[0][len(cont_prefix):], x[1]) for x in objects]
819
        
820
        start, limit = self._list_limits([x[0] for x in objects], marker, limit)
821
        return objects[start:start + limit]
822
    
823
    # Policy functions.
824
    
825
    def _check_policy(self, policy):
826
        for k in policy.keys():
827
            if policy[k] == '':
828
                policy[k] = self.default_policy.get(k)
829
        for k, v in policy.iteritems():
830
            if k == 'quota':
831
                q = int(v) # May raise ValueError.
832
                if q < 0:
833
                    raise ValueError
834
            elif k == 'versioning':
835
                if v not in ['auto', 'none']:
836
                    raise ValueError
837
            else:
838
                raise ValueError
839
    
840
    def _put_policy(self, node, policy, replace):
841
        if replace:
842
            for k, v in self.default_policy.iteritems():
843
                if k not in policy:
844
                    policy[k] = v
845
        self.node.policy_set(node, policy)
846
    
847
    def _get_policy(self, node):
848
        policy = self.default_policy.copy()
849
        policy.update(self.node.policy_get(node))
850
        return policy
851
    
852
    def _apply_versioning(self, account, container, version_id):
853
        if version_id is None:
854
            return
855
        path, node = self._lookup_container(account, container)
856
        versioning = self._get_policy(node)['versioning']
857
        if versioning != 'auto':
858
            hash = self.node.version_remove(version_id)
859
            self.store.map_delete(hash)
860
    
861
    # Access control functions.
862
    
863
    def _check_groups(self, groups):
864
        # raise ValueError('Bad characters in groups')
865
        pass
866
    
867
    def _check_permissions(self, path, permissions):
868
        # raise ValueError('Bad characters in permissions')
869
        
870
        # Check for existing permissions.
871
        paths = self.permissions.access_list(path)
872
        if paths:
873
            ae = AttributeError()
874
            ae.data = paths
875
            raise ae
876
    
877
    def _can_read(self, user, account, container, name):
878
        if user == account:
879
            return True
880
        path = '/'.join((account, container, name))
881
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
882
            raise NotAllowedError
883
    
884
    def _can_write(self, user, account, container, name):
885
        if user == account:
886
            return True
887
        path = '/'.join((account, container, name))
888
        if not self.permissions.access_check(path, self.WRITE, user):
889
            raise NotAllowedError
890
    
891
    def _allowed_accounts(self, user):
892
        allow = set()
893
        for path in self.permissions.access_list_paths(user):
894
            allow.add(path.split('/', 1)[0])
895
        return sorted(allow)
896
    
897
    def _allowed_containers(self, user, account):
898
        allow = set()
899
        for path in self.permissions.access_list_paths(user, account):
900
            allow.add(path.split('/', 2)[1])
901
        return sorted(allow)