Statistics
| Branch: | Tag: | Revision:

root / pithos / backends / modular.py @ c915d3bf

History | View | Annotate | Download (35.2 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import os
35
import time
36
import sqlite3
37
import logging
38
import hashlib
39
import binascii
40

    
41
from base import NotAllowedError, BaseBackend
42
from lib.node import Node, ROOTNODE, SERIAL, SIZE, MTIME, MUSER, CLUSTER
43
from lib.permissions import Permissions, READ, WRITE
44
from lib.policy import Policy
45
from lib.hashfiler import Mapper, Blocker
46

    
47
( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
48

    
49
inf = float('inf')
50

    
51

    
52
logger = logging.getLogger(__name__)
53

    
54
def backend_method(func=None, autocommit=1):
55
    if func is None:
56
        def fn(func):
57
            return backend_method(func, autocommit)
58
        return fn
59

    
60
    if not autocommit:
61
        return func
62
    def fn(self, *args, **kw):
63
        self.con.execute('begin deferred')
64
        try:
65
            ret = func(self, *args, **kw)
66
            self.con.commit()
67
            return ret
68
        except:
69
            self.con.rollback()
70
            raise
71
    return fn
72

    
73

    
74
class ModularBackend(BaseBackend):
75
    """A modular backend.
76
    
77
    Uses SQLite for storage.
78
    """
79
    
80
    # TODO: Create account if not present in all functions.
81
    
82
    def __init__(self, db):
83
        self.hash_algorithm = 'sha256'
84
        self.block_size = 4 * 1024 * 1024 # 4MB
85
        
86
        self.default_policy = {'quota': 0, 'versioning': 'auto'}
87
        
88
        basepath = os.path.split(db)[0]
89
        if basepath and not os.path.exists(basepath):
90
            os.makedirs(basepath)
91
        if not os.path.isdir(basepath):
92
            raise RuntimeError("Cannot open database at '%s'" % (db,))
93
        
94
        self.con = sqlite3.connect(basepath + '/db', check_same_thread=False)        
95
        
96
        params = {'blocksize': self.block_size,
97
                  'blockpath': basepath + '/blocks',
98
                  'hashtype': self.hash_algorithm}
99
        self.blocker = Blocker(**params)
100
        
101
        params = {'mappath': basepath + '/maps',
102
                  'namelen': self.blocker.hashlen}
103
        self.mapper = Mapper(**params)
104
        
105
        params = {'connection': self.con,
106
                  'cursor': self.con.cursor()}
107
        self.permissions = Permissions(**params)
108
        self.policy = Policy(**params)
109
        self.node = Node(**params)
110
        
111
        self.con.commit()
112
    
113
    @backend_method
114
    def list_accounts(self, user, marker=None, limit=10000):
115
        """Return a list of accounts the user can access."""
116
        
117
        allowed = self._allowed_accounts(user)
118
        start, limit = self._list_limits(allowed, marker, limit)
119
        return allowed[start:start + limit]
120
    
121
    @backend_method
122
    def get_account_meta(self, user, account, until=None):
123
        """Return a dictionary with the account metadata."""
124
        
125
        logger.debug("get_account_meta: %s %s", account, until)
126
        path, node = self._lookup_account(account, user == account)
127
        if user != account:
128
            if until or node is None or account not in self._allowed_accounts(user):
129
                raise NotAllowedError
130
        try:
131
            props = self._get_properties(node, until)
132
            version_id = props[SERIAL]
133
            mtime = props[MTIME]
134
        except NameError:
135
            # Account does not exist before until.
136
            version_id = None
137
            mtime = until
138
        object_count, bytes, tstamp = self._get_statistics(node, path, until)
139
        tstamp = max(mtime, tstamp)
140
        if until is None:
141
            modified = tstamp
142
        else:
143
            modified = self._get_statistics(node, path)[2] # Overall last modification.
144
            modified = max(mtime, modified)
145
        
146
        # Proper count.
147
        # TODO: Fix this to count until.
148
        count = self.node.node_count_children(node)
149
        object_count -= count
150
        
151
        if user != account:
152
            meta = {'name': account}
153
        else:
154
            meta = {}
155
            if version_id is not None:
156
                meta.update(dict(self.node.attribute_get(version_id)))
157
            if until is not None:
158
                meta.update({'until_timestamp': tstamp})
159
            meta.update({'name': account, 'count': count, 'bytes': bytes})
160
        meta.update({'modified': modified})
161
        return meta
162
    
163
    @backend_method
164
    def update_account_meta(self, user, account, meta, replace=False):
165
        """Update the metadata associated with the account."""
166
        
167
        logger.debug("update_account_meta: %s %s %s", account, meta, replace)
168
        if user != account:
169
            raise NotAllowedError
170
        path, node = self._lookup_account(account, True)
171
        self._put_metadata(user, node, meta, replace, False)
172
    
173
    @backend_method
174
    def get_account_groups(self, user, account):
175
        """Return a dictionary with the user groups defined for this account."""
176
        
177
        logger.debug("get_account_groups: %s", account)
178
        if user != account:
179
            if account not in self._allowed_accounts(user):
180
                raise NotAllowedError
181
            return {}
182
        self._lookup_account(account, True)
183
        return self.permissions.group_dict(account)
184
    
185
    @backend_method
186
    def update_account_groups(self, user, account, groups, replace=False):
187
        """Update the groups associated with the account."""
188
        
189
        logger.debug("update_account_groups: %s %s %s", account, groups, replace)
190
        if user != account:
191
            raise NotAllowedError
192
        self._lookup_account(account, True)
193
        self._check_groups(groups)
194
        if replace:
195
            self.permissions.group_destroy(account)
196
        for k, v in groups.iteritems():
197
            if not replace: # If not already deleted.
198
                self.permissions.group_delete(account, k)
199
            if v:
200
                self.permissions.group_addmany(account, k, v)
201
    
202
    @backend_method
203
    def put_account(self, user, account):
204
        """Create a new account with the given name."""
205
        
206
        logger.debug("put_account: %s", account)
207
        if user != account:
208
            raise NotAllowedError
209
        node = self.node.node_lookup(account)
210
        if node is not None:
211
            raise NameError('Account already exists')
212
        self._put_path(ROOTNODE, account)
213
    
214
    @backend_method
215
    def delete_account(self, user, account):
216
        """Delete the account with the given name."""
217
        
218
        logger.debug("delete_account: %s", account)
219
        if user != account:
220
            raise NotAllowedError
221
        node = self.node.node_lookup(account)
222
        if node is None:
223
            return
224
        if not self.node.node_remove(node):
225
            raise IndexError('Account is not empty')
226
        self.permissions.group_destroy(account)
227
    
228
#     @backend_method
229
#     def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
230
#         """Return a list of containers existing under an account."""
231
#         
232
#         logger.debug("list_containers: %s %s %s %s", account, marker, limit, until)
233
#         if user != account:
234
#             if until or account not in self._allowed_accounts(user):
235
#                 raise NotAllowedError
236
#             allowed = self._allowed_containers(user, account)
237
#             start, limit = self._list_limits(allowed, marker, limit)
238
#             return allowed[start:start + limit]
239
#         else:
240
#             if shared:
241
#                 allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
242
#                 start, limit = self._list_limits(allowed, marker, limit)
243
#                 return allowed[start:start + limit]
244
#         return [x[0] for x in self._list_objects(account, '', '/', marker, limit, False, [], until)]
245
    
246
    @backend_method
247
    def get_container_meta(self, user, account, container, until=None):
248
        """Return a dictionary with the container metadata."""
249
        
250
        logger.debug("get_container_meta: %s %s %s", account, container, until)
251
        if user != account:
252
            if until or container not in self._allowed_containers(user, account):
253
                raise NotAllowedError
254
        path, node = self._lookup_container(account, container)
255
        props = self._get_properties(node, until)
256
        count, bytes, tstamp = self._get_statistics(node, path, until)
257
        tstamp = max(mtime, tstamp)
258
        if until is None:
259
            modified = tstamp
260
        else:
261
            modified = self._get_statistics(node, path)[2] # Overall last modification.
262
            modified = max(mtime, modified)
263
        
264
        if user != account:
265
            meta = {'name': container}
266
        else:
267
            meta = dict(self.node.attribute_get(props[SERIAL]))
268
            if until is not None:
269
                meta.update({'until_timestamp': tstamp})
270
            meta.update({'name': container, 'count': count, 'bytes': bytes})
271
        meta.update({'modified': modified})
272
        return meta
273
    
274
    @backend_method
275
    def update_container_meta(self, user, account, container, meta, replace=False):
276
        """Update the metadata associated with the container."""
277
        
278
        logger.debug("update_container_meta: %s %s %s %s", account, container, meta, replace)
279
        if user != account:
280
            raise NotAllowedError
281
        path, node = self._lookup_container(account, container)
282
        self._put_metadata(user, node, meta, replace, False)
283
    
284
    @backend_method
285
    def get_container_policy(self, user, account, container):
286
        """Return a dictionary with the container policy."""
287
        
288
        logger.debug("get_container_policy: %s %s", account, container)
289
        if user != account:
290
            if container not in self._allowed_containers(user, account):
291
                raise NotAllowedError
292
            return {}
293
        path = self._lookup_container(account, container)[0]
294
        return self.policy.policy_get(path)
295
    
296
    @backend_method
297
    def update_container_policy(self, user, account, container, policy, replace=False):
298
        """Update the policy associated with the account."""
299
        
300
        logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
301
        if user != account:
302
            raise NotAllowedError
303
        path = self._lookup_container(account, container)[0]
304
        self._check_policy(policy)
305
        if replace:
306
            for k, v in self.default_policy.iteritems():
307
                if k not in policy:
308
                    policy[k] = v
309
        self.policy.policy_set(path, policy)
310
    
311
    @backend_method
312
    def put_container(self, user, account, container, policy=None):
313
        """Create a new container with the given name."""
314
        
315
        logger.debug("put_container: %s %s %s", account, container, policy)
316
        if user != account:
317
            raise NotAllowedError
318
        try:
319
            path, node = self._lookup_container(account, container)
320
        except NameError:
321
            pass
322
        else:
323
            raise NameError('Container already exists')
324
        if policy:
325
            self._check_policy(policy)
326
        path = '/'.join((account, container))
327
        self._put_path(self._lookup_account(account, True)[1], path)
328
        for k, v in self.default_policy.iteritems():
329
            if k not in policy:
330
                policy[k] = v
331
        self.policy.policy_set(path, policy)
332
    
333
    @backend_method
334
    def delete_container(self, user, account, container, until=None):
335
        """Delete/purge the container with the given name."""
336
        
337
        logger.debug("delete_container: %s %s %s", account, container, until)
338
        if user != account:
339
            raise NotAllowedError
340
        path, node = self._lookup_container(account, container)
341
        
342
        if until is not None:
343
            versions = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
344
            for v in versions:
345
                self.mapper.map_remv(v)
346
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
347
            return
348
        
349
        if self._get_statistics(node, path)[0] > 0:
350
            raise IndexError('Container is not empty')
351
        versions = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
352
        for v in versions:
353
            self.mapper.map_remv(v)
354
        self.node.node_purge_children(node, until, CLUSTER_DELETED)
355
        self.node.node_remove(node)
356
        self.policy.policy_unset(path)
357
    
358
#     @backend_method
359
#     def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], shared=False, until=None):
360
#         """Return a list of objects existing under a container."""
361
#         
362
#         logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, keys, shared, until)
363
#         allowed = []
364
#         if user != account:
365
#             if until:
366
#                 raise NotAllowedError
367
#             allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
368
#             if not allowed:
369
#                 raise NotAllowedError
370
#         else:
371
#             if shared:
372
#                 allowed = self.permissions.access_list_shared('/'.join((account, container)))
373
#         path, version_id, mtime = self._get_containerinfo(account, container, until)
374
#         return self._list_objects(path, prefix, delimiter, marker, limit, virtual, keys, until, allowed)
375
    
376
#     @backend_method
377
#     def list_object_meta(self, user, account, container, until=None):
378
#         """Return a list with all the container's object meta keys."""
379
#         
380
#         logger.debug("list_object_meta: %s %s %s", account, container, until)
381
#         allowed = []
382
#         if user != account:
383
#             if until:
384
#                 raise NotAllowedError
385
#             allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
386
#             if not allowed:
387
#                 raise NotAllowedError
388
#         path, version_id, mtime = self._get_containerinfo(account, container, until)
389
#         sql = '''select distinct m.key from (%s) o, metadata m
390
#                     where m.version_id = o.version_id and o.name like ?'''
391
#         sql = sql % self._sql_until(until)
392
#         param = (path + '/%',)
393
#         if allowed:
394
#             for x in allowed:
395
#                 sql += ' and o.name like ?'
396
#                 param += (x,)
397
#         c = self.con.execute(sql, param)
398
#         return [x[0] for x in c.fetchall()]
399
    
400
    @backend_method
401
    def get_object_meta(self, user, account, container, name, version=None):
402
        """Return a dictionary with the object metadata."""
403
        
404
        logger.debug("get_object_meta: %s %s %s %s", account, container, name, version)
405
        self._can_read(user, account, container, name)
406
        path, node = self._lookup_object(account, container, name)
407
        props = self._get_version(node, version)
408
        if version is None:
409
            modified = props[MTIME]
410
        else:
411
            # TODO: Use latest version if stop keeping statistics for leaves.
412
            modified = self._get_statistics(node, path)[2] # Overall last modification.
413
        
414
        meta = dict(self.node.attribute_get(props[SERIAL]))
415
        meta.update({'name': name, 'bytes': props[SIZE]})
416
        meta.update({'version': props[SERIAL], 'version_timestamp': props[MTIME]})
417
        meta.update({'modified': modified, 'modified_by': props[MUSER]})
418
        return meta
419
    
420
    @backend_method
421
    def update_object_meta(self, user, account, container, name, meta, replace=False):
422
        """Update the metadata associated with the object."""
423
        
424
        logger.debug("update_object_meta: %s %s %s %s %s", account, container, name, meta, replace)
425
        self._can_write(user, account, container, name)
426
        path, node = self._lookup_object(account, container, name)
427
        self._put_metadata(user, node, meta, replace)
428
    
429
    @backend_method
430
    def get_object_permissions(self, user, account, container, name):
431
        """Return the path from which this object gets its permissions from,\
432
        along with a dictionary containing the permissions."""
433
        
434
        logger.debug("get_object_permissions: %s %s %s", account, container, name)
435
        self._can_read(user, account, container, name)
436
        path = self._lookup_object(account, container, name)[0]
437
        return self.permissions.access_inherit(path)
438
    
439
    @backend_method
440
    def update_object_permissions(self, user, account, container, name, permissions):
441
        """Update the permissions associated with the object."""
442
        
443
        logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
444
        if user != account:
445
            raise NotAllowedError
446
        path = self._lookup_object(account, container, name)[0]
447
        self._check_permissions(path, permissions)
448
        self.permissions.access_set(path, permissions)
449
    
450
    @backend_method
451
    def get_object_public(self, user, account, container, name):
452
        """Return the public URL of the object if applicable."""
453
        
454
        logger.debug("get_object_public: %s %s %s", account, container, name)
455
        self._can_read(user, account, container, name)
456
        path = self._lookup_object(account, container, name)[0]
457
        if self.permissions.public_check(path):
458
            return '/public/' + path
459
        return None
460
    
461
    @backend_method
462
    def update_object_public(self, user, account, container, name, public):
463
        """Update the public status of the object."""
464
        
465
        logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
466
        self._can_write(user, account, container, name)
467
        path = self._lookup_object(account, container, name)[0]
468
        if not public:
469
            self.permissions.public_unset(path)
470
        else:
471
            self.permissions.public_set(path)
472
    
473
    @backend_method
474
    def get_object_hashmap(self, user, account, container, name, version=None):
475
        """Return the object's size and a list with partial hashes."""
476
        
477
        logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
478
        self._can_read(user, account, container, name)
479
        path, node = self._lookup_object(account, container, name)
480
        props = self._get_version(node, version)
481
        hashmap = self.mapper.map_retr(props[SERIAL])
482
        return props[SIZE], [binascii.hexlify(x) for x in hashmap]
483
    
484
    @backend_method
485
    def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None):
486
        """Create/update an object with the specified size and partial hashes."""
487
        
488
        logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
489
        if permissions is not None and user != account:
490
            raise NotAllowedError
491
        self._can_write(user, account, container, name)
492
        missing = self.blocker.block_ping([binascii.unhexlify(x) for x in hashmap])
493
        if missing:
494
            ie = IndexError()
495
            ie.data = missing
496
            raise ie
497
        if permissions is not None:
498
            self._check_permissions(path, permissions)
499
        path, node = self._put_object_path(account, container, name)
500
        src_version_id, dest_version_id = self._copy_version(user, node, node, not replace_meta, False)
501
        # TODO: Set size.
502
#         sql = 'update versions set size = ? where version_id = ?'
503
#         self.con.execute(sql, (size, dest_version_id))
504
        self.mapper.map_stor(dest_version_id, [binascii.unhexlify(x) for x in hashmap])
505
        # TODO: Check if can update meta with empty values.
506
        self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
507
        if permissions is not None:
508
            self.permissions.access_set(path, permissions)
509
    
510
    @backend_method
511
    def copy_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
512
        """Copy an object's data and metadata."""
513
        
514
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
515
        if permissions is not None and user != account:
516
            raise NotAllowedError
517
        self._can_read(user, account, src_container, src_name)
518
        self._can_write(user, account, dest_container, dest_name)
519
        src_path, src_node = self._lookup_object(account, src_container, src_name)
520
        src_props = self._get_version(src_node)
521
        if permissions is not None:
522
            self._check_permissions(dest_path, permissions)
523
        dest_path, dest_node = self._put_object_path(account, dest_container, dest_name)
524
        src_version_id, dest_version_id = self._copy_version(user, src_node, dest_node, not replace_meta, True, src_version)
525
        # TODO: Check if can update meta with empty values.
526
        self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
527
        if permissions is not None:
528
            self.permissions.access_set(dest_path, permissions)
529
    
530
    @backend_method
531
    def move_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None):
