include path in messages sent to aquarium
[pithos] / snf-pithos-backend / pithos / backends / modular.py
index 950c6db..8b6458f 100644 (file)
@@ -36,17 +36,52 @@ import os
 import time
 import uuid as uuidlib
 import logging
+import hashlib
 import binascii
 
-from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend
+from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \
+    AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists
 
-from pithos.lib.hashmap import HashMap
+# Stripped-down version of the HashMap class found in tools.
+class HashMap(list):
+
+    def __init__(self, blocksize, blockhash):
+        super(HashMap, self).__init__()
+        self.blocksize = blocksize
+        self.blockhash = blockhash
+
+    def _hash_raw(self, v):
+        h = hashlib.new(self.blockhash)
+        h.update(v)
+        return h.digest()
+
+    def hash(self):
+        if len(self) == 0:
+            return self._hash_raw('')
+        if len(self) == 1:
+            return self.__getitem__(0)
+
+        h = list(self)
+        s = 2
+        while s < len(h):
+            s = s * 2
+        h += [('\x00' * len(h[0]))] * (s - len(h))
+        while len(h) > 1:
+            h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
+        return h[0]
 
 # Default modules and settings.
 DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
 DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
 DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
 DEFAULT_BLOCK_PATH = 'data/'
+DEFAULT_BLOCK_UMASK = 0o022
+#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
+#DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
+
+QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
+QUEUE_CLIENT_ID = 'pithos'
+QUEUE_INSTANCE_ID = '1'
 
 ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
 
@@ -69,7 +104,10 @@ def backend_method(func=None, autocommit=1):
     def fn(self, *args, **kw):
         self.wrapper.execute()
         try:
+            self.messages = []
             ret = func(self, *args, **kw)
+            for m in self.messages:
+                self.queue.send(*m)
             self.wrapper.commit()
             return ret
         except:
