Statistics
| Branch: | Tag: | Revision:

root / pithos / backends / modular.py @ 79bb41b7

History | View | Annotate | Download (34.9 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import sys
35
import os
36
import time
37
import sqlite3
38
import logging
39
import hashlib
40
import binascii
41

    
42
from base import NotAllowedError, BaseBackend
43
from lib.hashfiler import Mapper, Blocker
44

    
45
( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
46

    
47
inf = float('inf')
48

    
49

    
50
logger = logging.getLogger(__name__)
51

    
52
def backend_method(func=None, autocommit=1):
53
    if func is None:
54
        def fn(func):
55
            return backend_method(func, autocommit)
56
        return fn
57

    
58
    if not autocommit:
59
        return func
60
    def fn(self, *args, **kw):
61
        self.wrapper.execute()
62
        try:
63
            ret = func(self, *args, **kw)
64
            self.wrapper.commit()
65
            return ret
66
        except:
67
            self.wrapper.rollback()
68
            raise
69
    return fn
70

    
71

    
72
class ModularBackend(BaseBackend):
73
    """A modular backend.
74
    
75
    Uses modules for SQL functions and hashfiler for storage.
76
    """
77
    
78
    def __init__(self, mod, path, db):
79
        self.hash_algorithm = 'sha256'
80
        self.block_size = 4 * 1024 * 1024 # 4MB
81
        
82
        self.default_policy = {'quota': 0, 'versioning': 'auto'}
83
        
84
        if path and not os.path.exists(path):
85
            os.makedirs(path)
86
        if not os.path.isdir(path):
87
            raise RuntimeError("Cannot open path '%s'" % (path,))
88
        
89
        __import__(mod)
90
        self.mod = sys.modules[mod]
91
        self.wrapper = self.mod.dbwrapper.DBWrapper(db)
92
        
93
        params = {'blocksize': self.block_size,
94
                  'blockpath': os.path.join(path + '/blocks'),
95
                  'hashtype': self.hash_algorithm}
96
        self.blocker = Blocker(**params)
97
        
98
        params = {'mappath': os.path.join(path + '/maps'),
99
                  'namelen': self.blocker.hashlen}
100
        self.mapper = Mapper(**params)
101
        
102
        params = {'wrapper': self.wrapper}
103
        self.permissions = self.mod.permissions.Permissions(**params)
104
        for x in ['READ', 'WRITE']:
105
            setattr(self, x, getattr(self.mod.permissions, x))
106
        self.policy = self.mod.policy.Policy(**params)
107
        self.node = self.mod.node.Node(**params)
108
        for x in ['ROOTNODE', 'SERIAL', 'SIZE', 'MTIME', 'MUSER', 'CLUSTER']:
109
            setattr(self, x, getattr(self.mod.node, x))
110
    
111
    @backend_method
112
    def list_accounts(self, user, marker=None, limit=10000):
113
        """Return a list of accounts the user can access."""
114
        
115
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
116
        allowed = self._allowed_accounts(user)
117
        start, limit = self._list_limits(allowed, marker, limit)
118
        return allowed[start:start + limit]
119
    
120
    @backend_method
121
    def get_account_meta(self, user, account, until=None):
122
        """Return a dictionary with the account metadata."""
123
        
124
        logger.debug("get_account_meta: %s %s", account, until)
125
        path, node = self._lookup_account(account, user == account)
126
        if user != account:
127
            if until or node is None or account not in self._allowed_accounts(user):
128
                raise NotAllowedError
129
        try:
130
            props = self._get_properties(node, until)
131
            mtime = props[self.MTIME]
132
        except NameError:
133
            props = None
134
            mtime = until
135
        count, bytes, tstamp = self._get_statistics(node, until)
136
        tstamp = max(tstamp, mtime)
137
        if until is None:
138
            modified = tstamp
139
        else:
140
            modified = self._get_statistics(node)[2] # Overall last modification.
141
            modified = max(modified, mtime)
142
        
143
        if user != account:
144
            meta = {'name': account}
145
        else:
146
            meta = {}
147
            if props is not None:
148
                meta.update(dict(self.node.attribute_get(props[self.SERIAL])))
149
            if until is not None:
150
                meta.update({'until_timestamp': tstamp})
151
            meta.update({'name': account, 'count': count, 'bytes': bytes})
152
        meta.update({'modified': modified})
153
        return meta
154
    
155
    @backend_method
156
    def update_account_meta(self, user, account, meta, replace=False):
157
        """Update the metadata associated with the account."""
158
        
159
        logger.debug("update_account_meta: %s %s %s", account, meta, replace)
160
        if user != account:
161
            raise NotAllowedError
162
        path, node = self._lookup_account(account, True)
163
        self._put_metadata(user, node, meta, replace, False)
164
    
165
    @backend_method
166
    def get_account_groups(self, user, account):
167
        """Return a dictionary with the user groups defined for this account."""
168
        
169
        logger.debug("get_account_groups: %s", account)
170
        if user != account:
171
            if account not in self._allowed_accounts(user):
172
                raise NotAllowedError
173
            return {}
174
        self._lookup_account(account, True)
175
        return self.permissions.group_dict(account)
176
    
177
    @backend_method
178
    def update_account_groups(self, user, account, groups, replace=False):
179
        """Update the groups associated with the account."""
180
        
181
        logger.debug("update_account_groups: %s %s %s", account, groups, replace)
182
        if user != account:
183
            raise NotAllowedError
184
        self._lookup_account(account, True)
185
        self._check_groups(groups)
186
        if replace:
187
            self.permissions.group_destroy(account)
188
        for k, v in groups.iteritems():
189
            if not replace: # If not already deleted.
190
                self.permissions.group_delete(account, k)
191
            if v:
192
                self.permissions.group_addmany(account, k, v)
193
    
194
    @backend_method
195
    def put_account(self, user, account):
196
        """Create a new account with the given name."""
197
        
198
        logger.debug("put_account: %s", account)
199
        if user != account:
200
            raise NotAllowedError
201
        node = self.node.node_lookup(account)
202
        if node is not None:
203
            raise NameError('Account already exists')
204
        self._put_path(user, self.ROOTNODE, account)
205
    
206
    @backend_method
207
    def delete_account(self, user, account):
208
        """Delete the account with the given name."""
209
        
210
        logger.debug("delete_account: %s", account)
211
        if user != account:
212
            raise NotAllowedError
213
        node = self.node.node_lookup(account)
214
        if node is None:
215
            return
216
        if not self.node.node_remove(node):
217
            raise IndexError('Account is not empty')
218
        self.permissions.group_destroy(account)
219
    
220
    @backend_method
221
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
222
        """Return a list of containers existing under an account."""
223
        
224
        logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
225
        if user != account:
226
            if until or account not in self._allowed_accounts(user):
227
                raise NotAllowedError
228
            allowed = self._allowed_containers(user, account)
229
            start, limit = self._list_limits(allowed, marker, limit)
230
            return allowed[start:start + limit]
231
        if shared:
232
            allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
233
            allowed = list(set(allowed))
234
            start, limit = self._list_limits(allowed, marker, limit)
235
            return allowed[start:start + limit]
236
        node = self.node.node_lookup(account)
237
        return [x[0] for x in self._list_objects(node, account, '', '/', marker, limit, False, [], until)]
238
    
239
    @backend_method
240
    def get_container_meta(self, user, account, container, until=None):
241
        """Return a dictionary with the container metadata."""
242
        
243
        logger.debug("get_container_meta: %s %s %s", account, container, until)
244
        if user != account:
245
            if until or container not in self._allowed_containers(user, account):
246
                raise NotAllowedError
247
        path, node = self._lookup_container(account, container)
248
        props = self._get_properties(node, until)
249
        mtime = props[self.MTIME]
250
        count, bytes, tstamp = self._get_statistics(node, until)
251
        tstamp = max(tstamp, mtime)
252
        if until is None:
253
            modified = tstamp
254
        else:
255
            modified = self._get_statistics(node)[2] # Overall last modification.
256
            modified = max(modified, mtime)
257
        
258
        if user != account:
259
            meta = {'name': container}
260
        else:
261
            meta = dict(self.node.attribute_get(props[self.SERIAL]))
262
            if until is not None:
263
                meta.update({'until_timestamp': tstamp})
264
            meta.update({'name': container, 'count': count, 'bytes': bytes})
265
        meta.update({'modified': modified})
266
        return meta
267
    
268
    @backend_method
269
    def update_container_meta(self, user, account, container, meta, replace=False):
270
        """Update the metadata associated with the container."""
271
        
272
        logger.debug("update_container_meta: %s %s %s %s", account, container, meta, replace)
273
        if user != account:
274
            raise NotAllowedError
275
        path, node = self._lookup_container(account, container)
276
        self._put_metadata(user, node, meta, replace, False)
277
    
278
    @backend_method
279
    def get_container_policy(self, user, account, container):
280
        """Return a dictionary with the container policy."""
281
        
282
        logger.debug("get_container_policy: %s %s", account, container)
283
        if user != account:
284
            if container not in self._allowed_containers(user, account):
285
                raise NotAllowedError
286
            return {}
287
        path = self._lookup_container(account, container)[0]
288
        return self.policy.policy_get(path)
289
    
290
    @backend_method
291
    def update_container_policy(self, user, account, container, policy, replace=False):
292
        """Update the policy associated with the account."""
293
        
294
        logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
295
        if user != account:
296
            raise NotAllowedError
297
        path = self._lookup_container(account, container)[0]
298
        self._check_policy(policy)
299
        if replace:
300
            for k, v in self.default_policy.iteritems():
301
                if k not in policy:
302
                    policy[k] = v
303
        self.policy.policy_set(path, policy)
304
    
305
    @backend_method
306
    def put_container(self, user, account, container, policy=None):
307
        """Create a new container with the given name."""
308
        
309
        logger.debug("put_container: %s %s %s", account, container, policy)
310
        if user != account:
311
            raise NotAllowedError
312
        try:
313
            path, node = self._lookup_container(account, container)
314
        except NameError:
315
            pass
316
        else:
317
            raise NameError('Container already exists')
318
        if policy:
319
            self._check_policy(policy)
320
        path = '/'.join((account, container))
321
        self._put_path(user, self._lookup_account(account, True)[1], path)
322
        for k, v in self.default_policy.iteritems():
323
            if k not in policy:
324
                policy[k] = v
325
        self.policy.policy_set(path, policy)
326
    
327
    @backend_method
328
    def delete_container(self, user, account, container, until=None):
329
        """Delete/purge the container with the given name."""
330
        
331
        logger.debug("delete_container: %s %s %s", account, container, until)
332
        if user != account:
333
            raise NotAllowedError
334
        path, node = self._lookup_container(account, container)
335
        
336
        if until is not None:
337
            versions = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
338
            for v in versions:
339
                self.mapper.map_remv(v)
340
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
341
            return
342
        
343
        if self._get_statistics(node)[0] > 0:
344
            raise IndexError('Container is not empty')
345
        versions = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
346
        for v in versions:
347
            self.mapper.map_remv(v)
348
        self.node.node_purge_children(node, inf, CLUSTER_DELETED)
349
        self.node.node_remove(node)
350
        self.policy.policy_unset(path)
351
    
352
    @backend_method
353
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], shared=False, until=None):
354
        """Return a list of objects existing under a container."""
