Make modular backend load modules dynamically.
[pithos] / pithos / backends / modular.py
1 # Copyright 2011 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 import sys
35 import os
36 import time
37 import sqlite3
38 import logging
39 import hashlib
40 import binascii
41
42 from base import NotAllowedError, BaseBackend
43 from lib.hashfiler import Mapper, Blocker
44
45 ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
46
47 inf = float('inf')
48
49
50 logger = logging.getLogger(__name__)
51
52 def backend_method(func=None, autocommit=1):
53     if func is None:
54         def fn(func):
55             return backend_method(func, autocommit)
56         return fn
57
58     if not autocommit:
59         return func
60     def fn(self, *args, **kw):
61         self.wrapper.execute()
62         try:
63             ret = func(self, *args, **kw)
64             self.wrapper.commit()
65             return ret
66         except:
67             self.wrapper.rollback()
68             raise
69     return fn
70
71
72 class ModularBackend(BaseBackend):
73     """A modular backend.
74     
75     Uses modules for SQL functions and hashfiler for storage.
76     """
77     
78     def __init__(self, mod, path, db):
79         self.hash_algorithm = 'sha256'
80         self.block_size = 4 * 1024 * 1024 # 4MB
81         
82         self.default_policy = {'quota': 0, 'versioning': 'auto'}
83         
84         if path and not os.path.exists(path):
85             os.makedirs(path)
86         if not os.path.isdir(path):
87             raise RuntimeError("Cannot open path '%s'" % (path,))
88         
89         __import__(mod)
90         self.mod = sys.modules[mod]
91         self.wrapper = self.mod.dbwrapper.DBWrapper(db)
92         
93         params = {'blocksize': self.block_size,
94                   'blockpath': os.path.join(path + '/blocks'),
95                   'hashtype': self.hash_algorithm}
96         self.blocker = Blocker(**params)
97         
98         params = {'mappath': os.path.join(path + '/maps'),
99                   'namelen': self.blocker.hashlen}
100         self.mapper = Mapper(**params)
101         
102         params = {'connection': self.wrapper.conn,
103                   'cursor': self.wrapper.conn.cursor()}
104         self.permissions = self.mod.permissions.Permissions(**params)
105         for x in ['READ', 'WRITE']:
106             setattr(self, x, getattr(self.mod.permissions, x))
107         self.policy = self.mod.policy.Policy(**params)
108         self.node = self.mod.node.Node(**params)
109         for x in ['ROOTNODE', 'SERIAL', 'SIZE', 'MTIME', 'MUSER', 'CLUSTER']:
110             setattr(self, x, getattr(self.mod.node, x))
111     
112     @backend_method
113     def list_accounts(self, user, marker=None, limit=10000):
114         """Return a list of accounts the user can access."""
115         
116         logger.debug("list_accounts: %s %s %s", user, marker, limit)
117         allowed = self._allowed_accounts(user)
118         start, limit = self._list_limits(allowed, marker, limit)
119         return allowed[start:start + limit]
120     
121     @backend_method
122     def get_account_meta(self, user, account, until=None):
123         """Return a dictionary with the account metadata."""
124         
125         logger.debug("get_account_meta: %s %s", account, until)
126         path, node = self._lookup_account(account, user == account)
127         if user != account:
128             if until or node is None or account not in self._allowed_accounts(user):
129                 raise NotAllowedError
130         try:
131             props = self._get_properties(node, until)
132             mtime = props[self.MTIME]
133         except NameError:
134             props = None
135             mtime = until
136         count, bytes, tstamp = self._get_statistics(node, until)
137         tstamp = max(tstamp, mtime)
138         if until is None:
139             modified = tstamp
140         else:
141             modified = self._get_statistics(node)[2] # Overall last modification.
142             modified = max(modified, mtime)
143         
144         if user != account:
145             meta = {'name': account}
146         else:
147             meta = {}
148             if props is not None:
149                 meta.update(dict(self.node.attribute_get(props[self.SERIAL])))
150             if until is not None:
151                 meta.update({'until_timestamp': tstamp})
152             meta.update({'name': account, 'count': count, 'bytes': bytes})
153         meta.update({'modified': modified})
154         return meta
155     
156     @backend_method
157     def update_account_meta(self, user, account, meta, replace=False):
158         """Update the metadata associated with the account."""
159         
160         logger.debug("update_account_meta: %s %s %s", account, meta, replace)
161         if user != account:
162             raise NotAllowedError
163         path, node = self._lookup_account(account, True)
164         self._put_metadata(user, node, meta, replace, False)
165     
166     @backend_method
167     def get_account_groups(self, user, account):
168         """Return a dictionary with the user groups defined for this account."""
169         
170         logger.debug("get_account_groups: %s", account)
171         if user != account:
172             if account not in self._allowed_accounts(user):
173                 raise NotAllowedError
174             return {}
175         self._lookup_account(account, True)
176         return self.permissions.group_dict(account)
177     
178     @backend_method
179     def update_account_groups(self, user, account, groups, replace=False):
180         """Update the groups associated with the account."""
181         
182         logger.debug("update_account_groups: %s %s %s", account, groups, replace)
183         if user != account:
184             raise NotAllowedError
185         self._lookup_account(account, True)
186         self._check_groups(groups)
187         if replace:
188             self.permissions.group_destroy(account)
189         for k, v in groups.iteritems():
190             if not replace: # If not already deleted.
191                 self.permissions.group_delete(account, k)
192             if v:
193                 self.permissions.group_addmany(account, k, v)
194     
195     @backend_method
196     def put_account(self, user, account):
197         """Create a new account with the given name."""
198         
199         logger.debug("put_account: %s", account)
200         if user != account:
201             raise NotAllowedError
202         node = self.node.node_lookup(account)
203         if node is not None:
204             raise NameError('Account already exists')
205         self._put_path(user, self.ROOTNODE, account)
206     
207     @backend_method
208     def delete_account(self, user, account):
209         """Delete the account with the given name."""
210         
211         logger.debug("delete_account: %s", account)
212         if user != account:
213             raise NotAllowedError
214         node = self.node.node_lookup(account)
215         if node is None:
216             return
217         if not self.node.node_remove(node):
218             raise IndexError('Account is not empty')
219         self.permissions.group_destroy(account)
220     
221     @backend_method
222     def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
223         """Return a list of containers existing under an account."""
224         
225         logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
226         if user != account:
227             if until or account not in self._allowed_accounts(user):
228                 raise NotAllowedError
229             allowed = self._allowed_containers(user, account)
230             start, limit = self._list_limits(allowed, marker, limit)
231             return allowed[start:start + limit]
232         if shared:
233             allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
234             start, limit = self._list_limits(allowed, marker, limit)
235             return allowed[start:start + limit]
236         node = self.node.node_lookup(account)
237         return [x[0] for x in self._list_objects(node, account, '', '/', marker, limit, False, [], until)]
238     
239     @backend_method
240     def get_container_meta(self, user, account, container, until=None):
241         """Return a dictionary with the container metadata."""
242         
243         logger.debug("get_container_meta: %s %s %s", account, container, until)
244         if user != account:
245             if until or container not in self._allowed_containers(user, account):
246                 raise NotAllowedError
247         path, node = self._lookup_container(account, container)
248         props = self._get_properties(node, until)
249         mtime = props[self.MTIME]
250         count, bytes, tstamp = self._get_statistics(node, until)
251         tstamp = max(tstamp, mtime)
252         if until is None:
253             modified = tstamp
254         else:
255             modified = self._get_statistics(node)[2] # Overall last modification.
256             modified = max(modified, mtime)
257         
258         if user != account:
259             meta = {'name': container}
260         else:
261             meta = dict(self.node.attribute_get(props[self.SERIAL]))
262             if until is not None:
263                 meta.update({'until_timestamp': tstamp})
264             meta.update({'name': container, 'count': count, 'bytes': bytes})
265         meta.update({'modified': modified})
266         return meta
267     
268     @backend_method
269     def update_container_meta(self, user, account, container, meta, replace=False):
270         """Update the metadata associated with the container."""
271         
272         logger.debug("update_container_meta: %s %s %s %s", account, container, meta, replace)
273         if user != account:
274             raise NotAllowedError
275         path, node = self._lookup_container(account, container)
276         self._put_metadata(user, node, meta, replace, False)
277     
278     @backend_method
279     def get_container_policy(self, user, account, container):
280         """Return a dictionary with the container policy."""
281         
282         logger.debug("get_container_policy: %s %s", account, container)
283         if user != account:
284             if container not in self._allowed_containers(user, account):
285                 raise NotAllowedError
286             return {}
287         path = self._lookup_container(account, container)[0]
288         return self.policy.policy_get(path)
289     
290     @backend_method
291     def update_container_policy(self, user, account, container, policy, replace=False):
292         """Update the policy associated with the account."""
293         
294         logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
295         if user != account:
296             raise NotAllowedError
297         path = self._lookup_container(account, container)[0]
298         self._check_policy(policy)
299         if replace:
300             for k, v in self.default_policy.iteritems():
301                 if k not in policy:
302                     policy[k] = v
303         self.policy.policy_set(path, policy)
304     
305     @backend_method
306     def put_container(self, user, account, container, policy=None):
307         """Create a new container with the given name."""
308         
309         logger.debug("put_container: %s %s %s", account, container, policy)
310         if user != account:
311             raise NotAllowedError
312         try:
313             path, node = self._lookup_container(account, container)
314         except NameError:
315             pass
316         else:
317             raise NameError('Container already exists')
318         if policy:
319             self._check_policy(policy)
320         path = '/'.join((account, container))
321         self._put_path(user, self._lookup_account(account, True)[1], path)
322         for k, v in self.default_policy.iteritems():
323             if k not in policy:
324                 policy[k] = v
325         self.policy.policy_set(path, policy)
326     
327     @backend_method
328     def delete_container(self, user, account, container, until=None):
329         """Delete/purge the container with the given name."""
330         
331         logger.debug("delete_container: %s %s %s", account, container, until)
332         if user != account:
333             raise NotAllowedError
334         path, node = self._lookup_container(account, container)
335         
336         if until is not None:
337             versions = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
338             for v in versions:
339                 self.mapper.map_remv(v)
340             self.node.node_purge_children(node, until, CLUSTER_DELETED)
341             return
342         
343         if self._get_statistics(node)[0] > 0:
344             raise IndexError('Container is not empty')
345         versions = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
346         for v in versions:
347             self.mapper.map_remv(v)
348         self.node.node_purge_children(node, inf, CLUSTER_DELETED)
349         self.node.node_remove(node)
350         self.policy.policy_unset(path)
351     
352     @backend_method
353     def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], shared=False, until=None):
354         """Return a list of objects existing under a container."""
355         
356         logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, keys, shared, until)
357         allowed = []
358         if user != account:
359             if until:
360                 raise NotAllowedError
361             allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
362             if not allowed:
363                 raise NotAllowedError
364         else:
365             if shared:
366                 allowed = self.permissions.access_list_shared('/'.join((account, container)))
367                 if not allowed:
368                     return []
369         path, node = self._lookup_container(account, container)
370         return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, keys, until, allowed)
371     
372     @backend_method
373     def list_object_meta(self, user, account, container, until=None):
374         """Return a list with all the container's object meta keys."""
375         
376         logger.debug("list_object_meta: %s %s %s", account, container, until)
377         allowed = []
378         if user != account:
379             if until:
380                 raise NotAllowedError
381             allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
382             if not allowed:
383                 raise NotAllowedError
384         path, node = self._lookup_container(account, container)
385         before = until if until is not None else inf
386         return self.node.latest_attribute_keys(node, before, CLUSTER_DELETED, allowed)
387     
388     @backend_method
389     def get_object_meta(self, user, account, container, name, version=None):
390         """Return a dictionary with the object metadata."""
391         
392         logger.debug("get_object_meta: %s %s %s %s", account, container, name, version)
393         self._can_read(user, account, container, name)
394         path, node = self._lookup_object(account, container, name)
395         props = self._get_version(node, version)
396         if version is None:
397             modified = props[self.MTIME]
398         else:
399             modified = self._get_version(node)[self.MTIME] # Overall last modification.
400         
401         meta = dict(self.node.attribute_get(props[self.SERIAL]))
402         meta.update({'name': name, 'bytes': props[self.SIZE]})
403         meta.update({'version': props[self.SERIAL], 'version_timestamp': props[self.MTIME]})
404         meta.update({'modified': modified, 'modified_by': props[self.MUSER]})
405         return meta
406     
407     @backend_method
408     def update_object_meta(self, user, account, container, name, meta, replace=False):
409         """Update the metadata associated with the object."""
410         
411         logger.debug("update_object_meta: %s %s %s %s %s", account, container, name, meta, replace)
412         self._can_write(user, account, container, name)
413         path, node = self._lookup_object(account, container, name)
414         return self._put_metadata(user, node, meta, replace)
415     
416     @backend_method
417     def get_object_permissions(self, user, account, container, name):
418         """Return the path from which this object gets its permissions from,\
419         along with a dictionary containing the permissions."""
420         
421         logger.debug("get_object_permissions: %s %s %s", account, container, name)
422         self._can_read(user, account, container, name)
423         path = self._lookup_object(account, container, name)[0]
424         return self.permissions.access_inherit(path)
425     
426     @backend_method
427     def update_object_permissions(self, user, account, container, name, permissions):
428         """Update the permissions associated with the object."""
429         
430         logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
431         if user != account:
432             raise NotAllowedError
433         path = self._lookup_object(account, container, name)[0]
434         self._check_permissions(path, permissions)
435         self.permissions.access_set(path, permissions)
436     
437     @backend_method
438     def get_object_public(self, user, account, container, name):
439         """Return the public URL of the object if applicable."""
440         
441         logger.debug("get_object_public: %s %s %s", account, container, name)
442         self._can_read(user, account, container, name)
443         path = self._lookup_object(account, container, name)[0]
444         if self.permissions.public_check(path):
445             return '/public/' + path
446         return None
447     
448     @backend_method
449     def update_object_public(self, user, account, container, name, public):
450         """Update the public status of the object."""
451         
452         logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
453         self._can_write(user, account, container, name)
454         path = self._lookup_object(account, container, name)[0]
455         if not public:
456             self.permissions.public_unset(path)
457         else:
458             self.permissions.public_set(path)
459     
460     @backend_method
461     def get_object_hashmap(self, user, account, container, name, version=None):
462         """Return the object's size and a list with partial hashes."""
463         
464         logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
465         self._can_read(user, account, container, name)
466         path, node = self._lookup_object(account, container, name)
467         props = self._get_version(node, version)
468         hashmap = self.mapper.map_retr(props[self.SERIAL])
469         return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
470     
471     @backend_method
472     def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None):
473         """Create/update an object with the specified size and partial hashes."""
474         
475         logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
476         if permissions is not None and user != account:
477             raise NotAllowedError
478         self._can_write(user, account, container, name)
479         missing = self.blocker.block_ping([binascii.unhexlify(x) for x in hashmap])
480         if missing:
481             ie = IndexError()
482             ie.data = [binascii.hexlify(x) for x in missing]
483             raise ie
484         if permissions is not None:
485             self._check_permissions(path, permissions)
486         path, node = self._put_object_node(account, container, name)
487         src_version_id, dest_version_id = self._copy_version(user, node, None, node, size)
488         self.mapper.map_stor(dest_version_id, [binascii.unhexlify(x) for x in hashmap])
489         if not replace_meta and src_version_id is not None:
490             self.node.attribute_copy(src_version_id, dest_version_id)
491         self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
492         if permissions is not None:
493             self.permissions.access_set(path, permissions)
494         return dest_version_id
495     
496     @backend_method
497     def copy_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
498         """Copy an object's data and metadata."""
499         
500         logger.debug("copy_object: %s %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
501         if permissions is not None and user != account:
502             raise NotAllowedError
503         self._can_read(user, account, src_container, src_name)
504         self._can_write(user, account, dest_container, dest_name)
505         src_path, src_node = self._lookup_object(account, src_container, src_name)
506         if permissions is not None:
507             self._check_permissions(dest_path, permissions)
508         dest_path, dest_node = self._put_object_node(account, dest_container, dest_name)
509         src_version_id, dest_version_id = self._copy_version(user, src_node, src_version, dest_node)
510         if src_version_id is not None:
511             self._copy_data(src_version_id, dest_version_id)
512         if not replace_meta and src_version_id is not None:
513             self.node.attribute_copy(src_version_id, dest_version_id)
514         self.node.attribute_set(dest_version_id, ((k, v) for k, v in dest_meta.iteritems()))
515         if permissions is not None:
516             self.permissions.access_set(dest_path, permissions)
517         return dest_version_id
518     
519     @backend_method
520     def move_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None):
521         """Move an object's data and metadata."""
522         
523         logger.debug("move_object: %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions)
524         dest_version_id = self.copy_object(user, account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, None)
525         self.delete_object(user, account, src_container, src_name)
526         return dest_version_id
527     
528     @backend_method
529     def delete_object(self, user, account, container, name, until=None):
530         """Delete/purge an object."""
531         
532         logger.debug("delete_object: %s %s %s %s", account, container, name, until)
533         if user != account:
534             raise NotAllowedError
535         
536         if until is not None:
537             path = '/'.join((account, container, name))
538             node = self.node.node_lookup(path)
539             if node is None:
540                 return
541             versions = self.node.node_purge(node, until, CLUSTER_NORMAL)
542             versions += self.node.node_purge(node, until, CLUSTER_HISTORY)
543             for v in versions:
544                 self.mapper.map_remv(v)
545             self.node.node_purge_children(node, until, CLUSTER_DELETED)
546             try:
547                 props = self._get_version(node)
548             except NameError:
549                 pass
550             else:
551                 self.permissions.access_clear(path)
552             return
553         
554         path, node = self._lookup_object(account, container, name)
555         self._copy_version(user, node, None, node, 0, CLUSTER_DELETED)
556         self.permissions.access_clear(path)
557     
558     @backend_method
559     def list_versions(self, user, account, container, name):
560         """Return a list of all (version, version_timestamp) tuples for an object."""
561         
562         logger.debug("list_versions: %s %s %s", account, container, name)
563         self._can_read(user, account, container, name)
564         path, node = self._lookup_object(account, container, name)
565         return self.node.node_get_versions(node, ['serial', 'mtime'])
566     
567     @backend_method(autocommit=0)
568     def get_block(self, hash):
569         """Return a block's data."""
570         
571         logger.debug("get_block: %s", hash)
572         blocks = self.blocker.block_retr((binascii.unhexlify(hash),))
573         if not blocks:
574             raise NameError('Block does not exist')
575         return blocks[0]
576     
577     @backend_method(autocommit=0)
578     def put_block(self, data):
579         """Store a block and return the hash."""
580         
581         logger.debug("put_block: %s", len(data))
582         hashes, absent = self.blocker.block_stor((data,))
583         return binascii.hexlify(hashes[0])
584     
585     @backend_method(autocommit=0)
586     def update_block(self, hash, data, offset=0):
587         """Update a known block and return the hash."""
588         
589         logger.debug("update_block: %s %s %s", hash, len(data), offset)
590         if offset == 0 and len(data) == self.block_size:
591             return self.put_block(data)
592         h, e = self.blocker.block_delta(binascii.unhexlify(hash), ((offset, data),))
593         return binascii.hexlify(h)
594     
595     # Path functions.
596     
597     def _put_object_node(self, account, container, name):
598         path, parent = self._lookup_container(account, container)
599         path = '/'.join((path, name))
600         node = self.node.node_lookup(path)
601         if node is None:
602             node = self.node.node_create(parent, path)
603         return path, node
604     
605     def _put_path(self, user, parent, path):
606         node = self.node.node_create(parent, path)
607         self.node.version_create(node, 0, None, user, CLUSTER_NORMAL)
608         return node
609     
610     def _lookup_account(self, account, create=True):
611         node = self.node.node_lookup(account)
612         if node is None and create:
613             node = self._put_path(account, self.ROOTNODE, account) # User is account.
614         return account, node
615     
616     def _lookup_container(self, account, container):
617         path = '/'.join((account, container))
618         node = self.node.node_lookup(path)
619         if node is None:
620             raise NameError('Container does not exist')
621         return path, node
622     
623     def _lookup_object(self, account, container, name):
624         path = '/'.join((account, container, name))
625         node = self.node.node_lookup(path)
626         if node is None:
627             raise NameError('Object does not exist')
628         return path, node
629     
630     def _get_properties(self, node, until=None):
631         """Return properties until the timestamp given."""
632         
633         before = until if until is not None else inf
634         props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
635         if props is None and until is not None:
636             props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
637         if props is None:
638             raise NameError('Path does not exist')
639         return props
640     
641     def _get_statistics(self, node, until=None):
642         """Return count, sum of size and latest timestamp of everything under node."""
643         
644         if until is None:
645             stats = self.node.statistics_get(node, CLUSTER_NORMAL)
646         else:
647             stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
648         if stats is None:
649             stats = (0, 0, 0)
650         return stats
651     
652     def _get_version(self, node, version=None):
653         if version is None:
654             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
655             if props is None:
656                 raise NameError('Object does not exist')
657         else:
658             props = self.node.version_get_properties(version)
659             if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
660                 raise IndexError('Version does not exist')
661         return props
662     
663     def _copy_version(self, user, src_node, src_version, dest_node, dest_size=None, dest_cluster=CLUSTER_NORMAL):
664         
665         # Get source serial and size.
666         if src_version is not None:
667             src_props = self._get_version(src_node, src_version)
668             src_version_id = src_props[self.SERIAL]
669             size = src_props[self.SIZE]
670         else:
671             # Latest or create from scratch.
672             try:
673                 src_props = self._get_version(src_node)
674                 src_version_id = src_props[self.SERIAL]
675                 size = src_props[self.SIZE]
676             except NameError:
677                 src_version_id = None
678                 size = 0
679         if dest_size is not None:
680             size = dest_size
681         
682         # Move the latest version at destination to CLUSTER_HISTORY and create new.
683         if src_node == dest_node and src_version is None and src_version_id is not None:
684             self.node.version_recluster(src_version_id, CLUSTER_HISTORY)
685         else:
686             dest_props = self.node.version_lookup(dest_node, inf, CLUSTER_NORMAL)
687             if dest_props is not None:
688                 self.node.version_recluster(dest_props[self.SERIAL], CLUSTER_HISTORY)
689         dest_version_id, mtime = self.node.version_create(dest_node, size, src_version_id, user, dest_cluster)
690         
691         return src_version_id, dest_version_id
692     
693     def _copy_data(self, src_version, dest_version):
694         hashmap = self.mapper.map_retr(src_version)
695         self.mapper.map_stor(dest_version, hashmap)
696     
697     def _get_metadata(self, version):
698         if version is None:
699             return {}
700         return dict(self.node.attribute_get(version))
701     
702     def _put_metadata(self, user, node, meta, replace=False, copy_data=True):
703         """Create a new version and store metadata."""
704         
705         src_version_id, dest_version_id = self._copy_version(user, node, None, node)
706         if not replace:
707             if src_version_id is not None:
708                 self.node.attribute_copy(src_version_id, dest_version_id)
709             self.node.attribute_del(dest_version_id, (k for k, v in meta.iteritems() if v == ''))
710             self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems() if v != ''))
711         else:
712             self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
713         if copy_data and src_version_id is not None:
714             self._copy_data(src_version_id, dest_version_id)
715         return dest_version_id
716     
717     def _list_limits(self, listing, marker, limit):
718         start = 0
719         if marker:
720             try:
721                 start = listing.index(marker) + 1
722             except ValueError:
723                 pass
724         if not limit or limit > 10000:
725             limit = 10000
726         return start, limit
727     
728     def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None, allowed=[]):
729         cont_prefix = path + '/'
730         prefix = cont_prefix + prefix
731         start = cont_prefix + marker if marker else None
732         before = until if until is not None else inf
733         filterq = ','.join(keys) if keys else None
734         
735         objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, filterq)
736         objects.extend([(p, None) for p in prefixes] if virtual else [])
737         objects.sort()
738         objects = [(x[0][len(cont_prefix):], x[1]) for x in objects]
739         
740         start, limit = self._list_limits([x[0] for x in objects], marker, limit)
741         return objects[start:start + limit]
742     
743     # Policy functions.
744     
745     def _check_policy(self, policy):
746         for k in policy.keys():
747             if policy[k] == '':
748                 policy[k] = self.default_policy.get(k)
749         for k, v in policy.iteritems():
750             if k == 'quota':
751                 q = int(v) # May raise ValueError.
752                 if q < 0:
753                     raise ValueError
754             elif k == 'versioning':
755                 if v not in ['auto', 'manual', 'none']:
756                     raise ValueError
757             else:
758                 raise ValueError
759     
760     # Access control functions.
761     
762     def _check_groups(self, groups):
763         # raise ValueError('Bad characters in groups')
764         pass
765     
766     def _check_permissions(self, path, permissions):
767         # raise ValueError('Bad characters in permissions')
768         
769         # Check for existing permissions.
770         paths = self.permissions.access_list(path)
771         if paths:
772             ae = AttributeError()
773             ae.data = paths
774             raise ae
775     
776     def _can_read(self, user, account, container, name):
777         if user == account:
778             return True
779         path = '/'.join((account, container, name))
780         if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
781             raise NotAllowedError
782     
783     def _can_write(self, user, account, container, name):
784         if user == account:
785             return True
786         path = '/'.join((account, container, name))
787         if not self.permissions.access_check(path, self.WRITE, user):
788             raise NotAllowedError
789     
790     def _allowed_accounts(self, user):
791         allow = set()
792         for path in self.permissions.access_list_paths(user):
793             allow.add(path.split('/', 1)[0])
794         return sorted(allow)
795     
796     def _allowed_containers(self, user, account):
797         allow = set()
798         for path in self.permissions.access_list_paths(user, account):
799             allow.add(path.split('/', 2)[1])
800         return sorted(allow)