Statistics
| Branch: | Tag: | Revision:

root / pithos / backends / modular.py @ 4a1c29ea

History | View | Annotate | Download (36.5 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import logging
38
import hashlib
39
import binascii
40

    
41
from base import NotAllowedError, QuotaError, BaseBackend
42

    
43
( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
44

    
45
inf = float('inf')
46

    
47

    
48
logger = logging.getLogger(__name__)
49

    
50

    
51
class HashMap(list):
52
    
53
    def __init__(self, blocksize, blockhash):
54
        super(HashMap, self).__init__()
55
        self.blocksize = blocksize
56
        self.blockhash = blockhash
57
    
58
    def _hash_raw(self, v):
59
        h = hashlib.new(self.blockhash)
60
        h.update(v)
61
        return h.digest()
62
    
63
    def hash(self):
64
        if len(self) == 0:
65
            return self._hash_raw('')
66
        if len(self) == 1:
67
            return self.__getitem__(0)
68
        
69
        h = list(self)
70
        s = 2
71
        while s < len(h):
72
            s = s * 2
73
        h += [('\x00' * len(h[0]))] * (s - len(h))
74
        while len(h) > 1:
75
            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
76
        return h[0]
77

    
78

    
79
def backend_method(func=None, autocommit=1):
80
    if func is None:
81
        def fn(func):
82
            return backend_method(func, autocommit)
83
        return fn
84

    
85
    if not autocommit:
86
        return func
87
    def fn(self, *args, **kw):
88
        self.wrapper.execute()
89
        try:
90
            ret = func(self, *args, **kw)
91
            self.wrapper.commit()
92
            return ret
93
        except:
94
            self.wrapper.rollback()
95
            raise
96
    return fn
97

    
98

    
99
class ModularBackend(BaseBackend):
100
    """A modular backend.
101
    
102
    Uses modules for SQL functions and storage.
103
    """
104
    
105
    def __init__(self, db_module, db_connection, block_module, block_path):
106
        self.hash_algorithm = 'sha256'
107
        self.block_size = 4 * 1024 * 1024 # 4MB
108
        
109
        self.default_policy = {'quota': 0, 'versioning': 'manual'}
110
        
111
        __import__(db_module)
112
        self.db_module = sys.modules[db_module]
113
        self.wrapper = self.db_module.DBWrapper(db_connection)
114
        
115
        params = {'wrapper': self.wrapper}
116
        self.permissions = self.db_module.Permissions(**params)
117
        for x in ['READ', 'WRITE']:
118
            setattr(self, x, getattr(self.db_module, x))
119
        self.node = self.db_module.Node(**params)
120
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'MTIME', 'MUSER', 'CLUSTER']:
121
            setattr(self, x, getattr(self.db_module, x))
122
        
123
        __import__(block_module)
124
        self.block_module = sys.modules[block_module]
125
        
126
        params = {'path': block_path,
127
                  'block_size': self.block_size,
128
                  'hash_algorithm': self.hash_algorithm}
129
        self.store = self.block_module.Store(**params)
130
    
131
    def close(self):
132
        self.wrapper.close()
133
    
134
    @backend_method
135
    def list_accounts(self, user, marker=None, limit=10000):
136
        """Return a list of accounts the user can access."""
137
        
138
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
139
        allowed = self._allowed_accounts(user)
140
        start, limit = self._list_limits(allowed, marker, limit)
141
        return allowed[start:start + limit]
142
    
143
    @backend_method
144
    def get_account_meta(self, user, account, until=None):
145
        """Return a dictionary with the account metadata."""
146
        
147
        logger.debug("get_account_meta: %s %s", account, until)
148
        path, node = self._lookup_account(account, user == account)
149
        if user != account:
150
            if until or node is None or account not in self._allowed_accounts(user):
151
                raise NotAllowedError
152
        try:
153
            props = self._get_properties(node, until)
154
            mtime = props[self.MTIME]
155
        except NameError:
156
            props = None
157
            mtime = until
158
        count, bytes, tstamp = self._get_statistics(node, until)
159
        tstamp = max(tstamp, mtime)
160
        if until is None:
161
            modified = tstamp
162
        else:
163
            modified = self._get_statistics(node)[2] # Overall last modification.
164
            modified = max(modified, mtime)
165
        
166
        if user != account:
167
            meta = {'name': account}
168
        else:
169
            meta = {}
170
            if props is not None:
171
                meta.update(dict(self.node.attribute_get(props[self.SERIAL])))
172
            if until is not None:
173
                meta.update({'until_timestamp': tstamp})
174
            meta.update({'name': account, 'count': count, 'bytes': bytes})
175
        meta.update({'modified': modified})
176
        return meta
177
    
178
    @backend_method
179
    def update_account_meta(self, user, account, meta, replace=False):
180
        """Update the metadata associated with the account."""
181
        
182
        logger.debug("update_account_meta: %s %s %s", account, meta, replace)
183
        if user != account:
184
            raise NotAllowedError
185
        path, node = self._lookup_account(account, True)
186
        self._put_metadata(user, node, meta, replace)
187
    
188
    @backend_method
189
    def get_account_groups(self, user, account):
190
        """Return a dictionary with the user groups defined for this account."""
191
        
192
        logger.debug("get_account_groups: %s", account)
193
        if user != account:
194
            if account not in self._allowed_accounts(user):
195
                raise NotAllowedError
196
            return {}
197
        self._lookup_account(account, True)
198
        return self.permissions.group_dict(account)
199
    
200
    @backend_method
201
    def update_account_groups(self, user, account, groups, replace=False):
202
        """Update the groups associated with the account."""
203
        
204
        logger.debug("update_account_groups: %s %s %s", account, groups, replace)
205
        if user != account:
206
            raise NotAllowedError
207
        self._lookup_account(account, True)
208
        self._check_groups(groups)
209
        if replace:
210
            self.permissions.group_destroy(account)
211
        for k, v in groups.iteritems():
212
            if not replace: # If not already deleted.
213
                self.permissions.group_delete(account, k)
214
            if v:
215
                self.permissions.group_addmany(account, k, v)
216
    
217
    @backend_method
218
    def get_account_policy(self, user, account):
219
        """Return a dictionary with the account policy."""
220
        
221
        logger.debug("get_account_policy: %s", account)
222
        if user != account:
223
            if account not in self._allowed_accounts(user):
224
                raise NotAllowedError
225
            return {}
226
        path, node = self._lookup_account(account, True)
227
        return self._get_policy(node)
228
    
229
    @backend_method
230
    def update_account_policy(self, user, account, policy, replace=False):
231
        """Update the policy associated with the account."""
232
        
233
        logger.debug("update_account_policy: %s %s %s", account, policy, replace)
234
        if user != account:
235
            raise NotAllowedError
236
        path, node = self._lookup_account(account, True)
237
        self._check_policy(policy)
238
        self._put_policy(node, policy, replace)
239
    
240
    @backend_method
241
    def put_account(self, user, account, policy={}):
242
        """Create a new account with the given name."""
243
        
244
        logger.debug("put_account: %s %s", account, policy)
245
        if user != account:
246
            raise NotAllowedError
247
        node = self.node.node_lookup(account)
248
        if node is not None:
249
            raise NameError('Account already exists')
250
        if policy:
251
            self._check_policy(policy)
252
        node = self._put_path(user, self.ROOTNODE, account)
253
        self._put_policy(node, policy, True)
254
    
255
    @backend_method
256
    def delete_account(self, user, account):
257
        """Delete the account with the given name."""
258
        
259
        logger.debug("delete_account: %s", account)
260
        if user != account:
261
            raise NotAllowedError
262
        node = self.node.node_lookup(account)
263
        if node is None:
264
            return
265
        if not self.node.node_remove(node):
266
            raise IndexError('Account is not empty')
267
        self.permissions.group_destroy(account)
268
    
269
    @backend_method
270
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
271
        """Return a list of containers existing under an account."""
272
        
273
        logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
274
        if user != account:
275
            if until or account not in self._allowed_accounts(user):
276
                raise NotAllowedError
277
            allowed = self._allowed_containers(user, account)
278
            start, limit = self._list_limits(allowed, marker, limit)
279
            return allowed[start:start + limit]
280
        if shared:
281
            allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
282
            allowed = list(set(allowed))
283
            start, limit = self._list_limits(allowed, marker, limit)
284
            return allowed[start:start + limit]
285
        node = self.node.node_lookup(account)
286
        return [x[0] for x in self._list_objects(node, account, '', '/', marker, limit, False, [], until)]
287
    
288
    @backend_method
289
    def get_container_meta(self, user, account, container, until=None):
290
        """Return a dictionary with the container metadata."""
291
        
292
        logger.debug("get_container_meta: %s %s %s", account, container, until)
293
        if user != account:
294
            if until or container not in self._allowed_containers(user, account):
295
                raise NotAllowedError
296
        path, node = self._lookup_container(account, container)
297
        props = self._get_properties(node, until)
298
        mtime = props[self.MTIME]
299
        count, bytes, tstamp = self._get_statistics(node, until)
300
        tstamp = max(tstamp, mtime)
301
        if until is None:
302
            modified = tstamp
303
        else:
304
            modified = self._get_statistics(node)[2] # Overall last modification.
305
            modified = max(modified, mtime)
306
        
307
        if user != account:
308
            meta = {'name': container}
309
        else:
310
            meta = dict(self.node.attribute_get(props[self.SERIAL]))
311
            if until is not None:
312
                meta.update({'until_timestamp': tstamp})
313
            meta.update({'name': container, 'count': count, 'bytes': bytes})
314
        meta.update({'modified': modified})
315
        return meta
316
    
317
    @backend_method
318
    def update_container_meta(self, user, account, container, meta, replace=False):
319
        """Update the metadata associated with the container."""
320
        
321
        logger.debug("update_container_meta: %s %s %s %s", account, container, meta, replace)
322
        if user != account:
323
            raise NotAllowedError
324
        path, node = self._lookup_container(account, container)
325
        self._put_metadata(user, node, meta, replace)
326
    
327
    @backend_method
328
    def get_container_policy(self, user, account, container):
329
        """Return a dictionary with the container policy."""
330
        
331
        logger.debug("get_container_policy: %s %s", account, container)
332
        if user != account:
333
            if container not in self._allowed_containers(user, account):
334
                raise NotAllowedError
335
            return {}
336
        path, node = self._lookup_container(account, container)
337
        return self._get_policy(node)
338
    
339
    @backend_method
340
    def update_container_policy(self, user, account, container, policy, replace=False):
341
        """Update the policy associated with the container."""
342
        
343
        logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
344
        if user != account:
345
            raise NotAllowedError
346
        path, node = self._lookup_container(account, container)
347
        self._check_policy(policy)
348
        self._put_policy(node, policy, replace)
349
    
350
    @backend_method
351
    def put_container(self, user, account, container, policy={}):
352
        """Create a new container with the given name."""
353
        
354
        logger.debug("put_container: %s %s %s", account, container, policy)
355
        if user != account:
356
            raise NotAllowedError
357
        try:
358
            path, node = self._lookup_container(account, container)
359
        except NameError:
360
            pass
361
        else:
362
            raise NameError('Container already exists')
363
        if policy:
364
            self._check_policy(policy)
365
        path = '/'.join((account, container))
366
        node = self._put_path(user, self._lookup_account(account, True)[1], path)
367
        self._put_policy(node, policy, True)
368
    
369
    @backend_method
370
    def delete_container(self, user, account, container, until=None):
371
        """Delete/purge the container with the given name."""
372
        
373
        logger.debug("delete_container: %s %s %s", account, container, until)
374
        if user != account:
375
            raise NotAllowedError
376
        path, node = self._lookup_container(account, container)
377
        
378
        if until is not None:
379
            hashes = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
380
            for h in hashes:
381
                self.store.map_delete(h)
382
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
383
            return
384
        
385
        if self._get_statistics(node)[0] > 0:
386
            raise IndexError('Container is not empty')
387
        hashes = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
388
        for h in hashes:
389
            self.store.map_delete(h)
390
        self.node.node_purge_children(node, inf, CLUSTER_DELETED)
391
        self.node.node_remove(node)
392
    
393
    @backend_method
394
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], shared=False, until=None):
395
        """Return a list of objects existing under a container."""
