Remove error for overlapping permissions. Document.
[pithos] / pithos / backends / modular.py
index 585e7bf..5daa884 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright 2011 GRNET S.A. All rights reserved.
+# Copyright 2011-2012 GRNET S.A. All rights reserved.
 # 
 # Redistribution and use in source and binary forms, with or
 # without modification, are permitted provided that the following
 import sys
 import os
 import time
+import uuid as uuidlib
 import logging
 import binascii
 
-from base import NotAllowedError, QuotaError, BaseBackend
+from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend
 
 from pithos.lib.hashmap import HashMap
 
+# Default modules and settings.
+DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
+DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
+DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
+DEFAULT_BLOCK_PATH = 'data/'
+#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
+#DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
+
+QUEUE_MESSAGE_KEY = '#'
+QUEUE_CLIENT_ID = 2 # Pithos.
+
 ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
 
 inf = float('inf')
@@ -77,34 +89,53 @@ class ModularBackend(BaseBackend):
     Uses modules for SQL functions and storage.
     """
     
-    def __init__(self, db_module, db_connection, block_module, block_path):
-        db_module = db_module or 'pithos.backends.lib.sqlalchemy'
-        block_module = block_module or 'pithos.backends.lib.hashfiler'
+    def __init__(self, db_module=None, db_connection=None,
+                 block_module=None, block_path=None,
+                 queue_module=None, queue_connection=None):
+        db_module = db_module or DEFAULT_DB_MODULE
+        db_connection = db_connection or DEFAULT_DB_CONNECTION
+        block_module = block_module or DEFAULT_BLOCK_MODULE
+        block_path = block_path or DEFAULT_BLOCK_PATH
+        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
+        #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
         
         self.hash_algorithm = 'sha256'
         self.block_size = 4 * 1024 * 1024 # 4MB
         
-        self.default_policy = {'quota': 0, 'versioning': 'auto'}
+        self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
         
-        __import__(db_module)
-        self.db_module = sys.modules[db_module]
-        self.wrapper = self.db_module.DBWrapper(db_connection)
+        def load_module(m):
+            __import__(m)
+            return sys.modules[m]
         
+        self.db_module = load_module(db_module)
+        self.wrapper = self.db_module.DBWrapper(db_connection)
         params = {'wrapper': self.wrapper}
         self.permissions = self.db_module.Permissions(**params)
         for x in ['READ', 'WRITE']:
             setattr(self, x, getattr(self.db_module, x))
         self.node = self.db_module.Node(**params)
-        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'MTIME', 'MUSER', 'CLUSTER']:
+        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'MTIME', 'MUSER', 'UUID', 'CLUSTER']:
             setattr(self, x, getattr(self.db_module, x))
         
-        __import__(block_module)
-        self.block_module = sys.modules[block_module]
-        
+        self.block_module = load_module(block_module)
         params = {'path': block_path,
                   'block_size': self.block_size,
                   'hash_algorithm': self.hash_algorithm}
         self.store = self.block_module.Store(**params)
+
+        if queue_module and queue_connection:
+            self.queue_module = load_module(queue_module)
+            params = {'exchange': queue_connection,
+                      'message_key': QUEUE_MESSAGE_KEY,
+                      'client_id': QUEUE_CLIENT_ID}
+            self.queue = self.queue_module.Queue(**params)
+        else:
+            class NoQueue:
+                def send(self, *args):
+                    pass
+            
+            self.queue = NoQueue()
     
     def close(self):
         self.wrapper.close()
@@ -119,10 +150,10 @@ class ModularBackend(BaseBackend):
         return allowed[start:start + limit]
     
     @backend_method
-    def get_account_meta(self, user, account, until=None):
-        """Return a dictionary with the account metadata."""
+    def get_account_meta(self, user, account, domain, until=None):
+        """Return a dictionary with the account metadata for the domain."""
         
-        logger.debug("get_account_meta: %s %s", account, until)
+        logger.debug("get_account_meta: %s %s %s", account, domain, until)
         path, node = self._lookup_account(account, user == account)
         if user != account:
             if until or node is None or account not in self._allowed_accounts(user):
@@ -146,7 +177,7 @@ class ModularBackend(BaseBackend):
         else:
             meta = {}
             if props is not None:
-                meta.update(dict(self.node.attribute_get(props[self.SERIAL])))
+                meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
             if until is not None:
                 meta.update({'until_timestamp': tstamp})
             meta.update({'name': account, 'count': count, 'bytes': bytes})
@@ -154,14 +185,14 @@ class ModularBackend(BaseBackend):
         return meta
     
     @backend_method
-    def update_account_meta(self, user, account, meta, replace=False):
-        """Update the metadata associated with the account."""
+    def update_account_meta(self, user, account, domain, meta, replace=False):
+        """Update the metadata associated with the account for the domain."""
         
-        logger.debug("update_account_meta: %s %s %s", account, meta, replace)
+        logger.debug("update_account_meta: %s %s %s %s", account, domain, meta, replace)
         if user != account:
             raise NotAllowedError
         path, node = self._lookup_account(account, True)
-        self._put_metadata(user, node, meta, replace)
+        self._put_metadata(user, node, domain, meta, replace)
     
     @backend_method
     def get_account_groups(self, user, account):
@@ -261,13 +292,13 @@ class ModularBackend(BaseBackend):
             start, limit = self._list_limits(allowed, marker, limit)
             return allowed[start:start + limit]
         node = self.node.node_lookup(account)
-        return [x[0] for x in self._list_objects(node, account, '', '/', marker, limit, False, [], until)]
+        return [x[0] for x in self._list_objects(node, account, '', '/', marker, limit, False, None, [], until)]
     
     @backend_method
-    def get_container_meta(self, user, account, container, until=None):
-        """Return a dictionary with the container metadata."""
+    def get_container_meta(self, user, account, container, domain, until=None):
+        """Return a dictionary with the container metadata for the domain."""
         
-        logger.debug("get_container_meta: %s %s %s", account, container, until)
+        logger.debug("get_container_meta: %s %s %s %s", account, container, domain, until)
         if user != account:
             if until or container not in self._allowed_containers(user, account):
                 raise NotAllowedError
@@ -285,7 +316,7 @@ class ModularBackend(BaseBackend):
         if user != account:
             meta = {'name': container}
         else:
-            meta = dict(self.node.attribute_get(props[self.SERIAL]))
+            meta = dict(self.node.attribute_get(props[self.SERIAL], domain))
             if until is not None:
                 meta.update({'until_timestamp': tstamp})
             meta.update({'name': container, 'count': count, 'bytes': bytes})
@@ -293,14 +324,14 @@ class ModularBackend(BaseBackend):
         return meta
     
     @backend_method
-    def update_container_meta(self, user, account, container, meta, replace=False):
-        """Update the metadata associated with the container."""
+    def update_container_meta(self, user, account, container, domain, meta, replace=False):
+        """Update the metadata associated with the container for the domain."""
         
-        logger.debug("update_container_meta: %s %s %s %s", account, container, meta, replace)
+        logger.debug("update_container_meta: %s %s %s %s %s", account, container, domain, meta, replace)
         if user != account:
             raise NotAllowedError
         path, node = self._lookup_container(account, container)
-        self._put_metadata(user, node, meta, replace)
+        self._put_metadata(user, node, domain, meta, replace)
     
     @backend_method
     def get_container_policy(self, user, account, container):
@@ -358,6 +389,7 @@ class ModularBackend(BaseBackend):
             for h in hashes:
                 self.store.map_delete(h)
             self.node.node_purge_children(node, until, CLUSTER_DELETED)
+            self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
             return
         
         if self._get_statistics(node)[0] > 0:
@@ -367,12 +399,15 @@ class ModularBackend(BaseBackend):
             self.store.map_delete(h)
         self.node.node_purge_children(node, inf, CLUSTER_DELETED)
         self.node.node_remove(node)
+        self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
+    
+    # XXX: Up to here...
     
     @backend_method
-    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], shared=False, until=None):
+    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None):
         """Return a list of objects existing under a container."""
         
-        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, keys, shared, until)
+        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until)
         allowed = []
         if user != account:
             if until:
@@ -386,13 +421,13 @@ class ModularBackend(BaseBackend):
                 if not allowed:
                     return []
         path, node = self._lookup_container(account, container)
-        return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, keys, until, allowed)
+        return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed)
     
     @backend_method
-    def list_object_meta(self, user, account, container, until=None):
-        """Return a list with all the container's object meta keys."""
+    def list_object_meta(self, user, account, container, domain, until=None):
+        """Return a list with all the container's object meta keys for the domain."""
         