355
        
356
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, keys, shared, until)
357
        allowed = []
358
        if user != account:
359
            if until:
360
                raise NotAllowedError
361
            allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
362
            if not allowed:
363
                raise NotAllowedError
364
        else:
365
            if shared:
366
                allowed = self.permissions.access_list_shared('/'.join((account, container)))
367
                if not allowed:
368
                    return []
369
        path, node = self._lookup_container(account, container)
370
        return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, keys, until, allowed)
371
    
372
    @backend_method
373
    def list_object_meta(self, user, account, container, until=None):
374
        """Return a list with all the container's object meta keys."""
375
        
376
        logger.debug("list_object_meta: %s %s %s", account, container, until)
377
        allowed = []
378
        if user != account:
379
            if until:
380
                raise NotAllowedError
381
            allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
382
            if not allowed:
383
                raise NotAllowedError
384
        path, node = self._lookup_container(account, container)
385
        before = until if until is not None else inf
386
        return self.node.latest_attribute_keys(node, before, CLUSTER_DELETED, allowed)
387
    
388
    @backend_method
389
    def get_object_meta(self, user, account, container, name, version=None):
390
        """Return a dictionary with the object metadata."""
391
        
392
        logger.debug("get_object_meta: %s %s %s %s", account, container, name, version)