396
        
397
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, keys, shared, until)
398
        allowed = []
399
        if user != account:
400
            if until:
401
                raise NotAllowedError
402
            allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
403
            if not allowed:
404
                raise NotAllowedError
405
        else:
406
            if shared:
407
                allowed = self.permissions.access_list_shared('/'.join((account, container)))
408
                if not allowed:
409
                    return []
410
        path, node = self._lookup_container(account, container)
411
        return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, keys, until, allowed)
412
    
413
    @backend_method
414
    def list_object_meta(self, user, account, container, until=None):
415
        """Return a list with all the container's object meta keys."""
416
        
417
        logger.debug("list_object_meta: %s %s %s", account, container, until)
418
        allowed = []
419
        if user != account:
420
            if until:
421
                raise NotAllowedError
422
            allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
423
            if not allowed:
424
                raise NotAllowedError
425
        path, node = self._lookup_container(account, container)
426
        before = until if until is not None else inf
427
        return self.node.latest_attribute_keys(node, before, CLUSTER_DELETED, allowed)
428
    
429
    @backend_method
430
    def get_object_meta(self, user, account, container, name, version=None):
431
        """Return a dictionary with the object metadata."""
432
        
433
        logger.debug("get_object_meta: %s %s %s %s", account, container, name, version)