@@ -84,39 +122,61 @@ class ModularBackend(BaseBackend):
     Uses modules for SQL functions and storage.
     """
     
-    def __init__(self, db_module=None, db_connection=None, block_module=None, block_path=None):
+    def __init__(self, db_module=None, db_connection=None,
+                 block_module=None, block_path=None, block_umask=None,
+                 queue_module=None, queue_connection=None):
         db_module = db_module or DEFAULT_DB_MODULE
         db_connection = db_connection or DEFAULT_DB_CONNECTION
         block_module = block_module or DEFAULT_BLOCK_MODULE
         block_path = block_path or DEFAULT_BLOCK_PATH
+        block_umask = block_umask or DEFAULT_BLOCK_UMASK
+        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
+        #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
         
         self.hash_algorithm = 'sha256'
         self.block_size = 4 * 1024 * 1024 # 4MB
         
         self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
         
-        __import__(db_module)
-        self.db_module = sys.modules[db_module]
-        self.wrapper = self.db_module.DBWrapper(db_connection)
+        def load_module(m):
+            __import__(m)
+            return sys.modules[m]
         
+        self.db_module = load_module(db_module)
+        self.wrapper = self.db_module.DBWrapper(db_connection)
         params = {'wrapper': self.wrapper}
         self.permissions = self.db_module.Permissions(**params)
         for x in ['READ', 'WRITE']:
             setattr(self, x, getattr(self.db_module, x))
         self.node = self.db_module.Node(**params)
-        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'MTIME', 'MUSER', 'UUID', 'CLUSTER']:
+        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
             setattr(self, x, getattr(self.db_module, x))
         
-        __import__(block_module)
-        self.block_module = sys.modules[block_module]
-        
+        self.block_module = load_module(block_module)
         params = {'path': block_path,
                   'block_size': self.block_size,
-                  'hash_algorithm': self.hash_algorithm}
+                  'hash_algorithm': self.hash_algorithm,
+                  'umask': block_umask}
         self.store = self.block_module.Store(**params)
+
+        if queue_module and queue_connection:
+            self.queue_module = load_module(queue_module)
+            params = {'exchange': queue_connection,
+                      'client_id': QUEUE_CLIENT_ID}
+            self.queue = self.queue_module.Queue(**params)
+        else:
+            class NoQueue:
+                def send(self, *args):
+                    pass
+                
+                def close(self):
+                    pass
+            
+            self.queue = NoQueue()
     
     def close(self):
         self.wrapper.close()
+        self.queue.close()
     
     @backend_method
     def list_accounts(self, user, marker=None, limit=10000):
@@ -128,10 +188,10 @@ class ModularBackend(BaseBackend):
         return allowed[start:start + limit]
     
     @backend_method
-    def get_account_meta(self, user, account, domain, until=None):
+    def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
         """Return a dictionary with the account metadata for the domain."""
         
-        logger.debug("get_account_meta: %s %s %s", account, domain, until)
+        logger.debug("get_account_meta: %s %s %s %s", user, account, domain, until)
         path, node = self._lookup_account(account, user == account)
         if user != account:
             if until or node is None or account not in self._allowed_accounts(user):
@@ -154,7 +214,7 @@ class ModularBackend(BaseBackend):
             meta = {'name': account}
         else:
             meta = {}
-            if props is not None:
+            if props is not None and include_user_defined:
                 meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
             if until is not None:
                 meta.update({'until_timestamp': tstamp})
@@ -166,7 +226,7 @@ class ModularBackend(BaseBackend):
     def update_account_meta(self, user, account, domain, meta, replace=False):
         """Update the metadata associated with the account for the domain."""
         
-        logger.debug("update_account_meta: %s %s %s %s", account, domain, meta, replace)
+        logger.debug("update_account_meta: %s %s %s %s %s", user, account, domain, meta, replace)
         if user != account:
             raise NotAllowedError
         path, node = self._lookup_account(account, True)
@@ -176,7 +236,7 @@ class ModularBackend(BaseBackend):
     def get_account_groups(self, user, account):
         """Return a dictionary with the user groups defined for this account."""
         
-        logger.debug("get_account_groups: %s", account)
+        logger.debug("get_account_groups: %s %s", user, account)
         if user != account:
             if account not in self._allowed_accounts(user):
                 raise NotAllowedError
@@ -188,7 +248,7 @@ class ModularBackend(BaseBackend):
     def update_account_groups(self, user, account, groups, replace=False):
         """Update the groups associated with the account."""
         
-        logger.debug("update_account_groups: %s %s %s", account, groups, replace)
+        logger.debug("update_account_groups: %s %s %s %s", user, account, groups, replace)
         if user != account:
             raise NotAllowedError
         self._lookup_account(account, True)
@@ -205,7 +265,7 @@ class ModularBackend(BaseBackend):
     def get_account_policy(self, user, account):
         """Return a dictionary with the account policy."""
         
-        logger.debug("get_account_policy: %s", account)
+        logger.debug("get_account_policy: %s %s", user, account)
         if user != account:
             if account not in self._allowed_accounts(user):
                 raise NotAllowedError
@@ -217,7 +277,7 @@ class ModularBackend(BaseBackend):
     def update_account_policy(self, user, account, policy, replace=False):
         """Update the policy associated with the account."""
         
-        logger.debug("update_account_policy: %s %s %s", account, policy, replace)
+        logger.debug("update_account_policy: %s %s %s %s", user, account, policy, replace)
         if user != account:
             raise NotAllowedError
         path, node = self._lookup_account(account, True)
@@ -228,12 +288,12 @@ class ModularBackend(BaseBackend):
     def put_account(self, user, account, policy={}):
         """Create a new account with the given name."""
         
-        logger.debug("put_account: %s %s", account, policy)
+        logger.debug("put_account: %s %s %s", user, account, policy)
         if user != account:
             raise NotAllowedError
         node = self.node.node_lookup(account)
         if node is not None:
-            raise NameError('Account already exists')
+            raise AccountExists('Account already exists')
         if policy:
             self._check_policy(policy)
         node = self._put_path(user, self.ROOTNODE, account)
@@ -243,40 +303,63 @@ class ModularBackend(BaseBackend):
     def delete_account(self, user, account):
         """Delete the account with the given name."""
         
-        logger.debug("delete_account: %s", account)
+        logger.debug("delete_account: %s %s", user, account)
         if user != account:
             raise NotAllowedError
         node = self.node.node_lookup(account)
         if node is None:
             return
         if not self.node.node_remove(node):
-            raise IndexError('Account is not empty')
+            raise AccountNotEmpty('Account is not empty')
         self.permissions.group_destroy(account)
     
     @backend_method
-    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
+    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
         """Return a list of containers existing under an account."""
         
-        logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
+        logger.debug("list_containers: %s %s %s %s %s %s %s", user, account, marker, limit, shared, until, public)
         if user != account:
             if until or account not in self._allowed_accounts(user):
                 raise NotAllowedError
             allowed = self._allowed_containers(user, account)
             start, limit = self._list_limits(allowed, marker, limit)
             return allowed[start:start + limit]
-        if shared:
-            allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
-            allowed = list(set(allowed))
+        if shared or public:
+            allowed = set()
+            if shared:
+                allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)])
+            if public:
+                allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)])
+            allowed = sorted(allowed)
             start, limit = self._list_limits(allowed, marker, limit)
             return allowed[start:start + limit]
         node = self.node.node_lookup(account)
-        return [x[0] for x in self._list_objects(node, account, '', '/', marker, limit, False, None, [], until)]
+        containers = [x[0] for x in self._list_object_properties(node, account, '', '/', marker, limit, False, None, [], until)]
+        start, limit = self._list_limits([x[0] for x in containers], marker, limit)
+        return containers[start:start + limit]
+    
+    @backend_method
+    def list_container_meta(self, user, account, container, domain, until=None):
+        """Return a list with all the container's object meta keys for the domain."""
+        
+        logger.debug("list_container_meta: %s %s %s %s %s", user, account, container, domain, until)
+        allowed = []
+        if user != account:
+            if until:
+                raise NotAllowedError
+            allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
+            if not allowed:
+                raise NotAllowedError
+        path, node = self._lookup_container(account, container)
+        before = until if until is not None else inf
+        allowed = self._get_formatted_paths(allowed)
+        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
     
     @backend_method
-    def get_container_meta(self, user, account, container, domain, until=None):
+    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
         """Return a dictionary with the container metadata for the domain."""
         
-        logger.debug("get_container_meta: %s %s %s %s", account, container, domain, until)
+        logger.debug("get_container_meta: %s %s %s %s %s", user, account, container, domain, until)
         if user != account:
             if until or container not in self._allowed_containers(user, account):
                 raise NotAllowedError