393
        self._can_read(user, account, container, name)
394
        path, node = self._lookup_object(account, container, name)
395
        props = self._get_version(node, version)
396
        if version is None:
397
            modified = props[self.MTIME]
398
        else:
399
            try:
400
                modified = self._get_version(node)[self.MTIME] # Overall last modification.
401
            except NameError: # Object may be deleted.
402
                del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
403
                if del_props is None:
404
                    raise NameError('Object does not exist')
405
                modified = del_props[self.MTIME]
406
        
407
        meta = dict(self.node.attribute_get(props[self.SERIAL]))
408
        meta.update({'name': name, 'bytes': props[self.SIZE]})
409
        meta.update({'version': props[self.SERIAL], 'version_timestamp': props[self.MTIME]})
410
        meta.update({'modified': modified, 'modified_by': props[self.MUSER]})
411
        return meta
412
    
413
    @backend_method
414
    def update_object_meta(self, user, account, container, name, meta, replace=False):
415
        """Update the metadata associated with the object."""
416
        
417
        logger.debug("update_object_meta: %s %s %s %s %s", account, container, name, meta, replace)
418
        self._can_write(user, account, container, name)
419
        path, node = self._lookup_object(account, container, name)
420
        return self._put_metadata(user, node, meta, replace)
421
    