434
        self._can_read(user, account, container, name)
435
        path, node = self._lookup_object(account, container, name)
436
        props = self._get_version(node, version)
437
        if version is None:
438
            modified = props[self.MTIME]
439
        else:
440
            try:
441
                modified = self._get_version(node)[self.MTIME] # Overall last modification.
442
            except NameError: # Object may be deleted.
443
                del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
444
                if del_props is None:
445
                    raise NameError('Object does not exist')
446
                modified = del_props[self.MTIME]
447
        
448
        meta = dict(self.node.attribute_get(props[self.SERIAL]))
449
        meta.update({'name': name, 'bytes': props[self.SIZE], 'hash':props[self.HASH]})
450
        meta.update({'version': props[self.SERIAL], 'version_timestamp': props[self.MTIME]})
451
        meta.update({'modified': modified, 'modified_by': props[self.MUSER]})
452
        return meta
453
    
454
    @backend_method
455
    def update_object_meta(self, user, account, container, name, meta, replace=False):
456
        """Update the metadata associated with the object."""
457
        
458
        logger.debug("update_object_meta: %s %s %s %s %s", account, container, name, meta, replace)
459
        self._can_write(user, account, container, name)
460
        path, node = self._lookup_object(account, container, name)
461
        return self._put_metadata(user, node, meta, replace)
