optimize sorts
[pithos] / snf-pithos-backend / pithos / backends / modular.py
1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 import sys
35 import os
36 import time
37 import uuid as uuidlib
38 import logging
39 import hashlib
40 import binascii
41
42 from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend
43
44 # Stripped-down version of the HashMap class found in tools.
45 class HashMap(list):
46
47     def __init__(self, blocksize, blockhash):
48         super(HashMap, self).__init__()
49         self.blocksize = blocksize
50         self.blockhash = blockhash
51
52     def _hash_raw(self, v):
53         h = hashlib.new(self.blockhash)
54         h.update(v)
55         return h.digest()
56
57     def hash(self):
58         if len(self) == 0:
59             return self._hash_raw('')
60         if len(self) == 1:
61             return self.__getitem__(0)
62
63         h = list(self)
64         s = 2
65         while s < len(h):
66             s = s * 2
67         h += [('\x00' * len(h[0]))] * (s - len(h))
68         while len(h) > 1:
69             h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
70         return h[0]
71
72 # Default modules and settings.
73 DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
74 DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
75 DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
76 DEFAULT_BLOCK_PATH = 'data/'
77 DEFAULT_BLOCK_UMASK = 0o022
78 #DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
79 #DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
80
81 QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
82 QUEUE_CLIENT_ID = 'pithos'
83 QUEUE_INSTANCE_ID = '1'
84
85 ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
86
87 inf = float('inf')
88
89 ULTIMATE_ANSWER = 42
90
91
92 logger = logging.getLogger(__name__)
93
94
95 def backend_method(func=None, autocommit=1):
96     if func is None:
97         def fn(func):
98             return backend_method(func, autocommit)
99         return fn
100
101     if not autocommit:
102         return func
103     def fn(self, *args, **kw):
104         self.wrapper.execute()
105         try:
106             self.messages = []
107             ret = func(self, *args, **kw)
108             for m in self.messages:
109                 self.queue.send(*m)
110             self.wrapper.commit()
111             return ret
112         except:
113             self.wrapper.rollback()
114             raise
115     return fn
116
117
118 class ModularBackend(BaseBackend):
119     """A modular backend.
120     
121     Uses modules for SQL functions and storage.
122     """
123     
124     def __init__(self, db_module=None, db_connection=None,
125                  block_module=None, block_path=None, block_umask=None,
126                  queue_module=None, queue_connection=None):
127         db_module = db_module or DEFAULT_DB_MODULE
128         db_connection = db_connection or DEFAULT_DB_CONNECTION
129         block_module = block_module or DEFAULT_BLOCK_MODULE
130         block_path = block_path or DEFAULT_BLOCK_PATH
131         block_umask = block_umask or DEFAULT_BLOCK_UMASK
132         #queue_module = queue_module or DEFAULT_QUEUE_MODULE
133         #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
134         
135         self.hash_algorithm = 'sha256'
136         self.block_size = 4 * 1024 * 1024 # 4MB
137         
138         self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
139         
140         def load_module(m):
141             __import__(m)
142             return sys.modules[m]
143         
144         self.db_module = load_module(db_module)
145         self.wrapper = self.db_module.DBWrapper(db_connection)
146         params = {'wrapper': self.wrapper}
147         self.permissions = self.db_module.Permissions(**params)
148         for x in ['READ', 'WRITE']:
149             setattr(self, x, getattr(self.db_module, x))
150         self.node = self.db_module.Node(**params)
151         for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
152             setattr(self, x, getattr(self.db_module, x))
153         
154         self.block_module = load_module(block_module)
155         params = {'path': block_path,
156                   'block_size': self.block_size,
157                   'hash_algorithm': self.hash_algorithm,
158                   'umask': block_umask}
159         self.store = self.block_module.Store(**params)
160
161         if queue_module and queue_connection:
162             self.queue_module = load_module(queue_module)
163             params = {'exchange': queue_connection,
164                       'client_id': QUEUE_CLIENT_ID}
165             self.queue = self.queue_module.Queue(**params)
166         else:
167             class NoQueue:
168                 def send(self, *args):
169                     pass
170                 
171                 def close(self):
172                     pass
173             
174             self.queue = NoQueue()
175     
176     def close(self):
177         self.wrapper.close()
178         self.queue.close()
179     
180     @backend_method
181     def list_accounts(self, user, marker=None, limit=10000):
182         """Return a list of accounts the user can access."""
183         
184         logger.debug("list_accounts: %s %s %s", user, marker, limit)
185         allowed = self._allowed_accounts(user)
186         start, limit = self._list_limits(allowed, marker, limit)
187         return allowed[start:start + limit]
188     
189     @backend_method
190     def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
191         """Return a dictionary with the account metadata for the domain."""
192         
193         logger.debug("get_account_meta: %s %s %s %s", user, account, domain, until)
194         path, node = self._lookup_account(account, user == account)
195         if user != account:
196             if until or node is None or account not in self._allowed_accounts(user):
197                 raise NotAllowedError
198         try:
199             props = self._get_properties(node, until)
200             mtime = props[self.MTIME]
201         except NameError:
202             props = None
203             mtime = until
204         count, bytes, tstamp = self._get_statistics(node, until)
205         tstamp = max(tstamp, mtime)
206         if until is None:
207             modified = tstamp
208         else:
209             modified = self._get_statistics(node)[2] # Overall last modification.
210             modified = max(modified, mtime)
211         
212         if user != account:
213             meta = {'name': account}
214         else:
215             meta = {}
216             if props is not None and include_user_defined:
217                 meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
218             if until is not None:
219                 meta.update({'until_timestamp': tstamp})
220             meta.update({'name': account, 'count': count, 'bytes': bytes})
221         meta.update({'modified': modified})
222         return meta
223     
224     @backend_method
225     def update_account_meta(self, user, account, domain, meta, replace=False):
226         """Update the metadata associated with the account for the domain."""
227         
228         logger.debug("update_account_meta: %s %s %s %s %s", user, account, domain, meta, replace)
229         if user != account:
230             raise NotAllowedError
231         path, node = self._lookup_account(account, True)
232         self._put_metadata(user, node, domain, meta, replace)
233     
234     @backend_method
235     def get_account_groups(self, user, account):
236         """Return a dictionary with the user groups defined for this account."""
237         
238         logger.debug("get_account_groups: %s %s", user, account)
239         if user != account:
240             if account not in self._allowed_accounts(user):
241                 raise NotAllowedError
242             return {}
243         self._lookup_account(account, True)
244         return self.permissions.group_dict(account)
245     
246     @backend_method
247     def update_account_groups(self, user, account, groups, replace=False):
248         """Update the groups associated with the account."""
249         
250         logger.debug("update_account_groups: %s %s %s %s", user, account, groups, replace)
251         if user != account:
252             raise NotAllowedError
253         self._lookup_account(account, True)
254         self._check_groups(groups)
255         if replace:
256             self.permissions.group_destroy(account)
257         for k, v in groups.iteritems():
258             if not replace: # If not already deleted.
259                 self.permissions.group_delete(account, k)
260             if v:
261                 self.permissions.group_addmany(account, k, v)
262     
263     @backend_method
264     def get_account_policy(self, user, account):
265         """Return a dictionary with the account policy."""
266         
267         logger.debug("get_account_policy: %s %s", user, account)
268         if user != account:
269             if account not in self._allowed_accounts(user):
270                 raise NotAllowedError
271             return {}
272         path, node = self._lookup_account(account, True)
273         return self._get_policy(node)
274     
275     @backend_method
276     def update_account_policy(self, user, account, policy, replace=False):
277         """Update the policy associated with the account."""
278         
279         logger.debug("update_account_policy: %s %s %s %s", user, account, policy, replace)
280         if user != account:
281             raise NotAllowedError
282         path, node = self._lookup_account(account, True)
283         self._check_policy(policy)
284         self._put_policy(node, policy, replace)
285     
286     @backend_method
287     def put_account(self, user, account, policy={}):
288         """Create a new account with the given name."""
289         
290         logger.debug("put_account: %s %s %s", user, account, policy)
291         if user != account:
292             raise NotAllowedError
293         node = self.node.node_lookup(account)
294         if node is not None:
295             raise NameError('Account already exists')
296         if policy:
297             self._check_policy(policy)
298         node = self._put_path(user, self.ROOTNODE, account)
299         self._put_policy(node, policy, True)
300     
301     @backend_method
302     def delete_account(self, user, account):
303         """Delete the account with the given name."""
304         
305         logger.debug("delete_account: %s %s", user, account)
306         if user != account:
307             raise NotAllowedError
308         node = self.node.node_lookup(account)
309         if node is None:
310             return
311         if not self.node.node_remove(node):
312             raise IndexError('Account is not empty')
313         self.permissions.group_destroy(account)
314     
315     @backend_method
316     def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
317         """Return a list of containers existing under an account."""
318         
319         logger.debug("list_containers: %s %s %s %s %s %s %s", user, account, marker, limit, shared, until, public)
320         if user != account:
321             if until or account not in self._allowed_accounts(user):
322                 raise NotAllowedError
323             allowed = self._allowed_containers(user, account)
324             start, limit = self._list_limits(allowed, marker, limit)
325             return allowed[start:start + limit]
326         if shared or public:
327             allowed = set()
328             if shared:
329                 allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
330             if public:
331                 allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
332             allowed = sorted(allowed)
333             start, limit = self._list_limits(allowed, marker, limit)
334             return allowed[start:start + limit]
335         node = self.node.node_lookup(account)
336         containers = [x[0] for x in self._list_object_properties(node, account, '', '/', marker, limit, False, None, [], until)]
337         start, limit = self._list_limits([x[0] for x in containers], marker, limit)
338         return containers[start:start + limit]
339     
340     @backend_method
341     def list_container_meta(self, user, account, container, domain, until=None):
342         """Return a list with all the container's object meta keys for the domain."""
343         
344         logger.debug("list_container_meta: %s %s %s %s %s", user, account, container, domain, until)
345         allowed = []
346         if user != account:
347             if until:
348                 raise NotAllowedError
349             allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
350             if not allowed:
351                 raise NotAllowedError
352         path, node = self._lookup_container(account, container)
353         before = until if until is not None else inf
354         allowed = self._get_formatted_paths(allowed)
355         return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
356     
357     @backend_method
358     def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
359         """Return a dictionary with the container metadata for the domain."""
360         
361         logger.debug("get_container_meta: %s %s %s %s %s", user, account, container, domain, until)
362         if user != account:
363             if until or container not in self._allowed_containers(user, account):
364                 raise NotAllowedError
365         path, node = self._lookup_container(account, container)
366         props = self._get_properties(node, until)
367         mtime = props[self.MTIME]
368         count, bytes, tstamp = self._get_statistics(node, until)
369         tstamp = max(tstamp, mtime)
370         if until is None:
371             modified = tstamp
372         else:
373             modified = self._get_statistics(node)[2] # Overall last modification.
374             modified = max(modified, mtime)
375         
376         if user != account:
377             meta = {'name': container}
378         else:
379             meta = {}
380             if include_user_defined:
381                 meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
382             if until is not None:
383                 meta.update({'until_timestamp': tstamp})
384             meta.update({'name': container, 'count': count, 'bytes': bytes})
385         meta.update({'modified': modified})
386         return meta
387     
388     @backend_method
389     def update_container_meta(self, user, account, container, domain, meta, replace=False):
390         """Update the metadata associated with the container for the domain."""
391         
392         logger.debug("update_container_meta: %s %s %s %s %s %s", user, account, container, domain, meta, replace)
393         if user != account:
394             raise NotAllowedError
395         path, node = self._lookup_container(account, container)
396         src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
397         if src_version_id is not None:
398             versioning = self._get_policy(node)['versioning']
399             if versioning != 'auto':
400                 self.node.version_remove(src_version_id)
401     
402     @backend_method
403     def get_container_policy(self, user, account, container):
404         """Return a dictionary with the container policy."""
405         
406         logger.debug("get_container_policy: %s %s %s", user, account, container)
407         if user != account:
408             if container not in self._allowed_containers(user, account):
409                 raise NotAllowedError
410             return {}
411         path, node = self._lookup_container(account, container)
412         return self._get_policy(node)
413     
414     @backend_method
415     def update_container_policy(self, user, account, container, policy, replace=False):
416         """Update the policy associated with the container."""
417         
418         logger.debug("update_container_policy: %s %s %s %s %s", user, account, container, policy, replace)
419         if user != account:
420             raise NotAllowedError
421         path, node = self._lookup_container(account, container)
422         self._check_policy(policy)
423         self._put_policy(node, policy, replace)
424     
425     @backend_method
426     def put_container(self, user, account, container, policy={}):
427         """Create a new container with the given name."""
428         
429         logger.debug("put_container: %s %s %s %s", user, account, container, policy)
430         if user != account:
431             raise NotAllowedError
432         try:
433             path, node = self._lookup_container(account, container)
434         except NameError:
435             pass
436         else:
437             raise NameError('Container already exists')
438         if policy:
439             self._check_policy(policy)
440         path = '/'.join((account, container))
441         node = self._put_path(user, self._lookup_account(account, True)[1], path)
442         self._put_policy(node, policy, True)
443     
444     @backend_method
445     def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
446         """Delete/purge the container with the given name."""
447         
448         logger.debug("delete_container: %s %s %s %s %s %s", user, account, container, until, prefix, delimiter)
449         if user != account:
450             raise NotAllowedError
451         path, node = self._lookup_container(account, container)
452         
453         if until is not None:
454             hashes, size = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
455             for h in hashes:
456                 self.store.map_delete(h)
457             self.node.node_purge_children(node, until, CLUSTER_DELETED)
458             self._report_size_change(user, account, -size, {'action': 'container purge'})
459             return
460         
461         if self._get_statistics(node)[0] > 0:
462             raise IndexError('Container is not empty')
463         hashes, size = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
464         for h in hashes:
465             self.store.map_delete(h)
466         self.node.node_purge_children(node, inf, CLUSTER_DELETED)
467         self.node.node_remove(node)
468         self._report_size_change(user, account, -size, {'action': 'container delete'})
469     
470     def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
471         if user != account and until:
472             raise NotAllowedError
473         if shared and public:
474             # get shared first
475             shared = self._list_object_permissions(user, account, container, prefix, shared=True, public=False)
476             objects = []
477             if shared:
478                 path, node = self._lookup_container(account, container)
479                 shared = self._get_formatted_paths(shared)
480                 objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props)
481             
482             # get public
483             objects.extend(self._list_public_object_properties(user, account, container, prefix, all_props))
484             
485             objects.sort(key=lambda x: x[0])
486             start, limit = self._list_limits([x[0] for x in objects], marker, limit)
487             return objects[start:start + limit]
488         elif public:
489             objects = self._list_public_object_properties(user, account, container, prefix, all_props)
490             start, limit = self._list_limits([x[0] for x in objects], marker, limit)
491             return objects[start:start + limit]
492         
493         allowed = self._list_object_permissions(user, account, container, prefix, shared, public)
494         if shared and not allowed:
495             return []
496         path, node = self._lookup_container(account, container)
497         allowed = self._get_formatted_paths(allowed)
498         objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
499         start, limit = self._list_limits([x[0] for x in objects], marker, limit)
500         return objects[start:start + limit]
501     
502     def _list_public_object_properties(self, user, account, container, prefix, all_props):
503         public = self._list_object_permissions(user, account, container, prefix, shared=False, public=True)
504         paths, nodes = self._lookup_objects(public)
505         path = '/'.join((account, container))
506         cont_prefix = path + '/'
507         paths = [x[len(cont_prefix):] for x in paths]
508         props = self.node.version_lookup_bulk(nodes, all_props=all_props)
509         objects = [(path,) + props for path, props in zip(paths, props)]
510         return objects
511         
512     def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
513         objects = []
514         while True:
515             marker = objects[-1] if objects else None
516             limit = 10000
517             l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
518             objects.extend(l)
519             if not l or len(l) < limit:
520                 break
521         return objects
522     
523     def _list_object_permissions(self, user, account, container, prefix, shared, public):
524         allowed = []
525         path = '/'.join((account, container, prefix)).rstrip('/')
526         if user != account:
527             allowed = self.permissions.access_list_paths(user, path)
528             if not allowed:
529                 raise NotAllowedError
530         else:
531             allowed = set()
532             if shared:
533                 allowed.update(self.permissions.access_list_shared(path))
534             if public:
535                 allowed.update([x[0] for x in self.permissions.public_list(path)])
536             allowed = sorted(allowed)
537             if not allowed:
538                 return []
539         return allowed
540     
541     @backend_method
542     def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
543         """Return a list of object (name, version_id) tuples existing under a container."""
544         
545         logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
546         return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
547     
548     @backend_method
549     def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
550         """Return a list of object metadata dicts existing under a container."""
551         
552         logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
553         props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
554         objects = []
555         for p in props:
556             if len(p) == 2:
557                 objects.append({'subdir': p[0]})
558             else:
559                 objects.append({'name': p[0],
560                                 'bytes': p[self.SIZE + 1],
561                                 'type': p[self.TYPE + 1],
562                                 'hash': p[self.HASH + 1],
563                                 'version': p[self.SERIAL + 1],
564                                 'version_timestamp': p[self.MTIME + 1],
565                                 'modified': p[self.MTIME + 1] if until is None else None,
566                                 'modified_by': p[self.MUSER + 1],
567                                 'uuid': p[self.UUID + 1],
568                                 'checksum': p[self.CHECKSUM + 1]})
569         return objects
570     
571     @backend_method
572     def list_object_permissions(self, user, account, container, prefix=''):
573         """Return a list of paths that enforce permissions under a container."""
574         
575         logger.debug("list_object_permissions: %s %s %s %s", user, account, container, prefix)
576         return self._list_object_permissions(user, account, container, prefix, True, False)
577     
578     @backend_method
579     def list_object_public(self, user, account, container, prefix=''):
580         """Return a dict mapping paths to public ids for objects that are public under a container."""
581         
582         logger.debug("list_object_public: %s %s %s %s", user, account, container, prefix)
583         public = {}
584         for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
585             public[path] = p + ULTIMATE_ANSWER
586         return public
587     
588     @backend_method
589     def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
590         """Return a dictionary with the object metadata for the domain."""
591         
592         logger.debug("get_object_meta: %s %s %s %s %s %s", user, account, container, name, domain, version)
593         self._can_read(user, account, container, name)
594         path, node = self._lookup_object(account, container, name)
595         props = self._get_version(node, version)
596         if version is None:
597             modified = props[self.MTIME]
598         else:
599             try:
600                 modified = self._get_version(node)[self.MTIME] # Overall last modification.
601             except NameError: # Object may be deleted.
602                 del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
603                 if del_props is None:
604                     raise NameError('Object does not exist')
605                 modified = del_props[self.MTIME]
606         
607         meta = {}
608         if include_user_defined:
609             meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
610         meta.update({'name': name,
611                      'bytes': props[self.SIZE],
612                      'type': props[self.TYPE],
613                      'hash': props[self.HASH],
614                      'version': props[self.SERIAL],
615                      'version_timestamp': props[self.MTIME],
616                      'modified': modified,
617                      'modified_by': props[self.MUSER],
618                      'uuid': props[self.UUID],
619                      'checksum': props[self.CHECKSUM]})
620         return meta
621     
622     @backend_method
623     def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
624         """Update the metadata associated with the object for the domain and return the new version."""
625         
626         logger.debug("update_object_meta: %s %s %s %s %s %s %s", user, account, container, name, domain, meta, replace)
627         self._can_write(user, account, container, name)
628         path, node = self._lookup_object(account, container, name)
629         src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
630         self._apply_versioning(account, container, src_version_id)
631         return dest_version_id
632     
633     @backend_method
634     def get_object_permissions(self, user, account, container, name):
635         """Return the action allowed on the object, the path
636         from which the object gets its permissions from,
637         along with a dictionary containing the permissions."""
638         
639         logger.debug("get_object_permissions: %s %s %s %s", user, account, container, name)
640         allowed = 'write'
641         permissions_path = self._get_permissions_path(account, container, name)
642         if user != account:
643             if self.permissions.access_check(permissions_path, self.WRITE, user):
644                 allowed = 'write'
645             elif self.permissions.access_check(permissions_path, self.READ, user):
646                 allowed = 'read'
647             else:
648                 raise NotAllowedError
649         self._lookup_object(account, container, name)
650         return (allowed, permissions_path, self.permissions.access_get(permissions_path))
651     
652     @backend_method
653     def update_object_permissions(self, user, account, container, name, permissions):
654         """Update the permissions associated with the object."""
655         
656         logger.debug("update_object_permissions: %s %s %s %s %s", user, account, container, name, permissions)
657         if user != account:
658             raise NotAllowedError
659         path = self._lookup_object(account, container, name)[0]
660         self._check_permissions(path, permissions)
661         self.permissions.access_set(path, permissions)
662         self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
663     
664     @backend_method
665     def get_object_public(self, user, account, container, name):
666         """Return the public id of the object if applicable."""
667         
668         logger.debug("get_object_public: %s %s %s %s", user, account, container, name)
669         self._can_read(user, account, container, name)
670         path = self._lookup_object(account, container, name)[0]
671         p = self.permissions.public_get(path)
672         if p is not None:
673             p += ULTIMATE_ANSWER
674         return p
675     
676     @backend_method
677     def update_object_public(self, user, account, container, name, public):
678         """Update the public status of the object."""
679         
680         logger.debug("update_object_public: %s %s %s %s %s", user, account, container, name, public)
681         self._can_write(user, account, container, name)
682         path = self._lookup_object(account, container, name)[0]
683         if not public:
684             self.permissions.public_unset(path)
685         else:
686             self.permissions.public_set(path)
687     
688     @backend_method
689     def get_object_hashmap(self, user, account, container, name, version=None):
690         """Return the object's size and a list with partial hashes."""
691         
692         logger.debug("get_object_hashmap: %s %s %s %s %s", user, account, container, name, version)
693         self._can_read(user, account, container, name)
694         path, node = self._lookup_object(account, container, name)
695         props = self._get_version(node, version)
696         hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
697         return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
698     
699     def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
700         if permissions is not None and user != account:
701             raise NotAllowedError
702         self._can_write(user, account, container, name)
703         if permissions is not None:
704             path = '/'.join((account, container, name))
705             self._check_permissions(path, permissions)
706         
707         account_path, account_node = self._lookup_account(account, True)
708         container_path, container_node = self._lookup_container(account, container)
709         path, node = self._put_object_node(container_path, container_node, name)
710         pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
711         
712         # Handle meta.
713         if src_version_id is None:
714             src_version_id = pre_version_id
715         self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace_meta)
716         
717         # Check quota.
718         del_size = self._apply_versioning(account, container, pre_version_id)
719         size_delta = size - del_size
720         if size_delta > 0:
721             account_quota = long(self._get_policy(account_node)['quota'])
722             container_quota = long(self._get_policy(container_node)['quota'])
723             if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
724                (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
725                 # This must be executed in a transaction, so the version is never created if it fails.
726                 raise QuotaError
727         self._report_size_change(user, account, size_delta, {'action': 'object update'})
728         
729         if permissions is not None:
730             self.permissions.access_set(path, permissions)
731             self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
732         
733         self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
734         return dest_version_id
735     
736     @backend_method
737     def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
738         """Create/update an object with the specified size and partial hashes."""
739         
740         logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user, account, container, name, size, type, hashmap, checksum)
741         if size == 0: # No such thing as an empty hashmap.
742             hashmap = [self.put_block('')]
743         map = HashMap(self.block_size, self.hash_algorithm)
744         map.extend([binascii.unhexlify(x) for x in hashmap])
745         missing = self.store.block_search(map)
746         if missing:
747             ie = IndexError()
748             ie.data = [binascii.hexlify(x) for x in missing]
749             raise ie
750         
751         hash = map.hash()
752         dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
753         self.store.map_put(hash, map)
754         return dest_version_id
755     
756     @backend_method
757     def update_object_checksum(self, user, account, container, name, version, checksum):
758         """Update an object's checksum."""
759         
760         logger.debug("update_object_checksum: %s %s %s %s %s %s", user, account, container, name, version, checksum)
761         # Update objects with greater version and same hashmap and size (fix metadata updates).
762         self._can_write(user, account, container, name)
763         path, node = self._lookup_object(account, container, name)
764         props = self._get_version(node, version)
765         versions = self.node.node_get_versions(node)
766         for x in versions:
767             if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
768                 self.node.version_put_property(x[self.SERIAL], 'checksum', checksum)
769     
770     def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
771         dest_version_ids = []
772         self._can_read(user, src_account, src_container, src_name)
773         path, node = self._lookup_object(src_account, src_container, src_name)
774         # TODO: Will do another fetch of the properties in duplicate version...
775         props = self._get_version(node, src_version) # Check to see if source exists.
776         src_version_id = props[self.SERIAL]
777         hash = props[self.HASH]
778         size = props[self.SIZE]
779         is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
780         dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
781         if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
782                 self._delete_object(user, src_account, src_container, src_name)
783         
784         if delimiter:
785             prefix = src_name + delimiter if not src_name.endswith(delimiter) else src_name
786             src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
787             paths = [elem[0] for elem in src_names]
788             nodes = [elem[2] for elem in src_names]
789             # TODO: Will do another fetch of the properties in duplicate version...
790             props = self._get_versions(nodes) # Check to see if source exists.
791             
792             for prop, path, node in zip(props, paths, nodes):
793                 src_version_id = prop[self.SERIAL]
794                 hash = prop[self.HASH]
795                 vtype = prop[self.TYPE]
796                 dest_prefix = dest_name + delimiter if not dest_name.endswith(delimiter) else dest_name
797                 vdest_name = path.replace(prefix, dest_prefix, 1)
798                 dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
799                 if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
800                         self._delete_object(user, src_account, src_container, path)
801         return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
802     
803     @backend_method
804     def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None):
805         """Copy an object's data and metadata."""
806         
807         logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
808         dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
809         return dest_version_id
810     
811     @backend_method
812     def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, delimiter=None):
813         """Move an object's data and metadata."""
814         
815         logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
816         if user != src_account:
817             raise NotAllowedError
818         dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
819         return dest_version_id
820     
821     def _delete_object(self, user, account, container, name, until=None, delimiter=None):
822         if user != account:
823             raise NotAllowedError
824         
825         if until is not None:
826             path = '/'.join((account, container, name))
827             node = self.node.node_lookup(path)
828             if node is None:
829                 return
830             hashes = []
831             size = 0
832             h, s = self.node.node_purge(node, until, CLUSTER_NORMAL)
833             hashes += h
834             size += s
835             h, s = self.node.node_purge(node, until, CLUSTER_HISTORY)
836             hashes += h
837             size += s
838             for h in hashes:
839                 self.store.map_delete(h)
840             self.node.node_purge(node, until, CLUSTER_DELETED)
841             try:
842                 props = self._get_version(node)
843             except NameError:
844                 self.permissions.access_clear(path)
845             self._report_size_change(user, account, -size, {'action': 'object purge'})
846             return
847         
848         path, node = self._lookup_object(account, container, name)
849         src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
850         del_size = self._apply_versioning(account, container, src_version_id)
851         if del_size:
852             self._report_size_change(user, account, -del_size, {'action': 'object delete'})
853         self._report_object_change(user, account, path, details={'action': 'object delete'})
854         self.permissions.access_clear(path)
855         
856         if delimiter:
857             prefix = name + delimiter if not name.endswith(delimiter) else name
858             src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
859             paths = []
860             for t in src_names:
861                 path = '/'.join((account, container, t[0]))
862                 node = t[2]
863                 src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
864                 del_size = self._apply_versioning(account, container, src_version_id)
865                 if del_size:
866                     self._report_size_change(user, account, -del_size, {'action': 'object delete'})
867                 self._report_object_change(user, account, path, details={'action': 'object delete'})
868                 paths.append(path)
869             self.permissions.access_clear_bulk(paths)
870     
871     @backend_method
872     def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
873         """Delete/purge an object."""
874         
875         logger.debug("delete_object: %s %s %s %s %s %s %s", user, account, container, name, until, prefix, delimiter)
876         self._delete_object(user, account, container, name, until, delimiter)
877     
878     @backend_method
879     def list_versions(self, user, account, container, name):
880         """Return a list of all (version, version_timestamp) tuples for an object."""
881         
882         logger.debug("list_versions: %s %s %s %s", user, account, container, name)
883         self._can_read(user, account, container, name)
884         path, node = self._lookup_object(account, container, name)
885         versions = self.node.node_get_versions(node)
886         return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
887     
888     @backend_method
889     def get_uuid(self, user, uuid):
890         """Return the (account, container, name) for the UUID given."""
891         
892         logger.debug("get_uuid: %s %s", user, uuid)
893         info = self.node.latest_uuid(uuid)
894         if info is None:
895             raise NameError
896         path, serial = info
897         account, container, name = path.split('/', 2)
898         self._can_read(user, account, container, name)
899         return (account, container, name)
900     
901     @backend_method
902     def get_public(self, user, public):
903         """Return the (account, container, name) for the public id given."""
904         
905         logger.debug("get_public: %s %s", user, public)
906         if public is None or public < ULTIMATE_ANSWER:
907             raise NameError
908         path = self.permissions.public_path(public - ULTIMATE_ANSWER)
909         if path is None:
910             raise NameError
911         account, container, name = path.split('/', 2)
912         self._can_read(user, account, container, name)
913         return (account, container, name)
914     
915     @backend_method(autocommit=0)
916     def get_block(self, hash):
917         """Return a block's data."""
918         
919         logger.debug("get_block: %s", hash)
920         block = self.store.block_get(binascii.unhexlify(hash))
921         if not block:
922             raise NameError('Block does not exist')
923         return block
924     
925     @backend_method(autocommit=0)
926     def put_block(self, data):
927         """Store a block and return the hash."""
928         
929         logger.debug("put_block: %s", len(data))
930         return binascii.hexlify(self.store.block_put(data))
931     
932     @backend_method(autocommit=0)
933     def update_block(self, hash, data, offset=0):
934         """Update a known block and return the hash."""
935         
936         logger.debug("update_block: %s %s %s", hash, len(data), offset)
937         if offset == 0 and len(data) == self.block_size:
938             return self.put_block(data)
939         h = self.store.block_update(binascii.unhexlify(hash), offset, data)
940         return binascii.hexlify(h)
941     
942     # Path functions.
943     
944     def _generate_uuid(self):
945         return str(uuidlib.uuid4())
946     
947     def _put_object_node(self, path, parent, name):
948         path = '/'.join((path, name))
949         node = self.node.node_lookup(path)
950         if node is None:
951             node = self.node.node_create(parent, path)
952         return path, node
953     
954     def _put_path(self, user, parent, path):
955         node = self.node.node_create(parent, path)
956         self.node.version_create(node, None, 0, '', None, user, self._generate_uuid(), '', CLUSTER_NORMAL)
957         return node
958     
959     def _lookup_account(self, account, create=True):
960         node = self.node.node_lookup(account)
961         if node is None and create:
962             node = self._put_path(account, self.ROOTNODE, account) # User is account.
963         return account, node
964     
965     def _lookup_container(self, account, container):
966         path = '/'.join((account, container))
967         node = self.node.node_lookup(path)
968         if node is None:
969             raise NameError('Container does not exist')
970         return path, node
971     
972     def _lookup_object(self, account, container, name):
973         path = '/'.join((account, container, name))
974         node = self.node.node_lookup(path)
975         if node is None:
976             raise NameError('Object does not exist')
977         return path, node
978     
979     def _lookup_objects(self, paths):
980         nodes = self.node.node_lookup_bulk(paths)
981         if nodes is None:
982             raise NameError('Object does not exist')
983         return paths, nodes
984     
985     def _get_properties(self, node, until=None):
986         """Return properties until the timestamp given."""
987         
988         before = until if until is not None else inf
989         props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
990         if props is None and until is not None:
991             props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
992         if props is None:
993             raise NameError('Path does not exist')
994         return props
995     
996     def _get_statistics(self, node, until=None):
997         """Return count, sum of size and latest timestamp of everything under node."""
998         
999         if until is None:
1000             stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1001         else:
1002             stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
1003         if stats is None:
1004             stats = (0, 0, 0)
1005         return stats
1006     
1007     def _get_version(self, node, version=None):
1008         if version is None:
1009             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1010             if props is None:
1011                 raise NameError('Object does not exist')
1012         else:
1013             try:
1014                 version = int(version)
1015             except ValueError:
1016                 raise IndexError('Version does not exist')
1017             props = self.node.version_get_properties(version)
1018             if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1019                 raise IndexError('Version does not exist')
1020         return props
1021
1022     def _get_versions(self, nodes, version=None):
1023         if version is None:
1024             props = self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1025             if not props:
1026                 raise NameError('Object does not exist')
1027         else:
1028             try:
1029                 version = int(version)
1030             except ValueError:
1031                 raise IndexError('Version does not exist')
1032             props = self.node.version_get_properties(version)
1033             if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
1034                 raise IndexError('Version does not exist')
1035         return props
1036     
1037     def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
1038         """Create a new version of the node."""
1039         
1040         props = self.node.version_lookup(node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1041         if props is not None:
1042             src_version_id = props[self.SERIAL]
1043             src_hash = props[self.HASH]
1044             src_size = props[self.SIZE]
1045             src_type = props[self.TYPE]
1046             src_checksum = props[self.CHECKSUM]
1047         else:
1048             src_version_id = None
1049             src_hash = None
1050             src_size = 0
1051             src_type = ''
1052             src_checksum = ''
1053         if size is None: # Set metadata.
1054             hash = src_hash # This way hash can be set to None (account or container).
1055             size = src_size
1056         if type is None:
1057             type = src_type
1058         if checksum is None:
1059             checksum = src_checksum
1060         uuid = self._generate_uuid() if (is_copy or src_version_id is None) else props[self.UUID]
1061         
1062         if src_node is None:
1063             pre_version_id = src_version_id
1064         else:
1065             pre_version_id = None
1066             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1067             if props is not None:
1068                 pre_version_id = props[self.SERIAL]
1069         if pre_version_id is not None:
1070             self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
1071         
1072         dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
1073         return pre_version_id, dest_version_id
1074     
1075     def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1076         if src_version_id is not None:
1077             self.node.attribute_copy(src_version_id, dest_version_id)
1078         if not replace:
1079             self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
1080             self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
1081         else:
1082             self.node.attribute_del(dest_version_id, domain)
1083             self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
1084     
1085     def _put_metadata(self, user, node, domain, meta, replace=False):
1086         """Create a new version and store metadata."""
1087         
1088         src_version_id, dest_version_id = self._put_version_duplicate(user, node)
1089         self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
1090         return src_version_id, dest_version_id
1091     
1092     def _list_limits(self, listing, marker, limit):
1093         start = 0
1094         if marker:
1095             try:
1096                 start = listing.index(marker) + 1
1097             except ValueError:
1098                 pass
1099         if not limit or limit > 10000:
1100             limit = 10000
1101         return start, limit
1102     
1103     def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
1104         cont_prefix = path + '/'
1105         prefix = cont_prefix + prefix
1106         start = cont_prefix + marker if marker else None
1107         before = until if until is not None else inf
1108         filterq = keys if domain else []
1109         sizeq = size_range
1110         
1111         objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1112         objects.extend([(p, None) for p in prefixes] if virtual else [])
1113         objects.sort(key=lambda x: x[0])
1114         objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1115         return objects
1116         
1117     # Reporting functions.
1118     
1119     def _report_size_change(self, user, account, size, details={}):
1120         logger.debug("_report_size_change: %s %s %s %s", user, account, size, details)
1121         account_node = self._lookup_account(account, True)[1]
1122         total = self._get_statistics(account_node)[1]
1123         details.update({'user': user, 'total': total})
1124         self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))
1125     
1126     def _report_object_change(self, user, account, path, details={}):
1127         logger.debug("_report_object_change: %s %s %s %s", user, account, path, details)
1128         details.update({'user': user})
1129         self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',), account, QUEUE_INSTANCE_ID, 'object', path, details))
1130     
1131     def _report_sharing_change(self, user, account, path, details={}):
1132         logger.debug("_report_permissions_change: %s %s %s %s", user, account, path, details)
1133         details.update({'user': user})
1134         self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',), account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1135     
1136     # Policy functions.
1137     
1138     def _check_policy(self, policy):
1139         for k in policy.keys():
1140             if policy[k] == '':
1141                 policy[k] = self.default_policy.get(k)
1142         for k, v in policy.iteritems():
1143             if k == 'quota':
1144                 q = int(v) # May raise ValueError.
1145                 if q < 0:
1146                     raise ValueError
1147             elif k == 'versioning':
1148                 if v not in ['auto', 'none']:
1149                     raise ValueError
1150             else:
1151                 raise ValueError
1152     
1153     def _put_policy(self, node, policy, replace):
1154         if replace:
1155             for k, v in self.default_policy.iteritems():
1156                 if k not in policy:
1157                     policy[k] = v
1158         self.node.policy_set(node, policy)
1159     
1160     def _get_policy(self, node):
1161         policy = self.default_policy.copy()
1162         policy.update(self.node.policy_get(node))
1163         return policy
1164     
1165     def _apply_versioning(self, account, container, version_id):
1166         """Delete the provided version if such is the policy.
1167            Return size of object removed.
1168         """
1169         
1170         if version_id is None:
1171             return 0
1172         path, node = self._lookup_container(account, container)
1173         versioning = self._get_policy(node)['versioning']
1174         if versioning != 'auto':
1175             hash, size = self.node.version_remove(version_id)
1176             self.store.map_delete(hash)
1177             return size
1178         return 0
1179     
1180     # Access control functions.
1181     
1182     def _check_groups(self, groups):
1183         # raise ValueError('Bad characters in groups')
1184         pass
1185     
1186     def _check_permissions(self, path, permissions):
1187         # raise ValueError('Bad characters in permissions')
1188         pass
1189     
1190     def _get_formatted_paths(self, paths):
1191         formatted = []
1192         for p in paths:
1193             node = self.node.node_lookup(p)
1194             if node is not None:
1195                 props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1196             if props is not None:
1197                 if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1198                     formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1199                 formatted.append((p, self.MATCH_EXACT))
1200         return formatted
1201     
1202     def _get_permissions_path(self, account, container, name):
1203         path = '/'.join((account, container, name))
1204         permission_paths = self.permissions.access_inherit(path)
1205         permission_paths.sort()
1206         permission_paths.reverse()
1207         for p in permission_paths:
1208             if p == path:
1209                 return p
1210             else:
1211                 if p.count('/') < 2:
1212                     continue
1213                 node = self.node.node_lookup(p)
1214                 if node is not None:
1215                     props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
1216                 if props is not None:
1217                     if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1218                         return p
1219         return None
1220     
1221     def _can_read(self, user, account, container, name):
1222         if user == account:
1223             return True
1224         path = '/'.join((account, container, name))
1225         if self.permissions.public_get(path) is not None:
1226             return True
1227         path = self._get_permissions_path(account, container, name)
1228         if not path:
1229             raise NotAllowedError
1230         if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1231             raise NotAllowedError
1232     
1233     def _can_write(self, user, account, container, name):
1234         if user == account:
1235             return True
1236         path = '/'.join((account, container, name))
1237         path = self._get_permissions_path(account, container, name)
1238         if not path:
1239             raise NotAllowedError
1240         if not self.permissions.access_check(path, self.WRITE, user):
1241             raise NotAllowedError
1242     
1243     def _allowed_accounts(self, user):
1244         allow = set()
1245         for path in self.permissions.access_list_paths(user):
1246             allow.add(path.split('/', 1)[0])
1247         return sorted(allow)
1248     
1249     def _allowed_containers(self, user, account):
1250         allow = set()
1251         for path in self.permissions.access_list_paths(user, account):
1252             allow.add(path.split('/', 2)[1])
1253         return sorted(allow)