532
        """Move an object's data and metadata."""
533
        
534
        logger.debug("move_object: %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions)
535
        self.copy_object(user, account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, None)
536
        self.delete_object(user, account, src_container, src_name)
537
    
538
    @backend_method
539
    def delete_object(self, user, account, container, name, until=None):
540
        """Delete/purge an object."""
541
        
542
        logger.debug("delete_object: %s %s %s %s", account, container, name, until)
543
        if user != account:
544
            raise NotAllowedError
545
        
546
        if until is not None:
547
            path = '/'.join((account, container, name))
548
            node = self.node.node_lookup(path)
549
            if node is None:
550
                return
551
            versions = self.node.node_purge(node, until, CLUSTER_NORMAL)
552
            versions += self.node.node_purge(node, until, CLUSTER_HISTORY)
553
            for v in versions:
554
                self.mapper.map_remv(v)
555
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
556
            try:
557
                props = self._get_version(node)
558
            except NameError:
559
                pass
560
            else:
561
                self.permissions.access_clear(path)
562
            return
563
        
564
        path, node = self._lookup_object(account, container, name)
565
        self._copy_version(user, node, node, False, False, None, CLUSTER_DELETED)
566
        self.permissions.access_clear(path)
567
    
568
#     @backend_method
569
#     def list_versions(self, user, account, container, name):
570
#         """Return a list of all (version, version_timestamp) tuples for an object."""
571
#         
572
#         logger.debug("list_versions: %s %s %s", account, container, name)
573
#         self._can_read(user, account, container, name)
574
#         # This will even show deleted versions.
575
#         path = '/'.join((account, container, name))
576
#         sql = '''select distinct version_id, tstamp from versions where name = ? and hide = 0'''
577
#         c = self.con.execute(sql, (path,))
578
#         return [(int(x[0]), int(x[1])) for x in c.fetchall()]
579
    
580
    @backend_method(autocommit=0)
581
    def get_block(self, hash):
582
        """Return a block's data."""
583
        
584
        logger.debug("get_block: %s", hash)
585
        blocks = self.blocker.block_retr((binascii.unhexlify(hash),))
586
        if not blocks:
587
            raise NameError('Block does not exist')
588
        return blocks[0]
589
    
590
    @backend_method(autocommit=0)
591
    def put_block(self, data):
592
        """Create a block and return the hash."""
593
        
594
        logger.debug("put_block: %s", len(data))
595
        hashes, absent = self.blocker.block_stor((data,))
596
        return binascii.hexlify(hashes[0])
597
    
598
    @backend_method(autocommit=0)
599
    def update_block(self, hash, data, offset=0):
600
        """Update a known block and return the hash."""
601
        
602
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
603
        if offset == 0 and len(data) == self.block_size:
604
            return self.put_block(data)
605
        h, e = self.blocker.block_delta(binascii.unhexlify(hash), ((offset, data),))
606
        return binascii.hexlify(h)
607
    
608
    def _check_policy(self, policy):
609
        for k in policy.keys():
610
            if policy[k] == '':
611
                policy[k] = self.default_policy.get(k)
612
        for k, v in policy.iteritems():
613
            if k == 'quota':
614
                q = int(v) # May raise ValueError.
615
                if q < 0:
616
                    raise ValueError
617
            elif k == 'versioning':
618
                if v not in ['auto', 'manual', 'none']:
619
                    raise ValueError
620
            else:
621
                raise ValueError
622
    
623
    def _list_limits(self, listing, marker, limit):
624
        start = 0
625
        if marker:
626
            try:
627
                start = listing.index(marker) + 1
628
            except ValueError:
629
                pass
630
        if not limit or limit > 10000:
631
            limit = 10000
632
        return start, limit
633
    
634
#     def _list_objects(self, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None, allowed=[]):
635
#         cont_prefix = path + '/'
636
#         if keys and len(keys) > 0:
637
#             sql = '''select distinct o.name, o.version_id from (%s) o, metadata m where o.name like ? and
638
#                         m.version_id = o.version_id and m.key in (%s)'''
639
#             sql = sql % (self._sql_until(until), ', '.join('?' * len(keys)))
640
#             param = (cont_prefix + prefix + '%',) + tuple(keys)
641
#             if allowed:
642
#                 sql += ' and (' + ' or '.join(('o.name like ?',) * len(allowed)) + ')'
643
#                 param += tuple([x + '%' for x in allowed])
644
#             sql += ' order by o.name'
645
#         else:
646
#             sql = 'select name, version_id from (%s) where name like ?'
647
#             sql = sql % self._sql_until(until)
648
#             param = (cont_prefix + prefix + '%',)
649
#             if allowed:
650
#                 sql += ' and (' + ' or '.join(('name like ?',) * len(allowed)) + ')'
651
#                 param += tuple([x + '%' for x in allowed])
652
#             sql += ' order by name'
653
#         c = self.con.execute(sql, param)
654
#         objects = [(x[0][len(cont_prefix):], x[1]) for x in c.fetchall()]
655
#         if delimiter:
656
#             pseudo_objects = []
657
#             for x in objects:
658
#                 pseudo_name = x[0]
659
#                 i = pseudo_name.find(delimiter, len(prefix))
660
#                 if not virtual:
661
#                     # If the delimiter is not found, or the name ends
662
#                     # with the delimiter's first occurence.
663
#                     if i == -1 or len(pseudo_name) == i + len(delimiter):
664
#                         pseudo_objects.append(x)
665
#                 else:
666
#                     # If the delimiter is found, keep up to (and including) the delimiter.
667
#                     if i != -1:
668
#                         pseudo_name = pseudo_name[:i + len(delimiter)]
669
#                     if pseudo_name not in [y[0] for y in pseudo_objects]:
670
#                         if pseudo_name == x[0]:
671
#                             pseudo_objects.append(x)
672
#                         else:
673
#                             pseudo_objects.append((pseudo_name, None))
674
#             objects = pseudo_objects
675
#         
676
#         start, limit = self._list_limits([x[0] for x in objects], marker, limit)
677
#         return objects[start:start + limit]
678
    
679
    # Path functions.
680
    
681
    def _put_path(self, parent, path):
682
        node = self.node.node_create(parent, path)
683
        self.node.version_create(node, 0, None, path, CLUSTER_NORMAL)
684
    
685
    def _put_object_path(self, account, container, name):
686
        path, parent = self._lookup_container(account, container)
687
        path = '/'.join((path, name))
688
        node = self.node.node_lookup(path)
689
        if node is None:
690
            node = self.node.node_create(parent, path)
691
        return path, node
692
    
693
    def _lookup_account(self, account, create=True):
694
        node = self.node.node_lookup(account)
695
        if node is None and create:
696
            self._put_path(ROOTNODE, account)
697
        return account, node
698
    
699
    def _lookup_container(self, account, container):
700
        path = '/'.join((account, container))
701
        node = self.node.node_lookup(path)
702
        if node is None:
703
            raise NameError('Container does not exist')
704
        return path, node
705
    
706
    def _lookup_object(self, account, container, name):
707
        path = '/'.join((account, container, name))
708
        node = self.node.node_lookup(path)
709
        if node is None:
710
            raise NameError('Object does not exist')
711
        return path, node
712
    
713
    def _get_properties(self, node, until=None):
714
        """Return properties until the timestamp given."""
715
        
716
        before = until if until is not None else inf
717
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
718
        # TODO: Do one lookup.
719
        if props is None and until is not None:
720
            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
721
        if props is None:
722
            raise NameError('Path does not exist')
723
        return props
724
    
725
    def _get_statistics(self, node, path, until=None):
726
        """Return count, sum of size and latest timestamp of everything under node/path."""
727
        
728
        # TODO: Remove this function.
729
        
730
        if until is None:
731
            return self.node.node_statistics(node, CLUSTER_NORMAL)
732
        else:
733
            return self.node.path_statistics(path + '/', until, CLUSTER_DELETED)
734
    
735
    def _get_version(self, node, version=None):
736
        if version is None:
737
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
738
            if props is None:
739
                raise NameError('Object does not exist')
740
        else:
741
            props = self.node.version_get_properties(version)
742
            if props is None or props[CLUSTER] == CLUSTER_DELETE:
743
                raise IndexError('Version does not exist')
744
        return props
745
    
746
    def _copy_version(self, user, src_node, dest_node, copy_meta=True, copy_data=True, src_version=None, dest_cluster=CLUSTER_NORMAL):
747
        
748
        # TODO: Break this into two functions - one that creates and one that copies.
749
        
750
        # Get source serial and size.
751
        if src_version is not None:
752
            src_props = self._get_version(src_node, src_version)
753
            src_version_id = src_props[SERIAL]
754
            size = src_props[SIZE]
755
        else:
756
            # Latest or create from scratch.
757
            try:
758
                src_props = self._get_version(src_node)
759
                src_version_id = src_props[SERIAL]
760
                size = src_props[SIZE]
761
            except NameError:
762
                src_version_id = None
763
                size = 0
764
        if not copy_data:
765
            size = 0
766
        
767
        # Move the latest version at destination to CLUSTER_HISTORY and create new.
768
        if src_node == dest_node and src_version is None and src_version_id is not None:
769
            self.node.version_recluster(src_version_id, CLUSTER_HISTORY)
770
        else:
771
            dest_props = self.node.version_lookup(dest_node, inf, CLUSTER_NORMAL)
772
            if dest_props is not None:
773
                self.node.version_recluster(dest_props[SERIAL], CLUSTER_HISTORY)
774
        dest_version_id, mtime = self.node.version_create(dest_node, size, src_version_id, user, dest_cluster)
775
        
776
        # Copy meta and data.
777
        if copy_meta and src_version_id is not None:
778
            self.attribute_copy(src_version_id, dest_version_id)
779
        if copy_data and src_version_id is not None:
780
            hashmap = self.mapper.map_retr(src_version_id)
781
            self.mapper.map_stor(dest_version_id, hashmap)
782
        
783
        return src_version_id, dest_version_id
784
    
785
    def _get_metadata(self, version):
786
        if version is None:
787
            return {}
788
        return dict(self.node.attribute_get(version))
789
    
790
    def _put_metadata(self, user, node, meta, replace=False, copy_data=True):
791
        """Create a new version and store metadata."""
792
        
793
        src_version_id, dest_version_id = self._copy_version(user, node, node, not replace, copy_data)
794
        if not replace:
795
            self.node.attribute_del(dest_version_id, (k for k, v in meta.iteritems() if v == ''))
796
            self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems() if v != ''))