462
    
463
    @backend_method
464
    def get_object_permissions(self, user, account, container, name):
465
        """Return the action allowed on the object, the path
466
        from which the object gets its permissions from,
467
        along with a dictionary containing the permissions."""
468
        
469
        logger.debug("get_object_permissions: %s %s %s", account, container, name)
470
        allowed = 'write'
471
        if user != account:
472
            path = '/'.join((account, container, name))
473
            if self.permissions.access_check(path, self.WRITE, user):
474
                allowed = 'write'
475
            elif self.permissions.access_check(path, self.READ, user):
476
                allowed = 'read'
477
            else:
478
                raise NotAllowedError
479
        path = self._lookup_object(account, container, name)[0]
480
        return (allowed,) + self.permissions.access_inherit(path)
481
    
482
    @backend_method
483
    def update_object_permissions(self, user, account, container, name, permissions):
484
        """Update the permissions associated with the object."""
485
        
486
        logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
487
        if user != account:
488
            raise NotAllowedError
489
        path = self._lookup_object(account, container, name)[0]
490
        self._check_permissions(path, permissions)
491
        self.permissions.access_set(path, permissions)
492
    
493
    @backend_method
494
    def get_object_public(self, user, account, container, name):
495
        """Return the public URL of the object if applicable."""
496
        
497
        logger.debug("get_object_public: %s %s %s", account, container, name)
498
        self._can_read(user, account, container, name)
499
        path = self._lookup_object(account, container, name)[0]
500
        if self.permissions.public_check(path):
501
            return '/public/' + path
502
        return None
503
    
504
    @backend_method
505
    def update_object_public(self, user, account, container, name, public):
506
        """Update the public status of the object."""
507
        
508
        logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
509
        self._can_write(user, account, container, name)
510
        path = self._lookup_object(account, container, name)[0]
511
        if not public:
512
            self.permissions.public_unset(path)