-        logger.debug("list_object_meta: %s %s %s", account, container, until)
+        logger.debug("list_object_meta: %s %s %s %s", account, container, domain, until)
         allowed = []
         if user != account:
             if until:
@@ -402,13 +437,13 @@ class ModularBackend(BaseBackend):
                 raise NotAllowedError
         path, node = self._lookup_container(account, container)
         before = until if until is not None else inf
-        return self.node.latest_attribute_keys(node, before, CLUSTER_DELETED, allowed)
+        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
     
     @backend_method
-    def get_object_meta(self, user, account, container, name, version=None):
-        """Return a dictionary with the object metadata."""
+    def get_object_meta(self, user, account, container, name, domain, version=None):
+        """Return a dictionary with the object metadata for the domain."""
         
-        logger.debug("get_object_meta: %s %s %s %s", account, container, name, version)
+        logger.debug("get_object_meta: %s %s %s %s %s", account, container, name, domain, version)
         self._can_read(user, account, container, name)
         path, node = self._lookup_object(account, container, name)
         props = self._get_version(node, version)
@@ -423,20 +458,20 @@ class ModularBackend(BaseBackend):
                     raise NameError('Object does not exist')
                 modified = del_props[self.MTIME]
         
-        meta = dict(self.node.attribute_get(props[self.SERIAL]))
+        meta = dict(self.node.attribute_get(props[self.SERIAL], domain))
         meta.update({'name': name, 'bytes': props[self.SIZE], 'hash':props[self.HASH]})
         meta.update({'version': props[self.SERIAL], 'version_timestamp': props[self.MTIME]})