422
    @backend_method
423
    def get_object_permissions(self, user, account, container, name):
424
        """Return the action allowed on the object, the path
425
        from which the object gets its permissions from,
426
        along with a dictionary containing the permissions."""
427
        
428
        logger.debug("get_object_permissions: %s %s %s", account, container, name)
429
        allowed = 'write'
430
        if user != account:
431
            path = '/'.join((account, container, name))
432
            if self.permissions.access_check(path, self.WRITE, user):
433
                allowed = 'write'
434
            elif self.permissions.access_check(path, self.READ, user):
435
                allowed = 'read'
436
            else:
437
                raise NotAllowedError
438
        path = self._lookup_object(account, container, name)[0]
439
        return (allowed,) + self.permissions.access_inherit(path)
440
    
441
    @backend_method
442
    def update_object_permissions(self, user, account, container, name, permissions):
443
        """Update the permissions associated with the object."""
444
        
445
        logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
446
        if user != account:
447
            raise NotAllowedError
448
        path = self._lookup_object(account, container, name)[0]
449
        self._check_permissions(path, permissions)
450
        self.permissions.access_set(path, permissions)
451
    
452
    @backend_method
453
    def get_object_public(self, user, account, container, name):
454
        """Return the public URL of the object if applicable."""
455
        
456
        logger.debug("get_object_public: %s %s %s", account, container, name)
457
        self._can_read(user, account, container, name)
458
        path = self._lookup_object(account, container, name)[0]
459
        if self.permissions.public_check(path):
460
            return '/public/' + path
461
        return None
462
    
463
    @backend_method
464
    def update_object_public(self, user, account, container, name, public):
465
        """Update the public status of the object."""
466
        
467
        logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
468
        self._can_write(user, account, container, name)
469
        path = self._lookup_object(account, container, name)[0]
470
        if not public:
471
            self.permissions.public_unset(path)
472
        else:
473
            self.permissions.public_set(path)
474
    
475
    @backend_method
476
    def get_object_hashmap(self, user, account, container, name, version=None):
477
        """Return the object's size and a list with partial hashes."""
478
        
479
        logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
480
        self._can_read(user, account, container, name)
481
        path, node = self._lookup_object(account, container, name)
482
        props = self._get_version(node, version)
483
        hashmap = self.mapper.map_retr(props[self.SERIAL])
484
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
485
    
486
    @backend_method
487
    def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None):
488
        """Create/update an object with the specified size and partial hashes."""
489
        