513
        else:
514
            self.permissions.public_set(path)
515
    
516
    @backend_method
517
    def get_object_hashmap(self, user, account, container, name, version=None):
518
        """Return the object's size and a list with partial hashes."""
519
        
520
        logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
521
        self._can_read(user, account, container, name)
522
        path, node = self._lookup_object(account, container, name)
523
        props = self._get_version(node, version)
524
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
525
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
526
    
527
    def _update_object_hash(self, user, account, container, name, size, hash, meta={}, replace_meta=False, permissions=None):
528
        if permissions is not None and user != account:
529
            raise NotAllowedError
530
        self._can_write(user, account, container, name)
531
        if permissions is not None:
532
            path = '/'.join((account, container, name))
533
            self._check_permissions(path, permissions)
534
        
535
        account_path, account_node = self._lookup_account(account, True)
536
        container_path, container_node = self._lookup_container(account, container)
537
        path, node = self._put_object_node(container_path, container_node, name)
538
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size, hash)
539
        
540
        # Check quota.
541
        size_delta = size # Change with versioning.
542
        if size_delta > 0:
543
            account_quota = long(self._get_policy(account_node)['quota'])
544
            container_quota = long(self._get_policy(container_node)['quota'])
545
            if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
546
               (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
547
                # This must be executed in a transaction, so the version is never created if it fails.
548
                raise QuotaError
549
        
550
        if not replace_meta and src_version_id is not None:
551
            self.node.attribute_copy(src_version_id, dest_version_id)
552
        self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
553
        if permissions is not None:
554
            self.permissions.access_set(path, permissions)
555
        return dest_version_id
556
    
557
    @backend_method
558
    def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None):
559
        """Create/update an object with the specified size and partial hashes."""
560
        
561
        logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
562
        if size == 0: # No such thing as an empty hashmap.
563
            hashmap = [self.put_block('')]
564
        map = HashMap(self.block_size, self.hash_algorithm)
565
        map.extend([binascii.unhexlify(x) for x in hashmap])
566
        missing = self.store.block_search(map)
567
        if missing:
568
            ie = IndexError()
569
            ie.data = [binascii.hexlify(x) for x in missing]
570
            raise ie
571
        
572
        hash = map.hash()
573
        dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(hash), meta, replace_meta, permissions)
574
        self.store.map_put(hash, map)
575
        return dest_version_id
576
    
577
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
578
        self._can_read(user, src_account, src_container, src_name)
579
        path, node = self._lookup_object(src_account, src_container, src_name)
580
        props = self._get_version(node, src_version)
581
        src_version_id = props[self.SERIAL]
582
        hash = props[self.HASH]
583
        size = props[self.SIZE]
584
        
585
        if replace_meta:
586
            meta = dest_meta
587
        else:
588
            meta = {}
589
        dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, hash, meta, True, permissions)
590
        if not replace_meta:
591
            self.node.attribute_copy(src_version_id, dest_version_id)
592
            self.node.attribute_set(dest_version_id, ((k, v) for k, v in dest_meta.iteritems()))
593
        return dest_version_id
594
    
595
    @backend_method
596
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
597
        """Copy an object's data and metadata."""
598
        
599
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
600
        return self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
601
    
602
    @backend_method
603
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None):
604
        """Move an object's data and metadata."""
605
        
606
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions)
607
        if user != src_account:
608
            raise NotAllowedError
609
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, None)
610
        self._delete_object(user, src_account, src_container, src_name)
611
        return dest_version_id
612
    
613
    def _delete_object(self, user, account, container, name, until=None):
614
        if user != account:
615
            raise NotAllowedError
616
        
617
        if until is not None:
618
            path = '/'.join((account, container, name))
619
            node = self.node.node_lookup(path)
620
            if node is None:
621
                return
622
            hashes = self.node.node_purge(node, until, CLUSTER_NORMAL)
623
            hashes += self.node.node_purge(node, until, CLUSTER_HISTORY)
624
            for h in hashes:
625
                self.store.map_delete(h)
626
            self.node.node_purge(node, until, CLUSTER_DELETED)
627
            try:
628
                props = self._get_version(node)
629
            except NameError:
630
                self.permissions.access_clear(path)
631
            return
632
        