@@ -294,7 +377,9 @@ class ModularBackend(BaseBackend):
         if user != account:
             meta = {'name': container}
         else:
-            meta = dict(self.node.attribute_get(props[self.SERIAL], domain))
+            meta = {}
+            if include_user_defined:
+                meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
             if until is not None:
                 meta.update({'until_timestamp': tstamp})
             meta.update({'name': container, 'count': count, 'bytes': bytes})
@@ -305,17 +390,21 @@ class ModularBackend(BaseBackend):
     def update_container_meta(self, user, account, container, domain, meta, replace=False):
         """Update the metadata associated with the container for the domain."""
         
-        logger.debug("update_container_meta: %s %s %s %s %s", account, container, domain, meta, replace)
+        logger.debug("update_container_meta: %s %s %s %s %s %s", user, account, container, domain, meta, replace)
         if user != account:
             raise NotAllowedError
         path, node = self._lookup_container(account, container)
-        self._put_metadata(user, node, domain, meta, replace)
+        src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
+        if src_version_id is not None:
+            versioning = self._get_policy(node)['versioning']
+            if versioning != 'auto':
+                self.node.version_remove(src_version_id)
     
     @backend_method
     def get_container_policy(self, user, account, container):
         """Return a dictionary with the container policy."""
         
-        logger.debug("get_container_policy: %s %s", account, container)
+        logger.debug("get_container_policy: %s %s %s", user, account, container)
         if user != account:
             if container not in self._allowed_containers(user, account):
                 raise NotAllowedError
@@ -327,7 +416,7 @@ class ModularBackend(BaseBackend):
     def update_container_policy(self, user, account, container, policy, replace=False):
         """Update the policy associated with the container."""
         
-        logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
+        logger.debug("update_container_policy: %s %s %s %s %s", user, account, container, policy, replace)
         if user != account:
             raise NotAllowedError
         path, node = self._lookup_container(account, container)
@@ -338,7 +427,7 @@ class ModularBackend(BaseBackend):
     def put_container(self, user, account, container, policy={}):
         """Create a new container with the given name."""
         
-        logger.debug("put_container: %s %s %s", account, container, policy)
+        logger.debug("put_container: %s %s %s %s", user, account, container, policy)
         if user != account:
             raise NotAllowedError
         try:
@@ -346,7 +435,7 @@ class ModularBackend(BaseBackend):
         except NameError:
             pass
         else:
-            raise NameError('Container already exists')
+            raise ContainerExists('Container already exists')
         if policy:
             self._check_policy(policy)
         path = '/'.join((account, container))
@@ -354,70 +443,170 @@ class ModularBackend(BaseBackend):
         self._put_policy(node, policy, True)
     
     @backend_method
-    def delete_container(self, user, account, container, until=None):
+    def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
         """Delete/purge the container with the given name."""
         
-        logger.debug("delete_container: %s %s %s", account, container, until)
+        logger.debug("delete_container: %s %s %s %s %s %s", user, account, container, until, prefix, delimiter)
         if user != account:
             raise NotAllowedError
         path, node = self._lookup_container(account, container)
         
         if until is not None:
-            hashes = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
+            hashes, size = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
             for h in hashes:
                 self.store.map_delete(h)
             self.node.node_purge_children(node, until, CLUSTER_DELETED)
+            self._report_size_change(user, account, -size, {'action': 'container purge', 'path':path})
             return
         
-        if self._get_statistics(node)[0] > 0:
-            raise IndexError('Container is not empty')
-        hashes = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
-        for h in hashes:
-            self.store.map_delete(h)
-        self.node.node_purge_children(node, inf, CLUSTER_DELETED)
-        self.node.node_remove(node)
+        if not delimiter:
+            if self._get_statistics(node)[0] > 0:
+                raise ContainerNotEmpty('Container is not empty')
+            hashes, size = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
+            for h in hashes:
+                self.store.map_delete(h)
+            self.node.node_purge_children(node, inf, CLUSTER_DELETED)
+            self.node.node_remove(node)
+            self._report_size_change(user, account, -size, {'action': 'container delete', 'path':path})
+        else:
+               # remove only contents
+            src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
+            paths = []
+            for t in src_names:
+                path = '/'.join((account, container, t[0]))
+                node = t[2]
+                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
+                del_size = self._apply_versioning(account, container, src_version_id)
+                if del_size:
+                    self._report_size_change(user, account, -del_size, {'action': 'object delete', 'path':path})
+                self._report_object_change(user, account, path, details={'action': 'object delete'})
+                paths.append(path)
+            self.permissions.access_clear_bulk(paths)
+    
+    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
+        if user != account and until:
+            raise NotAllowedError
+        if shared and public:
+            # get shared first
+            shared = self._list_object_permissions(user, account, container, prefix, shared=True, public=False)
+            objects = set()
+            if shared:
+                path, node = self._lookup_container(account, container)
+                shared = self._get_formatted_paths(shared)
+                objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props))
+            
+            # get public
+            objects |= set(self._list_public_object_properties(user, account, container, prefix, all_props))
+            objects = list(objects)
+            
+            objects.sort(key=lambda x: x[0])
+            start, limit = self._list_limits([x[0] for x in objects], marker, limit)
+            return objects[start:start + limit]
+        elif public:
+            objects = self._list_public_object_properties(user, account, container, prefix, all_props)
+            start, limit = self._list_limits([x[0] for x in objects], marker, limit)
+            return objects[start:start + limit]
+        
+        allowed = self._list_object_permissions(user, account, container, prefix, shared, public)
+        if shared and not allowed:
+            return []
+        path, node = self._lookup_container(account, container)
+        allowed = self._get_formatted_paths(allowed)
+        objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
+        start, limit = self._list_limits([x[0] for x in objects], marker, limit)
+        return objects[start:start + limit]
     