797
        else:
798
            self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
799
    
800
    # Access control functions.
801
    
802
    def _check_groups(self, groups):
803
        # raise ValueError('Bad characters in groups')
804
        pass
805
    
806
    def _check_permissions(self, path, permissions):
807
        # raise ValueError('Bad characters in permissions')
808
        
809
        # Check for existing permissions.
810
        paths = self.permissions.access_list(path)
811
        if paths:
812
            ae = AttributeError()
813
            ae.data = paths
814
            raise ae
815
    
816
    def _can_read(self, user, account, container, name):
817
        if user == account:
818
            return True
819
        path = '/'.join((account, container, name))
820
        if not self.permissions.access_check(path, READ, user) and not self.permissions.access_check(path, WRITE, user):
821
            raise NotAllowedError
822
    
823
    def _can_write(self, user, account, container, name):
824
        if user == account:
825
            return True
826
        path = '/'.join((account, container, name))
827
        if not self.permissions.access_check(path, WRITE, user):
828
            raise NotAllowedError
829
    
830
    def _allowed_accounts(self, user):
831
        allow = set()
832
        for path in self.permissions.access_list_paths(user):
833
            allow.add(path.split('/', 1)[0])
834
        return sorted(allow)
835
    
836
    def _allowed_containers(self, user, account):
837
        allow = set()
838
        for path in self.permissions.access_list_paths(user, account):
839
            allow.add(path.split('/', 2)[1])
840
        return sorted(allow)