-        meta.update({'modified': modified, 'modified_by': props[self.MUSER]})
+        meta.update({'modified': modified, 'modified_by': props[self.MUSER], 'uuid': props[self.UUID]})
         return meta
     
     @backend_method
-    def update_object_meta(self, user, account, container, name, meta, replace=False):
-        """Update the metadata associated with the object."""
+    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
+        """Update the metadata associated with the object for the domain and return the new version."""
         
-        logger.debug("update_object_meta: %s %s %s %s %s", account, container, name, meta, replace)
+        logger.debug("update_object_meta: %s %s %s %s %s %s", account, container, name, domain, meta, replace)
         self._can_write(user, account, container, name)
         path, node = self._lookup_object(account, container, name)
-        src_version_id, dest_version_id = self._put_metadata(user, node, meta, replace)
+        src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
         self._apply_versioning(account, container, src_version_id)
         return dest_version_id
     
@@ -505,7 +540,7 @@ class ModularBackend(BaseBackend):
         hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
         return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
     
-    def _update_object_hash(self, user, account, container, name, size, hash, meta={}, replace_meta=False, permissions=None):
+    def _update_object_hash(self, user, account, container, name, size, hash, permissions, src_node=None, is_copy=False):
         if permissions is not None and user != account:
             raise NotAllowedError
         self._can_write(user, account, container, name)
@@ -516,10 +551,14 @@ class ModularBackend(BaseBackend):
         account_path, account_node = self._lookup_account(account, True)
         container_path, container_node = self._lookup_container(account, container)
         path, node = self._put_object_node(container_path, container_node, name)
-        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size, hash)
+        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, hash=hash, is_copy=is_copy)
         
         # Check quota.
-        size_delta = size # Change with versioning.
+        versioning = self._get_policy(container_node)['versioning']
+        if versioning != 'auto':
+            size_delta = size - 0 # TODO: Get previous size.
+        else:
+            size_delta = size
         if size_delta > 0:
             account_quota = long(self._get_policy(account_node)['quota'])
             container_quota = long(self._get_policy(container_node)['quota'])
@@ -528,16 +567,13 @@ class ModularBackend(BaseBackend):
                 # This must be executed in a transaction, so the version is never created if it fails.
                 raise QuotaError
         
-        if not replace_meta and src_version_id is not None:
-            self.node.attribute_copy(src_version_id, dest_version_id)
-        self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
         if permissions is not None:
             self.permissions.access_set(path, permissions)
-        self._apply_versioning(account, container, src_version_id)
-        return dest_version_id
+        self._apply_versioning(account, container, pre_version_id)
+        return pre_version_id, dest_version_id
     
     @backend_method
-    def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None):
+    def update_object_hashmap(self, user, account, container, name, size, hashmap, domain, meta={}, replace_meta=False, permissions=None):
         """Create/update an object with the specified size and partial hashes."""
         
         logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
@@ -552,48 +588,46 @@ class ModularBackend(BaseBackend):
             raise ie
         
         hash = map.hash()
-        dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(hash), meta, replace_meta, permissions)
+        pre_version_id, dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(hash), permissions)
+        self._put_metadata_duplicate(pre_version_id, dest_version_id, domain, meta, replace_meta)
         self.store.map_put(hash, map)
+        self.queue.send(user, 'diskspace', 0, {'action': 'add', 'version': dest_version_id, 'total': 0})
         return dest_version_id
     