-    @backend_method
-    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None):
-        """Return a list of objects existing under a container."""
+    def _list_public_object_properties(self, user, account, container, prefix, all_props):
+        public = self._list_object_permissions(user, account, container, prefix, shared=False, public=True)
+        paths, nodes = self._lookup_objects(public)
+        path = '/'.join((account, container))
+        cont_prefix = path + '/'
+        paths = [x[len(cont_prefix):] for x in paths]
+        props = self.node.version_lookup_bulk(nodes, all_props=all_props)
+        objects = [(path,) + props for path, props in zip(paths, props)]
+        return objects
         
-        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until)
+    def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
+        objects = []
+        while True:
+            marker = objects[-1] if objects else None
+            limit = 10000
+            l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public)
+            objects.extend(l)
+            if not l or len(l) < limit:
+                break
+        return objects
+    
+    def _list_object_permissions(self, user, account, container, prefix, shared, public):
         allowed = []
+        path = '/'.join((account, container, prefix)).rstrip('/')
         if user != account:
-            if until:
-                raise NotAllowedError
-            allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
+            allowed = self.permissions.access_list_paths(user, path)
             if not allowed:
                 raise NotAllowedError
         else:
+            allowed = set()
             if shared:
-                allowed = self.permissions.access_list_shared('/'.join((account, container)))
-                if not allowed:
-                    return []
-        path, node = self._lookup_container(account, container)
-        return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed)
+                allowed.update(self.permissions.access_list_shared(path))
+            if public:
+                allowed.update([x[0] for x in self.permissions.public_list(path)])
+            allowed = sorted(allowed)
+            if not allowed:
+                return []
+        return allowed
     
     @backend_method
-    def list_object_meta(self, user, account, container, domain, until=None):
-        """Return a list with all the container's object meta keys for the domain."""
+    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
+        """Return a list of object (name, version_id) tuples existing under a container."""
         
-        logger.debug("list_object_meta: %s %s %s %s", account, container, domain, until)
-        allowed = []
-        if user != account:
-            if until:
-                raise NotAllowedError
-            allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
-            if not allowed:
-                raise NotAllowedError
-        path, node = self._lookup_container(account, container)
-        before = until if until is not None else inf
-        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
+        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
+        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
+    
+    @backend_method
+    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
+        """Return a list of object metadata dicts existing under a container."""
+        
+        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
+        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
+        objects = []
+        for p in props:
+            if len(p) == 2:
+                objects.append({'subdir': p[0]})
+            else:
+                objects.append({'name': p[0],
+                                'bytes': p[self.SIZE + 1],
+                                'type': p[self.TYPE + 1],
+                                'hash': p[self.HASH + 1],
+                                'version': p[self.SERIAL + 1],
+                                'version_timestamp': p[self.MTIME + 1],
+                                'modified': p[self.MTIME + 1] if until is None else None,
+                                'modified_by': p[self.MUSER + 1],
+                                'uuid': p[self.UUID + 1],
+                                'checksum': p[self.CHECKSUM + 1]})
+        return objects
+    
+    @backend_method
+    def list_object_permissions(self, user, account, container, prefix=''):
+        """Return a list of paths that enforce permissions under a container."""
+        
+        logger.debug("list_object_permissions: %s %s %s %s", user, account, container, prefix)
+        return self._list_object_permissions(user, account, container, prefix, True, False)
+    
+    @backend_method
+    def list_object_public(self, user, account, container, prefix=''):
+        """Return a dict mapping paths to public ids for objects that are public under a container."""
+        
+        logger.debug("list_object_public: %s %s %s %s", user, account, container, prefix)
+        public = {}
+        for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
+            public[path] = p + ULTIMATE_ANSWER
+        return public
     
     @backend_method
-    def get_object_meta(self, user, account, container, name, domain, version=None):
+    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
         """Return a dictionary with the object metadata for the domain."""
         
-        logger.debug("get_object_meta: %s %s %s %s %s", account, container, name, domain, version)
+        logger.debug("get_object_meta: %s %s %s %s %s %s", user, account, container, name, domain, version)
         self._can_read(user, account, container, name)
         path, node = self._lookup_object(account, container, name)
         props = self._get_version(node, version)
@@ -429,20 +618,29 @@ class ModularBackend(BaseBackend):
             except NameError: # Object may be deleted.
                 del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
                 if del_props is None:
-                    raise NameError('Object does not exist')
+                    raise ItemNotExists('Object does not exist')
                 modified = del_props[self.MTIME]
         