490
        logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
491
        if permissions is not None and user != account:
492
            raise NotAllowedError
493
        self._can_write(user, account, container, name)
494
        missing = self.blocker.block_ping([binascii.unhexlify(x) for x in hashmap])
495
        if missing:
496
            ie = IndexError()
497
            ie.data = [binascii.hexlify(x) for x in missing]
498
            raise ie
499
        if permissions is not None:
500
            path = '/'.join((account, container, name))
501
            self._check_permissions(path, permissions)
502
        path, node = self._put_object_node(account, container, name)
503
        src_version_id, dest_version_id = self._copy_version(user, node, None, node, size)
504
        self.mapper.map_stor(dest_version_id, [binascii.unhexlify(x) for x in hashmap])
505
        if not replace_meta and src_version_id is not None:
506
            self.node.attribute_copy(src_version_id, dest_version_id)
507
        self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
508
        if permissions is not None:
509
            self.permissions.access_set(path, permissions)
510
        return dest_version_id
511
    
512
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
513
        if permissions is not None and user != dest_account:
514
            raise NotAllowedError
515
        self._can_read(user, src_account, src_container, src_name)
516
        self._can_write(user, dest_account, dest_container, dest_name)
517
        src_path, src_node = self._lookup_object(src_account, src_container, src_name)
518
        self._get_version(src_node, src_version)
519
        if permissions is not None:
520
            dest_path = '/'.join((dest_account, dest_container, dest_name))
521
            self._check_permissions(dest_path, permissions)
522
        dest_path, dest_node = self._put_object_node(dest_account, dest_container, dest_name)
523
        src_version_id, dest_version_id = self._copy_version(user, src_node, src_version, dest_node)
524
        if src_version_id is not None:
525
            self._copy_data(src_version_id, dest_version_id)
526
        if not replace_meta and src_version_id is not None:
527
            self.node.attribute_copy(src_version_id, dest_version_id)
528
        self.node.attribute_set(dest_version_id, ((k, v) for k, v in dest_meta.iteritems()))
529
        if permissions is not None:
530
            self.permissions.access_set(dest_path, permissions)
531
        return dest_version_id
532
    
533
    @backend_method
534
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
535
        """Copy an object's data and metadata."""
536
        
537
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
538
        return self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
539
    
540
    @backend_method
541
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None):
542
        """Move an object's data and metadata."""
543
        
544
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions)
545
        if user != src_account:
546
            raise NotAllowedError
547
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, None)
548
        self._delete_object(user, src_account, src_container, src_name)
549
        return dest_version_id
550
    
551
    def _delete_object(self, user, account, container, name, until=None):
552
        if user != account:
553
            raise NotAllowedError
554
        
555
        if until is not None:
556
            path = '/'.join((account, container, name))
557
            node = self.node.node_lookup(path)
558
            if node is None:
559
                return
560
            versions = self.node.node_purge(node, until, CLUSTER_NORMAL)
561
            versions += self.node.node_purge(node, until, CLUSTER_HISTORY)
562
            for v in versions:
563
                self.mapper.map_remv(v)
564
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
565
            try:
566
                props = self._get_version(node)
567
            except NameError:
568
                pass
569
            else:
570
                self.permissions.access_clear(path)
571
            return
572
        
573
        path, node = self._lookup_object(account, container, name)
574
        self._copy_version(user, node, None, node, 0, CLUSTER_DELETED)
575
        self.permissions.access_clear(path)
576
    
577
    @backend_method
578
    def delete_object(self, user, account, container, name, until=None):
579
        """Delete/purge an object."""
580
        
581
        logger.debug("delete_object: %s %s %s %s", account, container, name, until)
582
        self._delete_object(user, account, container, name, until)
583
    
584
    @backend_method
585
    def list_versions(self, user, account, container, name):
586
        """Return a list of all (version, version_timestamp) tuples for an object."""
587
        
588
        logger.debug("list_versions: %s %s %s", account, container, name)
589
        self._can_read(user, account, container, name)
590
        path, node = self._lookup_object(account, container, name)
591
        versions = self.node.node_get_versions(node)
592
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
593
    
594
    @backend_method(autocommit=0)