-    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
+    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False):
         self._can_read(user, src_account, src_container, src_name)
         path, node = self._lookup_object(src_account, src_container, src_name)
-        props = self._get_version(node, src_version)
+        # TODO: Will do another fetch of the properties in duplicate version...
+        props = self._get_version(node, src_version) # Check to see if source exists.
         src_version_id = props[self.SERIAL]
         hash = props[self.HASH]
         size = props[self.SIZE]
         
-        if (src_account, src_container, src_name) == (dest_account, dest_container, dest_name):
-            dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, hash, dest_meta, replace_meta, permissions)
-        else:
-            if replace_meta:
-                meta = dest_meta
-            else:
-                meta = {}
-            dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, hash, meta, True, permissions)
-            if not replace_meta:
-                self.node.attribute_copy(src_version_id, dest_version_id)
-                self.node.attribute_set(dest_version_id, ((k, v) for k, v in dest_meta.iteritems()))
+        is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
+        pre_version_id, dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, hash, permissions, src_node=node, is_copy=is_copy)
+        self._put_metadata_duplicate(src_version_id, dest_version_id, dest_domain, dest_meta, replace_meta)
         return dest_version_id
     
     @backend_method
-    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
+    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta={}, replace_meta=False, permissions=None, src_version=None):
         """Copy an object's data and metadata."""
         
-        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
-        return self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
+        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version)
+        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version, False)
+        self.queue.send(user, 'diskspace', 0, {'action': 'add', 'version': dest_version_id, 'total': 0})
+        return dest_version_id
     
     @backend_method
-    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None):
+    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta={}, replace_meta=False, permissions=None):
         """Move an object's data and metadata."""
         
-        logger.debug("move_object: %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions)
+        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions)
         if user != src_account:
             raise NotAllowedError
-        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, None)
+        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, None, True)
         if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
             self._delete_object(user, src_account, src_container, src_name)
+        self.queue.send(user, 'diskspace', 0, {'action': 'add', 'version': dest_version_id, 'total': 0})
         return dest_version_id
     
     def _delete_object(self, user, account, container, name, until=None):
@@ -614,10 +648,11 @@ class ModularBackend(BaseBackend):
                 props = self._get_version(node)
             except NameError:
                 self.permissions.access_clear(path)
+            self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
             return
         
         path, node = self._lookup_object(account, container, name)
-        src_version_id, dest_version_id = self._put_version_duplicate(user, node, 0, None, CLUSTER_DELETED)
+        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, hash=None, cluster=CLUSTER_DELETED)
         self._apply_versioning(account, container, src_version_id)
         self.permissions.access_clear(path)
     
@@ -639,12 +674,28 @@ class ModularBackend(BaseBackend):
         return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
     
     @backend_method
+    def get_uuid(self, user, uuid):
+        """Return the (account, container, name) for the UUID given."""
+        
+        logger.debug("get_uuid: %s", uuid)
+        info = self.node.latest_uuid(uuid)
+        if info is None:
+            raise NameError
+        path, serial = info
+        account, container, name = path.split('/', 2)
+        self._can_read(user, account, container, name)
+        return (account, container, name)
+    
+    @backend_method
     def get_public(self, user, public):
         """Return the (account, container, name) for the public id given."""
+        
         logger.debug("get_public: %s", public)
         if public is None or public < ULTIMATE_ANSWER:
             raise NameError
         path = self.permissions.public_path(public - ULTIMATE_ANSWER)
+        if path is None:
+            raise NameError
         account, container, name = path.split('/', 2)
         self._can_read(user, account, container, name)
         return (account, container, name)
@@ -678,6 +729,9 @@ class ModularBackend(BaseBackend):
     
     # Path functions.
     
+    def _generate_uuid(self):
+        return str(uuidlib.uuid4())
+    
     def _put_object_node(self, path, parent, name):
         path = '/'.join((path, name))
         node = self.node.node_lookup(path)
@@ -687,7 +741,7 @@ class ModularBackend(BaseBackend):
     
     def _put_path(self, user, parent, path):
         node = self.node.node_create(parent, path)
-        self.node.version_create(node, None, 0, None, user, CLUSTER_NORMAL)
+        self.node.version_create(node, None, 0, None, user, self._generate_uuid(), CLUSTER_NORMAL)
         return node
     
     def _lookup_account(self, account, create=True):
@@ -747,10 +801,10 @@ class ModularBackend(BaseBackend):
                 raise IndexError('Version does not exist')
         return props
     