-        meta = dict(self.node.attribute_get(props[self.SERIAL], domain))
-        meta.update({'name': name, 'bytes': props[self.SIZE], 'hash':props[self.HASH]})
-        meta.update({'version': props[self.SERIAL], 'version_timestamp': props[self.MTIME]})
-        meta.update({'modified': modified, 'modified_by': props[self.MUSER], 'uuid': props[self.UUID]})
+        meta = {}
+        if include_user_defined:
+            meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
+        meta.update({'name': name,
+                     'bytes': props[self.SIZE],
+                     'type': props[self.TYPE],
+                     'hash': props[self.HASH],
+                     'version': props[self.SERIAL],
+                     'version_timestamp': props[self.MTIME],
+                     'modified': modified,
+                     'modified_by': props[self.MUSER],
+                     'uuid': props[self.UUID],
+                     'checksum': props[self.CHECKSUM]})
         return meta
     
     @backend_method
     def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
         """Update the metadata associated with the object for the domain and return the new version."""
         
-        logger.debug("update_object_meta: %s %s %s %s %s %s", account, container, name, domain, meta, replace)
+        logger.debug("update_object_meta: %s %s %s %s %s %s %s", user, account, container, name, domain, meta, replace)
         self._can_write(user, account, container, name)
         path, node = self._lookup_object(account, container, name)
         src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
@@ -455,35 +653,36 @@ class ModularBackend(BaseBackend):
         from which the object gets its permissions from,
         along with a dictionary containing the permissions."""
         
-        logger.debug("get_object_permissions: %s %s %s", account, container, name)
+        logger.debug("get_object_permissions: %s %s %s %s", user, account, container, name)
         allowed = 'write'
+        permissions_path = self._get_permissions_path(account, container, name)
         if user != account:
-            path = '/'.join((account, container, name))
-            if self.permissions.access_check(path, self.WRITE, user):
+            if self.permissions.access_check(permissions_path, self.WRITE, user):
                 allowed = 'write'
-            elif self.permissions.access_check(path, self.READ, user):
+            elif self.permissions.access_check(permissions_path, self.READ, user):
                 allowed = 'read'
             else:
                 raise NotAllowedError
-        path = self._lookup_object(account, container, name)[0]
-        return (allowed,) + self.permissions.access_inherit(path)
+        self._lookup_object(account, container, name)
+        return (allowed, permissions_path, self.permissions.access_get(permissions_path))
     
     @backend_method
     def update_object_permissions(self, user, account, container, name, permissions):
         """Update the permissions associated with the object."""
         
-        logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
+        logger.debug("update_object_permissions: %s %s %s %s %s", user, account, container, name, permissions)
         if user != account:
             raise NotAllowedError
         path = self._lookup_object(account, container, name)[0]
         self._check_permissions(path, permissions)
         self.permissions.access_set(path, permissions)
+        self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
     
     @backend_method
     def get_object_public(self, user, account, container, name):
         """Return the public id of the object if applicable."""
         
-        logger.debug("get_object_public: %s %s %s", account, container, name)
+        logger.debug("get_object_public: %s %s %s %s", user, account, container, name)
         self._can_read(user, account, container, name)
         path = self._lookup_object(account, container, name)[0]
         p = self.permissions.public_get(path)
@@ -495,7 +694,7 @@ class ModularBackend(BaseBackend):
     def update_object_public(self, user, account, container, name, public):
         """Update the public status of the object."""
         
-        logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
+        logger.debug("update_object_public: %s %s %s %s %s", user, account, container, name, public)
         self._can_write(user, account, container, name)
         path = self._lookup_object(account, container, name)[0]
         if not public:
@@ -507,14 +706,14 @@ class ModularBackend(BaseBackend):
     def get_object_hashmap(self, user, account, container, name, version=None):
         """Return the object's size and a list with partial hashes."""
         
-        logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
+        logger.debug("get_object_hashmap: %s %s %s %s %s", user, account, container, name, version)
         self._can_read(user, account, container, name)
         path, node = self._lookup_object(account, container, name)
         props = self._get_version(node, version)
         hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
         return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
     
-    def _update_object_hash(self, user, account, container, name, size, hash, permissions, src_node=None, is_copy=False):
+    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
         if permissions is not None and user != account:
             raise NotAllowedError
         self._can_write(user, account, container, name)
@@ -525,14 +724,16 @@ class ModularBackend(BaseBackend):
         account_path, account_node = self._lookup_account(account, True)
         container_path, container_node = self._lookup_container(account, container)
         path, node = self._put_object_node(container_path, container_node, name)
-        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, hash=hash, is_copy=is_copy)
+        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
+        
+        # Handle meta.
+        if src_version_id is None:
+            src_version_id = pre_version_id
+        self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace_meta)
         
         # Check quota.
-        versioning = self._get_policy(container_node)['versioning']
-        if versioning != 'auto':
-            size_delta = size - 0 # TODO: Get previous size.
-        else:
-            size_delta = size
+        del_size = self._apply_versioning(account, container, pre_version_id)
+        size_delta = size - del_size
         if size_delta > 0:
             account_quota = long(self._get_policy(account_node)['quota'])
             container_quota = long(self._get_policy(container_node)['quota'])