595
    def get_block(self, hash):
596
        """Return a block's data."""
597
        
598
        logger.debug("get_block: %s", hash)
599
        blocks = self.blocker.block_retr((binascii.unhexlify(hash),))
600
        if not blocks:
601
            raise NameError('Block does not exist')
602
        return blocks[0]
603
    
604
    @backend_method(autocommit=0)
605
    def put_block(self, data):
606
        """Store a block and return the hash."""
607
        
608
        logger.debug("put_block: %s", len(data))
609
        hashes, absent = self.blocker.block_stor((data,))
610
        return binascii.hexlify(hashes[0])
611
    
612
    @backend_method(autocommit=0)
613
    def update_block(self, hash, data, offset=0):
614
        """Update a known block and return the hash."""
615
        
616
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
617
        if offset == 0 and len(data) == self.block_size:
618
            return self.put_block(data)
619
        h, e = self.blocker.block_delta(binascii.unhexlify(hash), ((offset, data),))
620
        return binascii.hexlify(h)
621
    
622
    # Path functions.
623
    
624
    def _put_object_node(self, account, container, name):
625
        path, parent = self._lookup_container(account, container)
626
        path = '/'.join((path, name))
627
        node = self.node.node_lookup(path)
628
        if node is None:
629
            node = self.node.node_create(parent, path)
630
        return path, node
631
    
632
    def _put_path(self, user, parent, path):
633
        node = self.node.node_create(parent, path)
634
        self.node.version_create(node, 0, None, user, CLUSTER_NORMAL)
635
        return node
636
    
637
    def _lookup_account(self, account, create=True):
638
        node = self.node.node_lookup(account)
639
        if node is None and create:
640
            node = self._put_path(account, self.ROOTNODE, account) # User is account.
641
        return account, node
642
    
643
    def _lookup_container(self, account, container):
644
        path = '/'.join((account, container))
645
        node = self.node.node_lookup(path)
646
        if node is None:
647
            raise NameError('Container does not exist')
648
        return path, node
649
    
650
    def _lookup_object(self, account, container, name):
651
        path = '/'.join((account, container, name))
652
        node = self.node.node_lookup(path)
653
        if node is None:
654
            raise NameError('Object does not exist')
655
        return path, node
656
    
657
    def _get_properties(self, node, until=None):
658
        """Return properties until the timestamp given."""
659
        
660
        before = until if until is not None else inf
661
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
662
        if props is None and until is not None:
663
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
664
        if props is None:
665
            raise NameError('Path does not exist')
666
        return props
667
    
668
    def _get_statistics(self, node, until=None):
669
        """Return count, sum of size and latest timestamp of everything under node."""
670
        
671
        if until is None:
672
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
673
        else:
674
            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
675
        if stats is None:
676
            stats = (0, 0, 0)
677
        return stats
678
    
679
    def _get_version(self, node, version=None):
680
        if version is None:
681
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
682
            if props is None:
683
                raise NameError('Object does not exist')
684
        else:
685
            props = self.node.version_get_properties(version)
686
            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
687
                raise IndexError('Version does not exist')
688
        return props
689
    
690
    def _copy_version(self, user, src_node, src_version, dest_node, dest_size=None, dest_cluster=CLUSTER_NORMAL):
691
        
692
        # Get source serial and size.
693
        if src_version is not None:
694
            src_props = self._get_version(src_node, src_version)
695
            src_version_id = src_props[self.SERIAL]
696
            size = src_props[self.SIZE]
697
        else:
698
            # Latest or create from scratch.
699
            try:
700
                src_props = self._get_version(src_node)
701
                src_version_id = src_props[self.SERIAL]
702
                size = src_props[self.SIZE]
703
            except NameError:
704
                src_version_id = None
705
                size = 0
706
        if dest_size is not None:
707
            size = dest_size
708
        
709
        # Move the latest version at destination to CLUSTER_HISTORY and create new.
710
        if src_node == dest_node and src_version is None and src_version_id is not None:
711
            self.node.version_recluster(src_version_id, CLUSTER_HISTORY)
712
        else:
713
            dest_props = self.node.version_lookup(dest_node, inf, CLUSTER_NORMAL)
