Merge branch 'master' of https://code.grnet.gr/git/pithos
[pithos] / pithos / backends / modular.py
1 # Copyright 2011 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 import sys
35 import os
36 import time
37 import sqlite3
38 import logging
39 import hashlib
40 import binascii
41
42 from base import NotAllowedError, BaseBackend
43 from lib.hashfiler import Mapper, Blocker
44
45 ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
46
47 inf = float('inf')
48
49
50 logger = logging.getLogger(__name__)
51
52 def backend_method(func=None, autocommit=1):
53     if func is None:
54         def fn(func):
55             return backend_method(func, autocommit)
56         return fn
57
58     if not autocommit:
59         return func
60     def fn(self, *args, **kw):
61         self.wrapper.execute()
62         try:
63             ret = func(self, *args, **kw)
64             self.wrapper.commit()
65             return ret
66         except:
67             self.wrapper.rollback()
68             raise
69     return fn
70
71
72 class ModularBackend(BaseBackend):
73     """A modular backend.
74     
75     Uses modules for SQL functions and hashfiler for storage.
76     """
77     
78     def __init__(self, mod, path, db):
79         self.hash_algorithm = 'sha256'
80         self.block_size = 4 * 1024 * 1024 # 4MB
81         
82         self.default_policy = {'quota': 0, 'versioning': 'auto'}
83         
84         if path and not os.path.exists(path):
85             os.makedirs(path)
86         if not os.path.isdir(path):
87             raise RuntimeError("Cannot open path '%s'" % (path,))
88         
89         __import__(mod)
90         self.mod = sys.modules[mod]
91         self.wrapper = self.mod.dbwrapper.DBWrapper(db)
92         
93         params = {'blocksize': self.block_size,
94                   'blockpath': os.path.join(path + '/blocks'),
95                   'hashtype': self.hash_algorithm}
96         self.blocker = Blocker(**params)
97         
98         params = {'mappath': os.path.join(path + '/maps'),
99                   'namelen': self.blocker.hashlen}
100         self.mapper = Mapper(**params)
101         
102         params = {'connection': self.wrapper.conn,
103                   'cursor': self.wrapper.conn.cursor()}
104         self.permissions = self.mod.permissions.Permissions(**params)
105         for x in ['READ', 'WRITE']:
106             setattr(self, x, getattr(self.mod.permissions, x))
107         self.policy = self.mod.policy.Policy(**params)
108         self.node = self.mod.node.Node(**params)
109         for x in ['ROOTNODE', 'SERIAL', 'SIZE', 'MTIME', 'MUSER', 'CLUSTER']:
110             setattr(self, x, getattr(self.mod.node, x))
111     
112     @backend_method
113     def list_accounts(self, user, marker=None, limit=10000):
114         """Return a list of accounts the user can access."""
115         
116         logger.debug("list_accounts: %s %s %s", user, marker, limit)
117         allowed = self._allowed_accounts(user)
118         start, limit = self._list_limits(allowed, marker, limit)
119         return allowed[start:start + limit]
120     
121     @backend_method
122     def get_account_meta(self, user, account, until=None):
123         """Return a dictionary with the account metadata."""
124         
125         logger.debug("get_account_meta: %s %s", account, until)
126         path, node = self._lookup_account(account, user == account)
127         if user != account:
128             if until or node is None or account not in self._allowed_accounts(user):
129                 raise NotAllowedError
130         try:
131             props = self._get_properties(node, until)
132             mtime = props[self.MTIME]
133         except NameError:
134             props = None
135             mtime = until
136         count, bytes, tstamp = self._get_statistics(node, until)
137         tstamp = max(tstamp, mtime)
138         if until is None:
139             modified = tstamp
140         else:
141             modified = self._get_statistics(node)[2] # Overall last modification.
142             modified = max(modified, mtime)
143         
144         if user != account:
145             meta = {'name': account}
146         else:
147             meta = {}
148             if props is not None:
149                 meta.update(dict(self.node.attribute_get(props[self.SERIAL])))
150             if until is not None:
151                 meta.update({'until_timestamp': tstamp})
152             meta.update({'name': account, 'count': count, 'bytes': bytes})
153         meta.update({'modified': modified})
154         return meta
155     
156     @backend_method
157     def update_account_meta(self, user, account, meta, replace=False):
158         """Update the metadata associated with the account."""
159         
160         logger.debug("update_account_meta: %s %s %s", account, meta, replace)
161         if user != account:
162             raise NotAllowedError
163         path, node = self._lookup_account(account, True)
164         self._put_metadata(user, node, meta, replace, False)
165     
166     @backend_method
167     def get_account_groups(self, user, account):
168         """Return a dictionary with the user groups defined for this account."""
169         
170         logger.debug("get_account_groups: %s", account)
171         if user != account:
172             if account not in self._allowed_accounts(user):
173                 raise NotAllowedError
174             return {}
175         self._lookup_account(account, True)
176         return self.permissions.group_dict(account)
177     
178     @backend_method
179     def update_account_groups(self, user, account, groups, replace=False):
180         """Update the groups associated with the account."""
181         
182         logger.debug("update_account_groups: %s %s %s", account, groups, replace)
183         if user != account:
184             raise NotAllowedError
185         self._lookup_account(account, True)
186         self._check_groups(groups)
187         if replace:
188             self.permissions.group_destroy(account)
189         for k, v in groups.iteritems():
190             if not replace: # If not already deleted.
191                 self.permissions.group_delete(account, k)
192             if v:
193                 self.permissions.group_addmany(account, k, v)
194     
195     @backend_method
196     def put_account(self, user, account):
197         """Create a new account with the given name."""
198         
199         logger.debug("put_account: %s", account)
200         if user != account:
201             raise NotAllowedError
202         node = self.node.node_lookup(account)
203         if node is not None:
204             raise NameError('Account already exists')
205         self._put_path(user, self.ROOTNODE, account)
206     
207     @backend_method
208     def delete_account(self, user, account):
209         """Delete the account with the given name."""
210         
211         logger.debug("delete_account: %s", account)
212         if user != account:
213             raise NotAllowedError
214         node = self.node.node_lookup(account)
215         if node is None:
216             return
217         if not self.node.node_remove(node):
218             raise IndexError('Account is not empty')
219         self.permissions.group_destroy(account)
220     
221     @backend_method
222     def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
223         """Return a list of containers existing under an account."""
224         
225         logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
226         if user != account:
227             if until or account not in self._allowed_accounts(user):
228                 raise NotAllowedError
229             allowed = self._allowed_containers(user, account)
230             start, limit = self._list_limits(allowed, marker, limit)
231             return allowed[start:start + limit]
232         if shared:
233             allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
234             start, limit = self._list_limits(allowed, marker, limit)
235             return allowed[start:start + limit]
236         node = self.node.node_lookup(account)
237         return [x[0] for x in self._list_objects(node, account, '', '/', marker, limit, False, [], until)]
238     
239     @backend_method
240     def get_container_meta(self, user, account, container, until=None):
241         """Return a dictionary with the container metadata."""
242         
243         logger.debug("get_container_meta: %s %s %s", account, container, until)
244         if user != account:
245             if until or container not in self._allowed_containers(user, account):
246                 raise NotAllowedError
247         path, node = self._lookup_container(account, container)
248         props = self._get_properties(node, until)
249         mtime = props[self.MTIME]
250         count, bytes, tstamp = self._get_statistics(node, until)
251         tstamp = max(tstamp, mtime)
252         if until is None:
253             modified = tstamp
254         else:
255             modified = self._get_statistics(node)[2] # Overall last modification.
256             modified = max(modified, mtime)
257         
258         if user != account:
259             meta = {'name': container}
260         else:
261             meta = dict(self.node.attribute_get(props[self.SERIAL]))
262             if until is not None:
263                 meta.update({'until_timestamp': tstamp})
264             meta.update({'name': container, 'count': count, 'bytes': bytes})
265         meta.update({'modified': modified})
266         return meta
267     
268     @backend_method
269     def update_container_meta(self, user, account, container, meta, replace=False):
270         """Update the metadata associated with the container."""
271         
272         logger.debug("update_container_meta: %s %s %s %s", account, container, meta, replace)
273         if user != account:
274             raise NotAllowedError
275         path, node = self._lookup_container(account, container)
276         self._put_metadata(user, node, meta, replace, False)
277     
278     @backend_method
279     def get_container_policy(self, user, account, container):
280         """Return a dictionary with the container policy."""
281         
282         logger.debug("get_container_policy: %s %s", account, container)
283         if user != account:
284             if container not in self._allowed_containers(user, account):
285                 raise NotAllowedError
286             return {}
287         path = self._lookup_container(account, container)[0]
288         return self.policy.policy_get(path)
289     
290     @backend_method
291     def update_container_policy(self, user, account, container, policy, replace=False):
292         """Update the policy associated with the account."""
293         
294         logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
295         if user != account:
296             raise NotAllowedError
297         path = self._lookup_container(account, container)[0]
298         self._check_policy(policy)
299         if replace:
300             for k, v in self.default_policy.iteritems():
301                 if k not in policy:
302                     policy[k] = v
303         self.policy.policy_set(path, policy)
304     
305     @backend_method
306     def put_container(self, user, account, container, policy=None):
307         """Create a new container with the given name."""
308         
309         logger.debug("put_container: %s %s %s", account, container, policy)
310         if user != account:
311             raise NotAllowedError
312         try:
313             path, node = self._lookup_container(account, container)
314         except NameError:
315             pass
316         else:
317             raise NameError('Container already exists')
318         if policy:
319             self._check_policy(policy)
320         path = '/'.join((account, container))
321         self._put_path(user, self._lookup_account(account, True)[1], path)
322         for k, v in self.default_policy.iteritems():
323             if k not in policy:
324                 policy[k] = v
325         self.policy.policy_set(path, policy)
326     
327     @backend_method
328     def delete_container(self, user, account, container, until=None):
329         """Delete/purge the container with the given name."""
330         
331         logger.debug("delete_container: %s %s %s", account, container, until)
332         if user != account:
333             raise NotAllowedError
334         path, node = self._lookup_container(account, container)
335         
336         if until is not None:
337             versions = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
338             for v in versions:
339                 self.mapper.map_remv(v)
340             self.node.node_purge_children(node, until, CLUSTER_DELETED)
341             return
342         
343         if self._get_statistics(node)[0] > 0:
344             raise IndexError('Container is not empty')
345         versions = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
346         for v in versions:
347             self.mapper.map_remv(v)
348         self.node.node_purge_children(node, inf, CLUSTER_DELETED)
349         self.node.node_remove(node)
350         self.policy.policy_unset(path)
351     
352     @backend_method
353     def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], shared=False, until=None):
354         """Return a list of objects existing under a container."""
355         
356         logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, keys, shared, until)
357         allowed = []
358         if user != account:
359             if until:
360                 raise NotAllowedError
361             allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
362             if not allowed:
363                 raise NotAllowedError
364         else:
365             if shared:
366                 allowed = self.permissions.access_list_shared('/'.join((account, container)))
367                 if not allowed:
368                     return []
369         path, node = self._lookup_container(account, container)
370         return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, keys, until, allowed)
371     
372     @backend_method
373     def list_object_meta(self, user, account, container, until=None):
374         """Return a list with all the container's object meta keys."""
375         
376         logger.debug("list_object_meta: %s %s %s", account, container, until)
377         allowed = []
378         if user != account:
379             if until:
380                 raise NotAllowedError
381             allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
382             if not allowed:
383                 raise NotAllowedError
384         path, node = self._lookup_container(account, container)
385         before = until if until is not None else inf
386         return self.node.latest_attribute_keys(node, before, CLUSTER_DELETED, allowed)
387     
388     @backend_method
389     def get_object_meta(self, user, account, container, name, version=None):
390         """Return a dictionary with the object metadata."""
391         
392         logger.debug("get_object_meta: %s %s %s %s", account, container, name, version)
393         self._can_read(user, account, container, name)
394         path, node = self._lookup_object(account, container, name)
395         props = self._get_version(node, version)
396         if version is None:
397             modified = props[self.MTIME]
398         else:
399             modified = self._get_version(node)[self.MTIME] # Overall last modification.
400         
401         meta = dict(self.node.attribute_get(props[self.SERIAL]))
402         meta.update({'name': name, 'bytes': props[self.SIZE]})
403         meta.update({'version': props[self.SERIAL], 'version_timestamp': props[self.MTIME]})
404         meta.update({'modified': modified, 'modified_by': props[self.MUSER]})
405         return meta
406     
407     @backend_method
408     def update_object_meta(self, user, account, container, name, meta, replace=False):
409         """Update the metadata associated with the object."""
410         
411         logger.debug("update_object_meta: %s %s %s %s %s", account, container, name, meta, replace)
412         self._can_write(user, account, container, name)
413         path, node = self._lookup_object(account, container, name)
414         return self._put_metadata(user, node, meta, replace)
415     
416     @backend_method
417     def get_object_permissions(self, user, account, container, name):
418         """Return the path from which this object gets its permissions from,\
419         along with a dictionary containing the permissions."""
420         
421         logger.debug("get_object_permissions: %s %s %s", account, container, name)
422         self._can_read(user, account, container, name)
423         path = self._lookup_object(account, container, name)[0]
424         return self.permissions.access_inherit(path)
425     
426     @backend_method
427     def update_object_permissions(self, user, account, container, name, permissions):
428         """Update the permissions associated with the object."""
429         
430         logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
431         if user != account:
432             raise NotAllowedError
433         path = self._lookup_object(account, container, name)[0]
434         self._check_permissions(path, permissions)
435         self.permissions.access_set(path, permissions)
436     
437     @backend_method
438     def get_object_public(self, user, account, container, name):
439         """Return the public URL of the object if applicable."""
440         
441         logger.debug("get_object_public: %s %s %s", account, container, name)
442         self._can_read(user, account, container, name)
443         path = self._lookup_object(account, container, name)[0]
444         if self.permissions.public_check(path):
445             return '/public/' + path
446         return None
447     
448     @backend_method
449     def update_object_public(self, user, account, container, name, public):
450         """Update the public status of the object."""
451         
452         logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
453         self._can_write(user, account, container, name)
454         path = self._lookup_object(account, container, name)[0]
455         if not public:
456             self.permissions.public_unset(path)
457         else:
458             self.permissions.public_set(path)
459     
460     @backend_method
461     def get_object_hashmap(self, user, account, container, name, version=None):
462         """Return the object's size and a list with partial hashes."""
463         
464         logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
465         self._can_read(user, account, container, name)
466         path, node = self._lookup_object(account, container, name)
467         props = self._get_version(node, version)
468         hashmap = self.mapper.map_retr(props[self.SERIAL])
469         return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
470     
471     @backend_method
472     def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None):
473         """Create/update an object with the specified size and partial hashes."""
474         
475         logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
476         if permissions is not None and user != account:
477             raise NotAllowedError
478         self._can_write(user, account, container, name)
479         missing = self.blocker.block_ping([binascii.unhexlify(x) for x in hashmap])
480         if missing:
481             ie = IndexError()
482             ie.data = [binascii.hexlify(x) for x in missing]
483             raise ie
484         path = '/'.join((account, container, name))
485         if permissions is not None:
486             self._check_permissions(path, permissions)
487         path, node = self._put_object_node(account, container, name)
488         src_version_id, dest_version_id = self._copy_version(user, node, None, node, size)
489         self.mapper.map_stor(dest_version_id, [binascii.unhexlify(x) for x in hashmap])
490         if not replace_meta and src_version_id is not None:
491             self.node.attribute_copy(src_version_id, dest_version_id)
492         self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
493         if permissions is not None:
494             self.permissions.access_set(path, permissions)
495         return dest_version_id
496     
497     @backend_method
498     def copy_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
499         """Copy an object's data and metadata."""
500         
501         logger.debug("copy_object: %s %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
502         if permissions is not None and user != account:
503             raise NotAllowedError
504         self._can_read(user, account, src_container, src_name)
505         self._can_write(user, account, dest_container, dest_name)
506         src_path, src_node = self._lookup_object(account, src_container, src_name)
507         if permissions is not None:
508             self._check_permissions(dest_path, permissions)
509         dest_path, dest_node = self._put_object_node(account, dest_container, dest_name)
510         src_version_id, dest_version_id = self._copy_version(user, src_node, src_version, dest_node)
511         if src_version_id is not None:
512             self._copy_data(src_version_id, dest_version_id)
513         if not replace_meta and src_version_id is not None:
514             self.node.attribute_copy(src_version_id, dest_version_id)
515         self.node.attribute_set(dest_version_id, ((k, v) for k, v in dest_meta.iteritems()))
516         if permissions is not None:
517             dest_path = '/'.join((account, container, name))
518             self.permissions.access_set(dest_path, permissions)
519         return dest_version_id
520     
521     @backend_method
522     def move_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None):
523         """Move an object's data and metadata."""
524         
525         logger.debug("move_object: %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions)
526         dest_version_id = self.copy_object(user, account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, None)
527         self.delete_object(user, account, src_container, src_name)
528         return dest_version_id
529     
530     @backend_method
531     def delete_object(self, user, account, container, name, until=None):
532         """Delete/purge an object."""
533         
534         logger.debug("delete_object: %s %s %s %s", account, container, name, until)
535         if user != account:
536             raise NotAllowedError
537         
538         if until is not None:
539             path = '/'.join((account, container, name))
540             node = self.node.node_lookup(path)
541             if node is None:
542                 return
543             versions = self.node.node_purge(node, until, CLUSTER_NORMAL)
544             versions += self.node.node_purge(node, until, CLUSTER_HISTORY)
545             for v in versions:
546                 self.mapper.map_remv(v)
547             self.node.node_purge_children(node, until, CLUSTER_DELETED)
548             try:
549                 props = self._get_version(node)
550             except NameError:
551                 pass
552             else:
553                 self.permissions.access_clear(path)
554             return
555         
556         path, node = self._lookup_object(account, container, name)
557         self._copy_version(user, node, None, node, 0, CLUSTER_DELETED)
558         self.permissions.access_clear(path)
559     
560     @backend_method
561     def list_versions(self, user, account, container, name):
562         """Return a list of all (version, version_timestamp) tuples for an object."""
563         
564         logger.debug("list_versions: %s %s %s", account, container, name)
565         self._can_read(user, account, container, name)
566         path, node = self._lookup_object(account, container, name)
567         return self.node.node_get_versions(node, ['serial', 'mtime'])
568     
569     @backend_method(autocommit=0)
570     def get_block(self, hash):
571         """Return a block's data."""
572         
573         logger.debug("get_block: %s", hash)
574         blocks = self.blocker.block_retr((binascii.unhexlify(hash),))
575         if not blocks:
576             raise NameError('Block does not exist')
577         return blocks[0]
578     
579     @backend_method(autocommit=0)
580     def put_block(self, data):
581         """Store a block and return the hash."""
582         
583         logger.debug("put_block: %s", len(data))
584         hashes, absent = self.blocker.block_stor((data,))
585         return binascii.hexlify(hashes[0])
586     
587     @backend_method(autocommit=0)
588     def update_block(self, hash, data, offset=0):
589         """Update a known block and return the hash."""
590         
591         logger.debug("update_block: %s %s %s", hash, len(data), offset)
592         if offset == 0 and len(data) == self.block_size:
593             return self.put_block(data)
594         h, e = self.blocker.block_delta(binascii.unhexlify(hash), ((offset, data),))
595         return binascii.hexlify(h)
596     
597     # Path functions.
598     
599     def _put_object_node(self, account, container, name):
600         path, parent = self._lookup_container(account, container)
601         path = '/'.join((path, name))
602         node = self.node.node_lookup(path)
603         if node is None:
604             node = self.node.node_create(parent, path)
605         return path, node
606     
607     def _put_path(self, user, parent, path):
608         node = self.node.node_create(parent, path)
609         self.node.version_create(node, 0, None, user, CLUSTER_NORMAL)
610         return node
611     
612     def _lookup_account(self, account, create=True):
613         node = self.node.node_lookup(account)
614         if node is None and create:
615             node = self._put_path(account, self.ROOTNODE, account) # User is account.
616         return account, node
617     
618     def _lookup_container(self, account, container):
619         path = '/'.join((account, container))
620         node = self.node.node_lookup(path)
621         if node is None:
622             raise NameError('Container does not exist')
623         return path, node
624     
625     def _lookup_object(self, account, container, name):
626         path = '/'.join((account, container, name))
627         node = self.node.node_lookup(path)
628         if node is None:
629             raise NameError('Object does not exist')
630         return path, node
631     
632     def _get_properties(self, node, until=None):
633         """Return properties until the timestamp given."""
634         
635         before = until if until is not None else inf
636         props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
637         if props is None and until is not None:
638             props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
639         if props is None:
640             raise NameError('Path does not exist')
641         return props
642     
643     def _get_statistics(self, node, until=None):
644         """Return count, sum of size and latest timestamp of everything under node."""
645         
646         if until is None:
647             stats = self.node.statistics_get(node, CLUSTER_NORMAL)
648         else:
649             stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
650         if stats is None:
651             stats = (0, 0, 0)
652         return stats
653     
654     def _get_version(self, node, version=None):
655         if version is None:
656             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
657             if props is None:
658                 raise NameError('Object does not exist')
659         else:
660             props = self.node.version_get_properties(version)
661             if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
662                 raise IndexError('Version does not exist')
663         return props
664     
665     def _copy_version(self, user, src_node, src_version, dest_node, dest_size=None, dest_cluster=CLUSTER_NORMAL):
666         
667         # Get source serial and size.
668         if src_version is not None:
669             src_props = self._get_version(src_node, src_version)
670             src_version_id = src_props[self.SERIAL]
671             size = src_props[self.SIZE]
672         else:
673             # Latest or create from scratch.
674             try:
675                 src_props = self._get_version(src_node)
676                 src_version_id = src_props[self.SERIAL]
677                 size = src_props[self.SIZE]
678             except NameError:
679                 src_version_id = None
680                 size = 0
681         if dest_size is not None:
682             size = dest_size
683         
684         # Move the latest version at destination to CLUSTER_HISTORY and create new.
685         if src_node == dest_node and src_version is None and src_version_id is not None:
686             self.node.version_recluster(src_version_id, CLUSTER_HISTORY)
687         else:
688             dest_props = self.node.version_lookup(dest_node, inf, CLUSTER_NORMAL)
689             if dest_props is not None:
690                 self.node.version_recluster(dest_props[self.SERIAL], CLUSTER_HISTORY)
691         dest_version_id, mtime = self.node.version_create(dest_node, size, src_version_id, user, dest_cluster)
692         
693         return src_version_id, dest_version_id
694     
695     def _copy_data(self, src_version, dest_version):
696         hashmap = self.mapper.map_retr(src_version)
697         self.mapper.map_stor(dest_version, hashmap)
698     
699     def _get_metadata(self, version):
700         if version is None:
701             return {}
702         return dict(self.node.attribute_get(version))
703     
704     def _put_metadata(self, user, node, meta, replace=False, copy_data=True):
705         """Create a new version and store metadata."""
706         
707         src_version_id, dest_version_id = self._copy_version(user, node, None, node)
708         if not replace:
709             if src_version_id is not None:
710                 self.node.attribute_copy(src_version_id, dest_version_id)
711             self.node.attribute_del(dest_version_id, (k for k, v in meta.iteritems() if v == ''))
712             self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems() if v != ''))
713         else:
714             self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
715         if copy_data and src_version_id is not None:
716             self._copy_data(src_version_id, dest_version_id)
717         return dest_version_id
718     
719     def _list_limits(self, listing, marker, limit):
720         start = 0
721         if marker:
722             try:
723                 start = listing.index(marker) + 1
724             except ValueError:
725                 pass
726         if not limit or limit > 10000:
727             limit = 10000
728         return start, limit
729     
730     def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None, allowed=[]):
731         cont_prefix = path + '/'
732         prefix = cont_prefix + prefix
733         start = cont_prefix + marker if marker else None
734         before = until if until is not None else inf
735         filterq = ','.join(keys) if keys else None
736         
737         objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, filterq)
738         objects.extend([(p, None) for p in prefixes] if virtual else [])
739         objects.sort()
740         objects = [(x[0][len(cont_prefix):], x[1]) for x in objects]
741         
742         start, limit = self._list_limits([x[0] for x in objects], marker, limit)
743         return objects[start:start + limit]
744     
745     # Policy functions.
746     
747     def _check_policy(self, policy):
748         for k in policy.keys():
749             if policy[k] == '':
750                 policy[k] = self.default_policy.get(k)
751         for k, v in policy.iteritems():
752             if k == 'quota':
753                 q = int(v) # May raise ValueError.
754                 if q < 0:
755                     raise ValueError
756             elif k == 'versioning':
757                 if v not in ['auto', 'manual', 'none']:
758                     raise ValueError
759             else:
760                 raise ValueError
761     
762     # Access control functions.
763     
764     def _check_groups(self, groups):
765         # raise ValueError('Bad characters in groups')
766         pass
767     
768     def _check_permissions(self, path, permissions):
769         # raise ValueError('Bad characters in permissions')
770         
771         # Check for existing permissions.
772         paths = self.permissions.access_list(path)
773         if paths:
774             ae = AttributeError()
775             ae.data = paths
776             raise ae
777     
778     def _can_read(self, user, account, container, name):
779         if user == account:
780             return True
781         path = '/'.join((account, container, name))
782         if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
783             raise NotAllowedError
784     
785     def _can_write(self, user, account, container, name):
786         if user == account:
787             return True
788         path = '/'.join((account, container, name))
789         if not self.permissions.access_check(path, self.WRITE, user):
790             raise NotAllowedError
791     
792     def _allowed_accounts(self, user):
793         allow = set()
794         for path in self.permissions.access_list_paths(user):
795             allow.add(path.split('/', 1)[0])
796         return sorted(allow)
797     
798     def _allowed_containers(self, user, account):
799         allow = set()
800         for path in self.permissions.access_list_paths(user, account):
801             allow.add(path.split('/', 2)[1])
802         return sorted(allow)