@@ -540,17 +741,20 @@ class ModularBackend(BaseBackend):
                (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
                 # This must be executed in a transaction, so the version is never created if it fails.
                 raise QuotaError
+        self._report_size_change(user, account, size_delta, {'action': 'object update', 'path':path})
         
         if permissions is not None:
             self.permissions.access_set(path, permissions)
-        self._apply_versioning(account, container, pre_version_id)
-        return pre_version_id, dest_version_id
+            self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
+        
+        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
+        return dest_version_id
     
     @backend_method
-    def update_object_hashmap(self, user, account, container, name, size, hashmap, domain, meta={}, replace_meta=False, permissions=None):
+    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
         """Create/update an object with the specified size and partial hashes."""
         
-        logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
+        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user, account, container, name, size, type, hashmap, checksum)
         if size == 0: # No such thing as an empty hashmap.
             hashmap = [self.put_block('')]
         map = HashMap(self.block_size, self.hash_algorithm)
@@ -562,12 +766,26 @@ class ModularBackend(BaseBackend):
             raise ie
         
         hash = map.hash()
-        pre_version_id, dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(hash), permissions)
-        self._put_metadata_duplicate(pre_version_id, dest_version_id, domain, meta, replace_meta)
+        dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
         self.store.map_put(hash, map)
         return dest_version_id
     
-    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False):
+    @backend_method
+    def update_object_checksum(self, user, account, container, name, version, checksum):
+        """Update an object's checksum."""
+        
+        logger.debug("update_object_checksum: %s %s %s %s %s %s", user, account, container, name, version, checksum)
+        # Update objects with greater version and same hashmap and size (fix metadata updates).
+        self._can_write(user, account, container, name)
+        path, node = self._lookup_object(account, container, name)
+        props = self._get_version(node, version)
+        versions = self.node.node_get_versions(node)
+        for x in versions:
+            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
+                self.node.version_put_property(x[self.SERIAL], 'checksum', checksum)
+    
+    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
+        dest_version_ids = []
         self._can_read(user, src_account, src_container, src_name)
         path, node = self._lookup_object(src_account, src_container, src_name)
         # TODO: Will do another fetch of the properties in duplicate version...
@@ -575,32 +793,51 @@ class ModularBackend(BaseBackend):
         src_version_id = props[self.SERIAL]
         hash = props[self.HASH]
         size = props[self.SIZE]
-        
         is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
-        pre_version_id, dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, hash, permissions, src_node=node, is_copy=is_copy)
-        self._put_metadata_duplicate(src_version_id, dest_version_id, dest_domain, dest_meta, replace_meta)
-        return dest_version_id
+        dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
+        if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
+               self._delete_object(user, src_account, src_container, src_name)
+        
+        if delimiter:
+            prefix = src_name + delimiter if not src_name.endswith(delimiter) else src_name
+            src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
+            src_names.sort(key=lambda x: x[2]) # order by nodes
+            paths = [elem[0] for elem in src_names]
+            nodes = [elem[2] for elem in src_names]
+            # TODO: Will do another fetch of the properties in duplicate version...
+            props = self._get_versions(nodes) # Check to see if source exists.
+            
+            for prop, path, node in zip(props, paths, nodes):
+                src_version_id = prop[self.SERIAL]
+                hash = prop[self.HASH]
+                vtype = prop[self.TYPE]
+                size = prop[self.SIZE]
+                dest_prefix = dest_name + delimiter if not dest_name.endswith(delimiter) else dest_name
+                vdest_name = path.replace(prefix, dest_prefix, 1)
+                dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
+                if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
+                       self._delete_object(user, src_account, src_container, path)
+        return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
     
     @backend_method
-    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta={}, replace_meta=False, permissions=None, src_version=None):
+    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None):
         """Copy an object's data and metadata."""
         
-        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version)
-        return self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version, False)
+        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
+        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
+        return dest_version_id
     
     @backend_method
-    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta={}, replace_meta=False, permissions=None):
+    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, delimiter=None):
         """Move an object's data and metadata."""
         
-        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions)
+        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
         if user != src_account:
             raise NotAllowedError
-        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, None, True)
-        if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
-            self._delete_object(user, src_account, src_container, src_name)
+        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
         return dest_version_id
     
-    def _delete_object(self, user, account, container, name, until=None):
+    def _delete_object(self, user, account, container, name, until=None, delimiter=None):
         if user != account:
             raise NotAllowedError
         
@@ -609,8 +846,14 @@ class ModularBackend(BaseBackend):
             node = self.node.node_lookup(path)
             if node is None:
                 return
-            hashes = self.node.node_purge(node, until, CLUSTER_NORMAL)
-            hashes += self.node.node_purge(node, until, CLUSTER_HISTORY)
+            hashes = []
+            size = 0
+            h, s = self.node.node_purge(node, until, CLUSTER_NORMAL)
+            hashes += h
+            size += s
+            h, s = self.node.node_purge(node, until, CLUSTER_HISTORY)
+            hashes += h
+            size += s
             for h in hashes:
                 self.store.map_delete(h)
             self.node.node_purge(node, until, CLUSTER_DELETED)
@@ -618,25 +861,44 @@ class ModularBackend(BaseBackend):
                 props = self._get_version(node)
             except NameError:
                 self.permissions.access_clear(path)
+            self._report_size_change(user, account, -size, {'action': 'object purge', 'path':path})
             return
         
         path, node = self._lookup_object(account, container, name)