633
        path, node = self._lookup_object(account, container, name)
634
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, 0, None, CLUSTER_DELETED)
635
        self.permissions.access_clear(path)
636
    
637
    @backend_method
638
    def delete_object(self, user, account, container, name, until=None):
639
        """Delete/purge an object."""
640
        
641
        logger.debug("delete_object: %s %s %s %s", account, container, name, until)
642
        self._delete_object(user, account, container, name, until)
643
    
644
    @backend_method
645
    def list_versions(self, user, account, container, name):
646
        """Return a list of all (version, version_timestamp) tuples for an object."""
647
        
648
        logger.debug("list_versions: %s %s %s", account, container, name)
649
        self._can_read(user, account, container, name)
650
        path, node = self._lookup_object(account, container, name)
651
        versions = self.node.node_get_versions(node)
652
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
653
    
654
    @backend_method(autocommit=0)
655
    def get_block(self, hash):
656
        """Return a block's data."""
657
        
658
        logger.debug("get_block: %s", hash)
659
        block = self.store.block_get(binascii.unhexlify(hash))
660
        if not block:
661
            raise NameError('Block does not exist')
662
        return block
663
    
664
    @backend_method(autocommit=0)
665
    def put_block(self, data):
666
        """Store a block and return the hash."""
667
        
668
        logger.debug("put_block: %s", len(data))
669
        return binascii.hexlify(self.store.block_put(data))
670
    
671
    @backend_method(autocommit=0)
672
    def update_block(self, hash, data, offset=0):
673
        """Update a known block and return the hash."""
674
        
675
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
676
        if offset == 0 and len(data) == self.block_size:
677
            return self.put_block(data)
678
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
679
        return binascii.hexlify(h)
680
    
681
    # Path functions.
682
    
683
    def _put_object_node(self, path, parent, name):
684
        path = '/'.join((path, name))
685
        node = self.node.node_lookup(path)
686
        if node is None:
687
            node = self.node.node_create(parent, path)
688
        return path, node
689
    
690
    def _put_path(self, user, parent, path):
691
        node = self.node.node_create(parent, path)
692
        self.node.version_create(node, None, 0, None, user, CLUSTER_NORMAL)
693
        return node
694
    
695
    def _lookup_account(self, account, create=True):
696
        node = self.node.node_lookup(account)
697
        if node is None and create:
698
            node = self._put_path(account, self.ROOTNODE, account) # User is account.
699
        return account, node
700
    
701
    def _lookup_container(self, account, container):
702
        path = '/'.join((account, container))
703
        node = self.node.node_lookup(path)
704
        if node is None:
705
            raise NameError('Container does not exist')
706
        return path, node
707
    
708
    def _lookup_object(self, account, container, name):
709
        path = '/'.join((account, container, name))
710
        node = self.node.node_lookup(path)
711
        if node is None:
712
            raise NameError('Object does not exist')
713
        return path, node
714
    
715
    def _get_properties(self, node, until=None):
716
        """Return properties until the timestamp given."""
717
        
718
        before = until if until is not None else inf
719
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
720
        if props is None and until is not None:
721
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
722
        if props is None:
723
            raise NameError('Path does not exist')
724
        return props
725
    
726
    def _get_statistics(self, node, until=None):
727
        """Return count, sum of size and latest timestamp of everything under node."""
728
        
729
        if until is None:
730
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
731
        else:
732
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
733
        if stats is None:
734
            stats = (0, 0, 0)
735
        return stats
736
    
737
    def _get_version(self, node, version=None):
738
        if version is None:
739
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
740
            if props is None:
741
                raise NameError('Object does not exist')
742
        else:
743
            try:
744
                version = int(version)
745
            except ValueError:
746
                raise IndexError('Version does not exist')
747
            props = self.node.version_get_properties(version)
748
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
749
                raise IndexError('Version does not exist')
750
        return props
751
    
752
    def _put_version_duplicate(self, user, node, size=None, hash=None, cluster=CLUSTER_NORMAL):
753
        """Create a new version of the node."""
754
        
755
        props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
756
        if props is not None:
757
            src_version_id = props[self.SERIAL]
758
            src_hash = props[self.HASH]
759
            src_size = props[self.SIZE]
760
        else:
761
            src_version_id = None