-    def _put_version_duplicate(self, user, node, size=None, hash=None, cluster=CLUSTER_NORMAL):
+    def _put_version_duplicate(self, user, node, src_node=None, size=None, hash=None, cluster=CLUSTER_NORMAL, is_copy=False):
         """Create a new version of the node."""
         
-        props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
+        props = self.node.version_lookup(node if src_node is None else src_node, inf, CLUSTER_NORMAL)
         if props is not None:
             src_version_id = props[self.SERIAL]
             src_hash = props[self.HASH]
@@ -762,25 +816,36 @@ class ModularBackend(BaseBackend):
         if size is None:
             hash = src_hash # This way hash can be set to None.
             size = src_size
+        uuid = self._generate_uuid() if (is_copy or src_version_id is None) else props[self.UUID]
         
+        if src_node is None:
+            pre_version_id = src_version_id
+        else:
+            pre_version_id = None
+            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
+            if props is not None:
+                pre_version_id = props[self.SERIAL]
+        if pre_version_id is not None:
+            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
+        
+        dest_version_id, mtime = self.node.version_create(node, hash, size, src_version_id, user, uuid, cluster)
+        return pre_version_id, dest_version_id
+    
+    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
         if src_version_id is not None:
-            self.node.version_recluster(src_version_id, CLUSTER_HISTORY)
-        dest_version_id, mtime = self.node.version_create(node, hash, size, src_version_id, user, cluster)
-        return src_version_id, dest_version_id
+            self.node.attribute_copy(src_version_id, dest_version_id)
+        if not replace:
+            self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
+            self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
+        else:
+            self.node.attribute_del(dest_version_id, domain)
+            self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
     
-    def _put_metadata(self, user, node, meta, replace=False):
+    def _put_metadata(self, user, node, domain, meta, replace=False):
         """Create a new version and store metadata."""
         
         src_version_id, dest_version_id = self._put_version_duplicate(user, node)
-        
-        # TODO: Merge with other functions that update metadata...
-        if not replace:
-            if src_version_id is not None:
-                self.node.attribute_copy(src_version_id, dest_version_id)
-            self.node.attribute_del(dest_version_id, (k for k, v in meta.iteritems() if v == ''))
-            self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems() if v != ''))
-        else:
-            self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
+        self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
         return src_version_id, dest_version_id
     
     def _list_limits(self, listing, marker, limit):
@@ -794,13 +859,15 @@ class ModularBackend(BaseBackend):
             limit = 10000
         return start, limit
     
-    def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None, allowed=[]):
+    def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[]):
         cont_prefix = path + '/'
         prefix = cont_prefix + prefix
         start = cont_prefix + marker if marker else None
         before = until if until is not None else inf
+        filterq = keys if domain else []
+        sizeq = size_range
         
-        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, keys)
+        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq)
         objects.extend([(p, None) for p in prefixes] if virtual else [])
         objects.sort(key=lambda x: x[0])
         objects = [(x[0][len(cont_prefix):], x[1]) for x in objects]
@@ -845,6 +912,7 @@ class ModularBackend(BaseBackend):
         if versioning != 'auto':
             hash = self.node.version_remove(version_id)
             self.store.map_delete(hash)
+            self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
     
     # Access control functions.
     
@@ -854,18 +922,35 @@ class ModularBackend(BaseBackend):
     
     def _check_permissions(self, path, permissions):
         # raise ValueError('Bad characters in permissions')
-        
-        # Check for existing permissions.
-        paths = self.permissions.access_list(path)
-        if paths:
-            ae = AttributeError()
-            ae.data = paths
-            raise ae
+        pass
+    
+    def _get_permissions_path(self, account, container, name):
+        path = '/'.join((account, container, name))
+        permission_paths = self.permissions.access_inherit(path)
+        permission_paths.sort()
+        permission_paths.reverse()
+        for p in permission_paths:
+            if p == path:
+                return p
+            else:
+                try:
+                    parts = p.split('/', 2)
+                    if len(parts) != 3:
+                        return None
+                    path, node = self._lookup_object(*p.split('/', 2))
+                    props = self._get_version(node)
+                    # XXX: Put type in properties...
+                    meta = dict(self.node.attribute_get(props[self.SERIAL], 'pithos'))
+                    if meta['Content-Type'] == 'application/directory':
+                        return p
+                except NameError:
+                    pass
+        return None
     
     def _can_read(self, user, account, container, name):
         if user == account:
             return True
-        path = '/'.join((account, container, name))
+        path = self._get_permissions_path(account, container, name)
         if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
             raise NotAllowedError