-        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, hash=None, cluster=CLUSTER_DELETED)
-        self._apply_versioning(account, container, src_version_id)
+        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
+        del_size = self._apply_versioning(account, container, src_version_id)
+        if del_size:
+            self._report_size_change(user, account, -del_size, {'action': 'object delete', 'path':path})
+        self._report_object_change(user, account, path, details={'action': 'object delete'})
         self.permissions.access_clear(path)
+        
+        if delimiter:
+            prefix = name + delimiter if not name.endswith(delimiter) else name
+            src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
+            paths = []
+            for t in src_names:
+               path = '/'.join((account, container, t[0]))
+               node = t[2]
+                src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
+                del_size = self._apply_versioning(account, container, src_version_id)
+                if del_size:
+                    self._report_size_change(user, account, -del_size, {'action': 'object delete', 'path':path})
+                self._report_object_change(user, account, path, details={'action': 'object delete'})
+                paths.append(path)
+            self.permissions.access_clear_bulk(paths)
     
     @backend_method
-    def delete_object(self, user, account, container, name, until=None):
+    def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
         """Delete/purge an object."""
         
-        logger.debug("delete_object: %s %s %s %s", account, container, name, until)
-        self._delete_object(user, account, container, name, until)
+        logger.debug("delete_object: %s %s %s %s %s %s %s", user, account, container, name, until, prefix, delimiter)
+        self._delete_object(user, account, container, name, until, delimiter)
     
     @backend_method
     def list_versions(self, user, account, container, name):
         """Return a list of all (version, version_timestamp) tuples for an object."""
         
-        logger.debug("list_versions: %s %s %s", account, container, name)
+        logger.debug("list_versions: %s %s %s %s", user, account, container, name)
         self._can_read(user, account, container, name)
         path, node = self._lookup_object(account, container, name)
         versions = self.node.node_get_versions(node)
@@ -646,7 +908,7 @@ class ModularBackend(BaseBackend):
     def get_uuid(self, user, uuid):
         """Return the (account, container, name) for the UUID given."""
         
-        logger.debug("get_uuid: %s", uuid)
+        logger.debug("get_uuid: %s %s", user, uuid)
         info = self.node.latest_uuid(uuid)
         if info is None:
             raise NameError
@@ -659,7 +921,7 @@ class ModularBackend(BaseBackend):
     def get_public(self, user, public):
         """Return the (account, container, name) for the public id given."""
         
-        logger.debug("get_public: %s", public)
+        logger.debug("get_public: %s %s", user, public)
         if public is None or public < ULTIMATE_ANSWER:
             raise NameError
         path = self.permissions.public_path(public - ULTIMATE_ANSWER)
@@ -676,7 +938,7 @@ class ModularBackend(BaseBackend):
         logger.debug("get_block: %s", hash)
         block = self.store.block_get(binascii.unhexlify(hash))
         if not block:
-            raise NameError('Block does not exist')
+            raise ItemNotExists('Block does not exist')
         return block
     
     @backend_method(autocommit=0)
@@ -710,7 +972,7 @@ class ModularBackend(BaseBackend):
     
     def _put_path(self, user, parent, path):
         node = self.node.node_create(parent, path)
-        self.node.version_create(node, None, 0, None, user, self._generate_uuid(), CLUSTER_NORMAL)
+        self.node.version_create(node, None, 0, '', None, user, self._generate_uuid(), '', CLUSTER_NORMAL)
         return node
     
     def _lookup_account(self, account, create=True):
@@ -723,16 +985,20 @@ class ModularBackend(BaseBackend):
         path = '/'.join((account, container))
         node = self.node.node_lookup(path)
         if node is None:
-            raise NameError('Container does not exist')
+            raise ItemNotExists('Container does not exist')
         return path, node
     
     def _lookup_object(self, account, container, name):
         path = '/'.join((account, container, name))
         node = self.node.node_lookup(path)
         if node is None:
-            raise NameError('Object does not exist')
+            raise ItemNotExists('Object does not exist')
         return path, node
     
+    def _lookup_objects(self, paths):
+        nodes = self.node.node_lookup_bulk(paths)
+        return paths, nodes
+    
     def _get_properties(self, node, until=None):
         """Return properties until the timestamp given."""
         
@@ -741,7 +1007,7 @@ class ModularBackend(BaseBackend):
         if props is None and until is not None:
             props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
         if props is None:
-            raise NameError('Path does not exist')
+            raise ItemNotExists('Path does not exist')
         return props
     
     def _get_statistics(self, node, until=None):
@@ -759,18 +1025,21 @@ class ModularBackend(BaseBackend):
         if version is None:
             props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
             if props is None:
-                raise NameError('Object does not exist')
+                raise ItemNotExists('Object does not exist')
         else:
             try:
                 version = int(version)
             except ValueError:
-                raise IndexError('Version does not exist')
+                raise VersionNotExists('Version does not exist')
             props = self.node.version_get_properties(version)
             if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
-                raise IndexError('Version does not exist')
+                raise VersionNotExists('Version does not exist')
         return props
+
+    def _get_versions(self, nodes):
+        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
     
-    def _put_version_duplicate(self, user, node, src_node=None, size=None, hash=None, cluster=CLUSTER_NORMAL, is_copy=False):
+    def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
         """Create a new version of the node."""
         
         props = self.node.version_lookup(node if src_node is None else src_node, inf, CLUSTER_NORMAL)
