a65da33d7cf1e0af268c1d8d6bc23de7b46431d1
[pithos] / pithos / backends / modular.py
1 # Copyright 2011 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 import sys
35 import os
36 import time
37 import sqlite3
38 import logging
39 import hashlib
40 import binascii
41
42 from base import NotAllowedError, BaseBackend
43 from lib.hashfiler import Mapper, Blocker
44
45 ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
46
47 inf = float('inf')
48
49
50 logger = logging.getLogger(__name__)
51
52 def backend_method(func=None, autocommit=1):
53     if func is None:
54         def fn(func):
55             return backend_method(func, autocommit)
56         return fn
57
58     if not autocommit:
59         return func
60     def fn(self, *args, **kw):
61         self.wrapper.execute()
62         try:
63             ret = func(self, *args, **kw)
64             self.wrapper.commit()
65             return ret
66         except:
67             self.wrapper.rollback()
68             raise
69     return fn
70
71
72 class ModularBackend(BaseBackend):
73     """A modular backend.
74     
75     Uses modules for SQL functions and hashfiler for storage.
76     """
77     
78     def __init__(self, mod, path, db):
79         self.hash_algorithm = 'sha256'
80         self.block_size = 4 * 1024 * 1024 # 4MB
81         
82         self.default_policy = {'quota': 0, 'versioning': 'auto'}
83         
84         if path and not os.path.exists(path):
85             os.makedirs(path)
86         if not os.path.isdir(path):
87             raise RuntimeError("Cannot open path '%s'" % (path,))
88         
89         __import__(mod)
90         self.mod = sys.modules[mod]
91         self.wrapper = self.mod.dbwrapper.DBWrapper(db)
92         
93         params = {'blocksize': self.block_size,
94                   'blockpath': os.path.join(path + '/blocks'),
95                   'hashtype': self.hash_algorithm}
96         self.blocker = Blocker(**params)
97         
98         params = {'mappath': os.path.join(path + '/maps'),
99                   'namelen': self.blocker.hashlen}
100         self.mapper = Mapper(**params)
101         
102         params = {'wrapper': self.wrapper}
103         self.permissions = self.mod.permissions.Permissions(**params)
104         for x in ['READ', 'WRITE']:
105             setattr(self, x, getattr(self.mod.permissions, x))
106         self.policy = self.mod.policy.Policy(**params)
107         self.node = self.mod.node.Node(**params)
108         for x in ['ROOTNODE', 'SERIAL', 'SIZE', 'MTIME', 'MUSER', 'CLUSTER']:
109             setattr(self, x, getattr(self.mod.node, x))
110     
111     @backend_method
112     def list_accounts(self, user, marker=None, limit=10000):
113         """Return a list of accounts the user can access."""
114         
115         logger.debug("list_accounts: %s %s %s", user, marker, limit)
116         allowed = self._allowed_accounts(user)
117         start, limit = self._list_limits(allowed, marker, limit)
118         return allowed[start:start + limit]
119     
120     @backend_method
121     def get_account_meta(self, user, account, until=None):
122         """Return a dictionary with the account metadata."""
123         
124         logger.debug("get_account_meta: %s %s", account, until)
125         path, node = self._lookup_account(account, user == account)
126         if user != account:
127             if until or node is None or account not in self._allowed_accounts(user):
128                 raise NotAllowedError
129         try:
130             props = self._get_properties(node, until)
131             mtime = props[self.MTIME]
132         except NameError:
133             props = None
134             mtime = until
135         count, bytes, tstamp = self._get_statistics(node, until)
136         tstamp = max(tstamp, mtime)
137         if until is None:
138             modified = tstamp
139         else:
140             modified = self._get_statistics(node)[2] # Overall last modification.
141             modified = max(modified, mtime)
142         
143         if user != account:
144             meta = {'name': account}
145         else:
146             meta = {}
147             if props is not None:
148                 meta.update(dict(self.node.attribute_get(props[self.SERIAL])))
149             if until is not None:
150                 meta.update({'until_timestamp': tstamp})
151             meta.update({'name': account, 'count': count, 'bytes': bytes})
152         meta.update({'modified': modified})
153         return meta
154     
155     @backend_method
156     def update_account_meta(self, user, account, meta, replace=False):
157         """Update the metadata associated with the account."""
158         
159         logger.debug("update_account_meta: %s %s %s", account, meta, replace)
160         if user != account:
161             raise NotAllowedError
162         path, node = self._lookup_account(account, True)
163         self._put_metadata(user, node, meta, replace, False)
164     
165     @backend_method
166     def get_account_groups(self, user, account):
167         """Return a dictionary with the user groups defined for this account."""
168         
169         logger.debug("get_account_groups: %s", account)
170         if user != account:
171             if account not in self._allowed_accounts(user):
172                 raise NotAllowedError
173             return {}
174         self._lookup_account(account, True)
175         return self.permissions.group_dict(account)
176     
177     @backend_method
178     def update_account_groups(self, user, account, groups, replace=False):
179         """Update the groups associated with the account."""
180         
181         logger.debug("update_account_groups: %s %s %s", account, groups, replace)
182         if user != account:
183             raise NotAllowedError
184         self._lookup_account(account, True)
185         self._check_groups(groups)
186         if replace:
187             self.permissions.group_destroy(account)
188         for k, v in groups.iteritems():
189             if not replace: # If not already deleted.
190                 self.permissions.group_delete(account, k)
191             if v:
192                 self.permissions.group_addmany(account, k, v)
193     
194     @backend_method
195     def put_account(self, user, account):
196         """Create a new account with the given name."""
197         
198         logger.debug("put_account: %s", account)
199         if user != account:
200             raise NotAllowedError
201         node = self.node.node_lookup(account)
202         if node is not None:
203             raise NameError('Account already exists')
204         self._put_path(user, self.ROOTNODE, account)
205     
206     @backend_method
207     def delete_account(self, user, account):
208         """Delete the account with the given name."""
209         
210         logger.debug("delete_account: %s", account)
211         if user != account:
212             raise NotAllowedError
213         node = self.node.node_lookup(account)
214         if node is None:
215             return
216         if not self.node.node_remove(node):
217             raise IndexError('Account is not empty')
218         self.permissions.group_destroy(account)
219     
220     @backend_method
221     def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
222         """Return a list of containers existing under an account."""
223         
224         logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
225         if user != account:
226             if until or account not in self._allowed_accounts(user):
227                 raise NotAllowedError
228             allowed = self._allowed_containers(user, account)
229             start, limit = self._list_limits(allowed, marker, limit)
230             return allowed[start:start + limit]
231         if shared:
232             allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
233             start, limit = self._list_limits(allowed, marker, limit)
234             return allowed[start:start + limit]
235         node = self.node.node_lookup(account)
236         return [x[0] for x in self._list_objects(node, account, '', '/', marker, limit, False, [], until)]
237     
238     @backend_method
239     def get_container_meta(self, user, account, container, until=None):
240         """Return a dictionary with the container metadata."""
241         
242         logger.debug("get_container_meta: %s %s %s", account, container, until)
243         if user != account:
244             if until or container not in self._allowed_containers(user, account):
245                 raise NotAllowedError
246         path, node = self._lookup_container(account, container)
247         props = self._get_properties(node, until)
248         mtime = props[self.MTIME]
249         count, bytes, tstamp = self._get_statistics(node, until)
250         tstamp = max(tstamp, mtime)
251         if until is None:
252             modified = tstamp
253         else:
254             modified = self._get_statistics(node)[2] # Overall last modification.
255             modified = max(modified, mtime)
256         
257         if user != account:
258             meta = {'name': container}
259         else:
260             meta = dict(self.node.attribute_get(props[self.SERIAL]))
261             if until is not None:
262                 meta.update({'until_timestamp': tstamp})
263             meta.update({'name': container, 'count': count, 'bytes': bytes})
264         meta.update({'modified': modified})
265         return meta
266     
267     @backend_method
268     def update_container_meta(self, user, account, container, meta, replace=False):
269         """Update the metadata associated with the container."""
270         
271         logger.debug("update_container_meta: %s %s %s %s", account, container, meta, replace)
272         if user != account:
273             raise NotAllowedError
274         path, node = self._lookup_container(account, container)
275         self._put_metadata(user, node, meta, replace, False)
276     
277     @backend_method
278     def get_container_policy(self, user, account, container):
279         """Return a dictionary with the container policy."""
280         
281         logger.debug("get_container_policy: %s %s", account, container)
282         if user != account:
283             if container not in self._allowed_containers(user, account):
284                 raise NotAllowedError
285             return {}
286         path = self._lookup_container(account, container)[0]
287         return self.policy.policy_get(path)
288     
289     @backend_method
290     def update_container_policy(self, user, account, container, policy, replace=False):
291         """Update the policy associated with the account."""
292         
293         logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
294         if user != account:
295             raise NotAllowedError
296         path = self._lookup_container(account, container)[0]
297         self._check_policy(policy)
298         if replace:
299             for k, v in self.default_policy.iteritems():
300                 if k not in policy:
301                     policy[k] = v
302         self.policy.policy_set(path, policy)
303     
304     @backend_method
305     def put_container(self, user, account, container, policy=None):
306         """Create a new container with the given name."""
307         
308         logger.debug("put_container: %s %s %s", account, container, policy)
309         if user != account:
310             raise NotAllowedError
311         try:
312             path, node = self._lookup_container(account, container)
313         except NameError:
314             pass
315         else:
316             raise NameError('Container already exists')
317         if policy:
318             self._check_policy(policy)
319         path = '/'.join((account, container))
320         self._put_path(user, self._lookup_account(account, True)[1], path)
321         for k, v in self.default_policy.iteritems():
322             if k not in policy:
323                 policy[k] = v
324         self.policy.policy_set(path, policy)
325     
326     @backend_method
327     def delete_container(self, user, account, container, until=None):
328         """Delete/purge the container with the given name."""
329         
330         logger.debug("delete_container: %s %s %s", account, container, until)
331         if user != account:
332             raise NotAllowedError
333         path, node = self._lookup_container(account, container)
334         
335         if until is not None:
336             versions = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
337             for v in versions:
338                 self.mapper.map_remv(v)
339             self.node.node_purge_children(node, until, CLUSTER_DELETED)
340             return
341         
342         if self._get_statistics(node)[0] > 0:
343             raise IndexError('Container is not empty')
344         versions = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
345         for v in versions:
346             self.mapper.map_remv(v)
347         self.node.node_purge_children(node, inf, CLUSTER_DELETED)
348         self.node.node_remove(node)
349         self.policy.policy_unset(path)
350     
351     @backend_method
352     def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], shared=False, until=None):
353         """Return a list of objects existing under a container."""
354         
355         logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, keys, shared, until)
356         allowed = []
357         if user != account:
358             if until:
359                 raise NotAllowedError
360             allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
361             if not allowed:
362                 raise NotAllowedError
363         else:
364             if shared:
365                 allowed = self.permissions.access_list_shared('/'.join((account, container)))
366                 if not allowed:
367                     return []
368         path, node = self._lookup_container(account, container)
369         return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, keys, until, allowed)
370     
371     @backend_method
372     def list_object_meta(self, user, account, container, until=None):
373         """Return a list with all the container's object meta keys."""
374         
375         logger.debug("list_object_meta: %s %s %s", account, container, until)
376         allowed = []
377         if user != account:
378             if until:
379                 raise NotAllowedError
380             allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
381             if not allowed:
382                 raise NotAllowedError
383         path, node = self._lookup_container(account, container)
384         before = until if until is not None else inf
385         return self.node.latest_attribute_keys(node, before, CLUSTER_DELETED, allowed)
386     
387     @backend_method
388     def get_object_meta(self, user, account, container, name, version=None):
389         """Return a dictionary with the object metadata."""
390         
391         logger.debug("get_object_meta: %s %s %s %s", account, container, name, version)
392         self._can_read(user, account, container, name)
393         path, node = self._lookup_object(account, container, name)
394         props = self._get_version(node, version)
395         if version is None:
396             modified = props[self.MTIME]
397         else:
398             modified = self._get_version(node)[self.MTIME] # Overall last modification.
399         
400         meta = dict(self.node.attribute_get(props[self.SERIAL]))
401         meta.update({'name': name, 'bytes': props[self.SIZE]})
402         meta.update({'version': props[self.SERIAL], 'version_timestamp': props[self.MTIME]})
403         meta.update({'modified': modified, 'modified_by': props[self.MUSER]})
404         return meta
405     
406     @backend_method
407     def update_object_meta(self, user, account, container, name, meta, replace=False):
408         """Update the metadata associated with the object."""
409         
410         logger.debug("update_object_meta: %s %s %s %s %s", account, container, name, meta, replace)
411         self._can_write(user, account, container, name)
412         path, node = self._lookup_object(account, container, name)
413         return self._put_metadata(user, node, meta, replace)
414     
415     @backend_method
416     def get_object_permissions(self, user, account, container, name):
417         """Return the path from which this object gets its permissions from,\
418         along with a dictionary containing the permissions."""
419         
420         logger.debug("get_object_permissions: %s %s %s", account, container, name)
421         self._can_read(user, account, container, name)
422         path = self._lookup_object(account, container, name)[0]
423         return self.permissions.access_inherit(path)
424     
425     @backend_method
426     def update_object_permissions(self, user, account, container, name, permissions):
427         """Update the permissions associated with the object."""
428         
429         logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
430         if user != account:
431             raise NotAllowedError
432         path = self._lookup_object(account, container, name)[0]
433         self._check_permissions(path, permissions)
434         self.permissions.access_set(path, permissions)
435     
436     @backend_method
437     def get_object_public(self, user, account, container, name):
438         """Return the public URL of the object if applicable."""
439         
440         logger.debug("get_object_public: %s %s %s", account, container, name)
441         self._can_read(user, account, container, name)
442         path = self._lookup_object(account, container, name)[0]
443         if self.permissions.public_check(path):
444             return '/public/' + path
445         return None
446     
447     @backend_method
448     def update_object_public(self, user, account, container, name, public):
449         """Update the public status of the object."""
450         
451         logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
452         self._can_write(user, account, container, name)
453         path = self._lookup_object(account, container, name)[0]
454         if not public:
455             self.permissions.public_unset(path)
456         else:
457             self.permissions.public_set(path)
458     
459     @backend_method
460     def get_object_hashmap(self, user, account, container, name, version=None):
461         """Return the object's size and a list with partial hashes."""
462         
463         logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
464         self._can_read(user, account, container, name)
465         path, node = self._lookup_object(account, container, name)
466         props = self._get_version(node, version)
467         hashmap = self.mapper.map_retr(props[self.SERIAL])
468         return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
469     
470     @backend_method
471     def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None):
472         """Create/update an object with the specified size and partial hashes."""
473         
474         logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
475         if permissions is not None and user != account:
476             raise NotAllowedError
477         self._can_write(user, account, container, name)
478         missing = self.blocker.block_ping([binascii.unhexlify(x) for x in hashmap])
479         if missing:
480             ie = IndexError()
481             ie.data = [binascii.hexlify(x) for x in missing]
482             raise ie
483         if permissions is not None:
484             path = '/'.join((account, container, name))
485             self._check_permissions(path, permissions)
486         path, node = self._put_object_node(account, container, name)
487         src_version_id, dest_version_id = self._copy_version(user, node, None, node, size)
488         self.mapper.map_stor(dest_version_id, [binascii.unhexlify(x) for x in hashmap])
489         if not replace_meta and src_version_id is not None:
490             self.node.attribute_copy(src_version_id, dest_version_id)
491         self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
492         if permissions is not None:
493             self.permissions.access_set(path, permissions)
494         return dest_version_id
495     
496     @backend_method
497     def copy_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
498         """Copy an object's data and metadata."""
499         
500         logger.debug("copy_object: %s %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
501         if permissions is not None and user != account:
502             raise NotAllowedError
503         self._can_read(user, account, src_container, src_name)
504         self._can_write(user, account, dest_container, dest_name)
505         src_path, src_node = self._lookup_object(account, src_container, src_name)
506         if permissions is not None:
507             dest_path = '/'.join((account, container, name))
508             self._check_permissions(dest_path, permissions)
509         dest_path, dest_node = self._put_object_node(account, dest_container, dest_name)
510         src_version_id, dest_version_id = self._copy_version(user, src_node, src_version, dest_node)
511         if src_version_id is not None:
512             self._copy_data(src_version_id, dest_version_id)
513         if not replace_meta and src_version_id is not None:
514             self.node.attribute_copy(src_version_id, dest_version_id)
515         self.node.attribute_set(dest_version_id, ((k, v) for k, v in dest_meta.iteritems()))
516         if permissions is not None:
517             self.permissions.access_set(dest_path, permissions)
518         return dest_version_id
519     
520     @backend_method
521     def move_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None):
522         """Move an object's data and metadata."""
523         
524         logger.debug("move_object: %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions)
525         dest_version_id = self.copy_object(user, account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, None)
526         self.delete_object(user, account, src_container, src_name)
527         return dest_version_id
528     
529     @backend_method
530     def delete_object(self, user, account, container, name, until=None):
531         """Delete/purge an object."""
532         
533         logger.debug("delete_object: %s %s %s %s", account, container, name, until)
534         if user != account:
535             raise NotAllowedError
536         
537         if until is not None:
538             path = '/'.join((account, container, name))
539             node = self.node.node_lookup(path)
540             if node is None:
541                 return
542             versions = self.node.node_purge(node, until, CLUSTER_NORMAL)
543             versions += self.node.node_purge(node, until, CLUSTER_HISTORY)
544             for v in versions:
545                 self.mapper.map_remv(v)
546             self.node.node_purge_children(node, until, CLUSTER_DELETED)
547             try:
548                 props = self._get_version(node)
549             except NameError:
550                 pass
551             else:
552                 self.permissions.access_clear(path)
553             return
554         
555         path, node = self._lookup_object(account, container, name)
556         self._copy_version(user, node, None, node, 0, CLUSTER_DELETED)
557         self.permissions.access_clear(path)
558     
559     @backend_method
560     def list_versions(self, user, account, container, name):
561         """Return a list of all (version, version_timestamp) tuples for an object."""
562         
563         logger.debug("list_versions: %s %s %s", account, container, name)
564         self._can_read(user, account, container, name)
565         path, node = self._lookup_object(account, container, name)
566         return self.node.node_get_versions(node, ['serial', 'mtime'])
567     
568     @backend_method(autocommit=0)
569     def get_block(self, hash):
570         """Return a block's data."""
571         
572         logger.debug("get_block: %s", hash)
573         blocks = self.blocker.block_retr((binascii.unhexlify(hash),))
574         if not blocks:
575             raise NameError('Block does not exist')
576         return blocks[0]
577     
578     @backend_method(autocommit=0)
579     def put_block(self, data):
580         """Store a block and return the hash."""
581         
582         logger.debug("put_block: %s", len(data))
583         hashes, absent = self.blocker.block_stor((data,))
584         return binascii.hexlify(hashes[0])
585     
586     @backend_method(autocommit=0)
587     def update_block(self, hash, data, offset=0):
588         """Update a known block and return the hash."""
589         
590         logger.debug("update_block: %s %s %s", hash, len(data), offset)
591         if offset == 0 and len(data) == self.block_size:
592             return self.put_block(data)
593         h, e = self.blocker.block_delta(binascii.unhexlify(hash), ((offset, data),))
594         return binascii.hexlify(h)
595     
596     # Path functions.
597     
598     def _put_object_node(self, account, container, name):
599         path, parent = self._lookup_container(account, container)
600         path = '/'.join((path, name))
601         node = self.node.node_lookup(path)
602         if node is None:
603             node = self.node.node_create(parent, path)
604         return path, node
605     
606     def _put_path(self, user, parent, path):
607         node = self.node.node_create(parent, path)
608         self.node.version_create(node, 0, None, user, CLUSTER_NORMAL)
609         return node
610     
611     def _lookup_account(self, account, create=True):
612         node = self.node.node_lookup(account)
613         if node is None and create:
614             node = self._put_path(account, self.ROOTNODE, account) # User is account.
615         return account, node
616     
617     def _lookup_container(self, account, container):
618         path = '/'.join((account, container))
619         node = self.node.node_lookup(path)
620         if node is None:
621             raise NameError('Container does not exist')
622         return path, node
623     
624     def _lookup_object(self, account, container, name):
625         path = '/'.join((account, container, name))
626         node = self.node.node_lookup(path)
627         if node is None:
628             raise NameError('Object does not exist')
629         return path, node
630     
631     def _get_properties(self, node, until=None):
632         """Return properties until the timestamp given."""
633         
634         before = until if until is not None else inf
635         props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
636         if props is None and until is not None:
637             props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
638         if props is None:
639             raise NameError('Path does not exist')
640         return props
641     
642     def _get_statistics(self, node, until=None):
643         """Return count, sum of size and latest timestamp of everything under node."""
644         
645         if until is None:
646             stats = self.node.statistics_get(node, CLUSTER_NORMAL)
647         else:
648             stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
649         if stats is None:
650             stats = (0, 0, 0)
651         return stats
652     
653     def _get_version(self, node, version=None):
654         if version is None:
655             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
656             if props is None:
657                 raise NameError('Object does not exist')
658         else:
659             props = self.node.version_get_properties(version)
660             if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
661                 raise IndexError('Version does not exist')
662         return props
663     
664     def _copy_version(self, user, src_node, src_version, dest_node, dest_size=None, dest_cluster=CLUSTER_NORMAL):
665         
666         # Get source serial and size.
667         if src_version is not None:
668             src_props = self._get_version(src_node, src_version)
669             src_version_id = src_props[self.SERIAL]
670             size = src_props[self.SIZE]
671         else:
672             # Latest or create from scratch.
673             try:
674                 src_props = self._get_version(src_node)
675                 src_version_id = src_props[self.SERIAL]
676                 size = src_props[self.SIZE]
677             except NameError:
678                 src_version_id = None
679                 size = 0
680         if dest_size is not None:
681             size = dest_size
682         
683         # Move the latest version at destination to CLUSTER_HISTORY and create new.
684         if src_node == dest_node and src_version is None and src_version_id is not None:
685             self.node.version_recluster(src_version_id, CLUSTER_HISTORY)
686         else:
687             dest_props = self.node.version_lookup(dest_node, inf, CLUSTER_NORMAL)
688             if dest_props is not None:
689                 self.node.version_recluster(dest_props[self.SERIAL], CLUSTER_HISTORY)
690         dest_version_id, mtime = self.node.version_create(dest_node, size, src_version_id, user, dest_cluster)
691         
692         return src_version_id, dest_version_id
693     
694     def _copy_data(self, src_version, dest_version):
695         hashmap = self.mapper.map_retr(src_version)
696         self.mapper.map_stor(dest_version, hashmap)
697     
698     def _get_metadata(self, version):
699         if version is None:
700             return {}
701         return dict(self.node.attribute_get(version))
702     
703     def _put_metadata(self, user, node, meta, replace=False, copy_data=True):
704         """Create a new version and store metadata."""
705         
706         src_version_id, dest_version_id = self._copy_version(user, node, None, node)
707         if not replace:
708             if src_version_id is not None:
709                 self.node.attribute_copy(src_version_id, dest_version_id)
710             self.node.attribute_del(dest_version_id, (k for k, v in meta.iteritems() if v == ''))
711             self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems() if v != ''))
712         else:
713             self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
714         if copy_data and src_version_id is not None:
715             self._copy_data(src_version_id, dest_version_id)
716         return dest_version_id
717     
718     def _list_limits(self, listing, marker, limit):
719         start = 0
720         if marker:
721             try:
722                 start = listing.index(marker) + 1
723             except ValueError:
724                 pass
725         if not limit or limit > 10000:
726             limit = 10000
727         return start, limit
728     
729     def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None, allowed=[]):
730         cont_prefix = path + '/'
731         prefix = cont_prefix + prefix
732         start = cont_prefix + marker if marker else None
733         before = until if until is not None else inf
734         filterq = ','.join(keys) if keys else None
735         
736         objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, filterq)
737         objects.extend([(p, None) for p in prefixes] if virtual else [])
738         objects.sort(key=lambda x: x[0])
739         objects = [(x[0][len(cont_prefix):], x[1]) for x in objects]
740         
741         start, limit = self._list_limits([x[0] for x in objects], marker, limit)
742         return objects[start:start + limit]
743     
744     # Policy functions.
745     
746     def _check_policy(self, policy):
747         for k in policy.keys():
748             if policy[k] == '':
749                 policy[k] = self.default_policy.get(k)
750         for k, v in policy.iteritems():
751             if k == 'quota':
752                 q = int(v) # May raise ValueError.
753                 if q < 0:
754                     raise ValueError
755             elif k == 'versioning':
756                 if v not in ['auto', 'manual', 'none']:
757                     raise ValueError
758             else:
759                 raise ValueError
760     
761     # Access control functions.
762     
763     def _check_groups(self, groups):
764         # raise ValueError('Bad characters in groups')
765         pass
766     
767     def _check_permissions(self, path, permissions):
768         # raise ValueError('Bad characters in permissions')
769         
770         # Check for existing permissions.
771         paths = self.permissions.access_list(path)
772         if paths:
773             ae = AttributeError()
774             ae.data = paths
775             raise ae
776     
777     def _can_read(self, user, account, container, name):
778         if user == account:
779             return True
780         path = '/'.join((account, container, name))
781         if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
782             raise NotAllowedError
783     
784     def _can_write(self, user, account, container, name):
785         if user == account:
786             return True
787         path = '/'.join((account, container, name))
788         if not self.permissions.access_check(path, self.WRITE, user):
789             raise NotAllowedError
790     
791     def _allowed_accounts(self, user):
792         allow = set()
793         for path in self.permissions.access_list_paths(user):
794             allow.add(path.split('/', 1)[0])
795         return sorted(allow)
796     
797     def _allowed_containers(self, user, account):
798         allow = set()
799         for path in self.permissions.access_list_paths(user, account):
800             allow.add(path.split('/', 2)[1])
801         return sorted(allow)