762
            src_hash = None
763
            src_size = 0
764
        if size is None:
765
            hash = src_hash # This way hash can be set to None.
766
            size = src_size
767
        
768
        if src_version_id is not None:
769
            self.node.version_recluster(src_version_id, CLUSTER_HISTORY)
770
        dest_version_id, mtime = self.node.version_create(node, hash, size, src_version_id, user, cluster)
771
        return src_version_id, dest_version_id
772
    
773
    def _put_metadata(self, user, node, meta, replace=False):
774
        """Create a new version and store metadata."""
775
        
776
        src_version_id, dest_version_id = self._put_version_duplicate(user, node)
777
        
778
        # TODO: Merge with other functions that update metadata...
779
        if not replace:
780
            if src_version_id is not None:
781
                self.node.attribute_copy(src_version_id, dest_version_id)
782
            self.node.attribute_del(dest_version_id, (k for k, v in meta.iteritems() if v == ''))
783
            self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems() if v != ''))
784
        else:
785
            self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
786
        return dest_version_id
787
    
788
    def _list_limits(self, listing, marker, limit):
789
        start = 0
790
        if marker:
791
            try:
792
                start = listing.index(marker) + 1
793
            except ValueError:
794
                pass
795
        if not limit or limit > 10000:
796
            limit = 10000
797
        return start, limit
798
    
799
    def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None, allowed=[]):
800
        cont_prefix = path + '/'
801
        prefix = cont_prefix + prefix
802
        start = cont_prefix + marker if marker else None
803
        before = until if until is not None else inf
804
        filterq = ','.join(keys) if keys else None
805
        
806
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, filterq)
807
        objects.extend([(p, None) for p in prefixes] if virtual else [])
808
        objects.sort(key=lambda x: x[0])
809
        objects = [(x[0][len(cont_prefix):], x[1]) for x in objects]
810
        
811
        start, limit = self._list_limits([x[0] for x in objects], marker, limit)
812
        return objects[start:start + limit]
813
    
814
    # Policy functions.
815
    
816
    def _check_policy(self, policy):
817
        for k in policy.keys():
818
            if policy[k] == '':
819
                policy[k] = self.default_policy.get(k)
820
        for k, v in policy.iteritems():
821
            if k == 'quota':
822
                q = int(v) # May raise ValueError.
823
                if q < 0:
824
                    raise ValueError
825
            elif k == 'versioning':
826
                if v not in ['auto', 'manual', 'none']:
827
                    raise ValueError
828
            else:
829
                raise ValueError
830
    
831
    def _put_policy(self, node, policy, replace):
832
        if replace:
833
            for k, v in self.default_policy.iteritems():
834
                if k not in policy:
835
                    policy[k] = v
836
        self.node.policy_set(node, policy)
837
    
838
    def _get_policy(self, node):
839
        policy = self.default_policy.copy()
840
        policy.update(self.node.policy_get(node))
841
        return policy
842
    
843
    # Access control functions.
844
    
845
    def _check_groups(self, groups):
846
        # raise ValueError('Bad characters in groups')
847
        pass
848
    
849
    def _check_permissions(self, path, permissions):
850
        # raise ValueError('Bad characters in permissions')
851
        
852
        # Check for existing permissions.
853
        paths = self.permissions.access_list(path)
854
        if paths:
855
            ae = AttributeError()
856
            ae.data = paths
857
            raise ae
858
    
859
    def _can_read(self, user, account, container, name):
860
        if user == account:
861
            return True
862
        path = '/'.join((account, container, name))
863
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
864
            raise NotAllowedError
865
    
866
    def _can_write(self, user, account, container, name):
867
        if user == account:
868
            return True
869
        path = '/'.join((account, container, name))
870
        if not self.permissions.access_check(path, self.WRITE, user):
871
            raise NotAllowedError
872
    
873
    def _allowed_accounts(self, user):
874
        allow = set()
875
        for path in self.permissions.access_list_paths(user):
876
            allow.add(path.split('/', 1)[0])
877
        return sorted(allow)
878
    
879
    def _allowed_containers(self, user, account):
880
        allow = set()
881
        for path in self.permissions.access_list_paths(user, account):
882
            allow.add(path.split('/', 2)[1])
883
        return sorted(allow)