@@ -778,13 +1047,21 @@ class ModularBackend(BaseBackend):
             src_version_id = props[self.SERIAL]
             src_hash = props[self.HASH]
             src_size = props[self.SIZE]
+            src_type = props[self.TYPE]
+            src_checksum = props[self.CHECKSUM]
         else:
             src_version_id = None
             src_hash = None
             src_size = 0
-        if size is None:
-            hash = src_hash # This way hash can be set to None.
+            src_type = ''
+            src_checksum = ''
+        if size is None: # Set metadata.
+            hash = src_hash # This way hash can be set to None (account or container).
             size = src_size
+        if type is None:
+            type = src_type
+        if checksum is None:
+            checksum = src_checksum
         uuid = self._generate_uuid() if (is_copy or src_version_id is None) else props[self.UUID]
         
         if src_node is None:
@@ -797,7 +1074,7 @@ class ModularBackend(BaseBackend):
         if pre_version_id is not None:
             self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
         
-        dest_version_id, mtime = self.node.version_create(node, hash, size, src_version_id, user, uuid, cluster)
+        dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
         return pre_version_id, dest_version_id
     
     def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
@@ -828,7 +1105,7 @@ class ModularBackend(BaseBackend):
             limit = 10000
         return start, limit
     
-    def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[]):
+    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
         cont_prefix = path + '/'
         prefix = cont_prefix + prefix
         start = cont_prefix + marker if marker else None
@@ -836,13 +1113,30 @@ class ModularBackend(BaseBackend):
         filterq = keys if domain else []
         sizeq = size_range
         
-        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq)
+        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
         objects.extend([(p, None) for p in prefixes] if virtual else [])
         objects.sort(key=lambda x: x[0])
-        objects = [(x[0][len(cont_prefix):], x[1]) for x in objects]
+        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
+        return objects
         
-        start, limit = self._list_limits([x[0] for x in objects], marker, limit)
-        return objects[start:start + limit]
+    # Reporting functions.
+    
+    def _report_size_change(self, user, account, size, details={}):
+        account_node = self._lookup_account(account, True)[1]
+        total = self._get_statistics(account_node)[1]
+        details.update({'user': user, 'total': total})
+        logger.debug("_report_size_change: %s %s %s %s", user, account, size, details)
+        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))
+    
+    def _report_object_change(self, user, account, path, details={}):
+        details.update({'user': user})
+        logger.debug("_report_object_change: %s %s %s %s", user, account, path, details)
+        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',), account, QUEUE_INSTANCE_ID, 'object', path, details))
+    
+    def _report_sharing_change(self, user, account, path, details={}):
+        logger.debug("_report_permissions_change: %s %s %s %s", user, account, path, details)
+        details.update({'user': user})
+        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',), account, QUEUE_INSTANCE_ID, 'sharing', path, details))
     
     # Policy functions.
     
@@ -874,13 +1168,19 @@ class ModularBackend(BaseBackend):
         return policy
     
     def _apply_versioning(self, account, container, version_id):
+        """Delete the provided version if such is the policy.
+           Return size of object removed.
+        """
+        
         if version_id is None:
-            return
+            return 0
         path, node = self._lookup_container(account, container)
         versioning = self._get_policy(node)['versioning']
         if versioning != 'auto':
-            hash = self.node.version_remove(version_id)
+            hash, size = self.node.version_remove(version_id)
             self.store.map_delete(hash)
+            return size
+        return 0
     
     # Access control functions.
     
@@ -890,18 +1190,48 @@ class ModularBackend(BaseBackend):
     
     def _check_permissions(self, path, permissions):
         # raise ValueError('Bad characters in permissions')
-        
-        # Check for existing permissions.
-        paths = self.permissions.access_list(path)
-        if paths:
-            ae = AttributeError()
-            ae.data = paths
-            raise ae
+        pass
+    
+    def _get_formatted_paths(self, paths):
+        formatted = []
+        for p in paths:
+            node = self.node.node_lookup(p)
+            if node is not None:
+                props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
+            if props is not None:
+                if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
+                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
+                formatted.append((p, self.MATCH_EXACT))
+        return formatted
+    
+    def _get_permissions_path(self, account, container, name):
+        path = '/'.join((account, container, name))
+        permission_paths = self.permissions.access_inherit(path)
+        permission_paths.sort()
+        permission_paths.reverse()
+        for p in permission_paths:
+            if p == path:
+                return p
+            else:
+                if p.count('/') < 2:
+                    continue
+                node = self.node.node_lookup(p)
+                if node is not None:
+                    props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
+                if props is not None:
+                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
+                        return p
+        return None
     
     def _can_read(self, user, account, container, name):
         if user == account:
             return True
         path = '/'.join((account, container, name))
+        if self.permissions.public_get(path) is not None:
+            return True
+        path = self._get_permissions_path(account, container, name)
+        if not path:
+            raise NotAllowedError
         if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
             raise NotAllowedError
     
@@ -909,6 +1239,9 @@ class ModularBackend(BaseBackend):
         if user == account:
             return True
         path = '/'.join((account, container, name))
+        path = self._get_permissions_path(account, container, name)
+        if not path:
+            raise NotAllowedError
         if not self.permissions.access_check(path, self.WRITE, user):
             raise NotAllowedError