714
            if dest_props is not None:
715
                self.node.version_recluster(dest_props[self.SERIAL], CLUSTER_HISTORY)
716
        dest_version_id, mtime = self.node.version_create(dest_node, size, src_version_id, user, dest_cluster)
717
        
718
        return src_version_id, dest_version_id
719
    
720
    def _copy_data(self, src_version, dest_version):
721
        hashmap = self.mapper.map_retr(src_version)
722
        self.mapper.map_stor(dest_version, hashmap)
723
    
724
    def _get_metadata(self, version):
725
        if version is None:
726
            return {}
727
        return dict(self.node.attribute_get(version))
728
    
729
    def _put_metadata(self, user, node, meta, replace=False, copy_data=True):
730
        """Create a new version and store metadata."""
731
        
732
        src_version_id, dest_version_id = self._copy_version(user, node, None, node)
733
        if not replace:
734
            if src_version_id is not None:
735
                self.node.attribute_copy(src_version_id, dest_version_id)
736
            self.node.attribute_del(dest_version_id, (k for k, v in meta.iteritems() if v == ''))
737
            self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems() if v != ''))
738
        else:
739
            self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
740
        if copy_data and src_version_id is not None:
741
            self._copy_data(src_version_id, dest_version_id)
742
        return dest_version_id
743
    
744
    def _list_limits(self, listing, marker, limit):
745
        start = 0
746
        if marker:
747
            try:
748
                start = listing.index(marker) + 1
749
            except ValueError:
750
                pass
751
        if not limit or limit > 10000:
752
            limit = 10000
753
        return start, limit
754
    
755
    def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None, allowed=[]):
756
        cont_prefix = path + '/'
757
        prefix = cont_prefix + prefix
758
        start = cont_prefix + marker if marker else None
759
        before = until if until is not None else inf
760
        filterq = ','.join(keys) if keys else None
761
        
762
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, filterq)
763
        objects.extend([(p, None) for p in prefixes] if virtual else [])
764
        objects.sort(key=lambda x: x[0])
765
        objects = [(x[0][len(cont_prefix):], x[1]) for x in objects]
766
        
767
        start, limit = self._list_limits([x[0] for x in objects], marker, limit)
768
        return objects[start:start + limit]
769
    
770
    # Policy functions.
771
    
772
    def _check_policy(self, policy):
773
        for k in policy.keys():
774
            if policy[k] == '':
775
                policy[k] = self.default_policy.get(k)
776
        for k, v in policy.iteritems():
777
            if k == 'quota':
778
                q = int(v) # May raise ValueError.
779
                if q < 0:
780
                    raise ValueError
781
            elif k == 'versioning':
782
                if v not in ['auto', 'manual', 'none']:
783
                    raise ValueError
784
            else:
785
                raise ValueError
786
    
787
    # Access control functions.
788
    
789
    def _check_groups(self, groups):
790
        # raise ValueError('Bad characters in groups')
791
        pass
792
    
793
    def _check_permissions(self, path, permissions):
794
        # raise ValueError('Bad characters in permissions')
795
        
796
        # Check for existing permissions.
797
        paths = self.permissions.access_list(path)
798
        if paths:
799
            ae = AttributeError()
800
            ae.data = paths
801
            raise ae
802
    
803
    def _can_read(self, user, account, container, name):
804
        if user == account:
805
            return True
806
        path = '/'.join((account, container, name))
807
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
808
            raise NotAllowedError
809
    
810
    def _can_write(self, user, account, container, name):
811
        if user == account:
812
            return True
813
        path = '/'.join((account, container, name))
814
        if not self.permissions.access_check(path, self.WRITE, user):
815
            raise NotAllowedError
816
    
817
    def _allowed_accounts(self, user):
818
        allow = set()
819
        for path in self.permissions.access_list_paths(user):
820
            allow.add(path.split('/', 1)[0])
821
        return sorted(allow)
822
    
823
    def _allowed_containers(self, user, account):
824
        allow = set()
825
        for path in self.permissions.access_list_paths(user, account):
826
            allow.add(path.split('/', 2)[1])
827
        return sorted(allow)