Remove error for overlapping permissions. Document.
[pithos] / pithos / backends / modular.py
index 94da72c..5daa884 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright 2011 GRNET S.A. All rights reserved.
+# Copyright 2011-2012 GRNET S.A. All rights reserved.
 # 
 # Redistribution and use in source and binary forms, with or
 # without modification, are permitted provided that the following
 # interpreted as representing official policies, either expressed
 # or implied, of GRNET S.A.
 
+import sys
 import os
 import time
-import sqlite3
+import uuid as uuidlib
 import logging
-import hashlib
 import binascii
 
-from base import NotAllowedError, BaseBackend
-from lib.node import Node, ROOTNODE, SERIAL, SIZE, MTIME, MUSER, CLUSTER
-from lib.permissions import Permissions, READ, WRITE
-from lib.policy import Policy
-from lib.hashfiler import Mapper, Blocker
+from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend
+
+from pithos.lib.hashmap import HashMap
+
+# Default modules and settings.
+DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
+DEFAULT_DB_CONNECTION = 'sqlite:///backend.db'
+DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler'
+DEFAULT_BLOCK_PATH = 'data/'
+#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
+#DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
+
+QUEUE_MESSAGE_KEY = '#'
+QUEUE_CLIENT_ID = 2 # Pithos.
 
 ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
 
 inf = float('inf')
 
+ULTIMATE_ANSWER = 42
+
 
 logger = logging.getLogger(__name__)
 
+
 def backend_method(func=None, autocommit=1):
     if func is None:
         def fn(func):
@@ -60,13 +72,13 @@ def backend_method(func=None, autocommit=1):
     if not autocommit:
         return func
     def fn(self, *args, **kw):
-        self.con.execute('begin deferred')
+        self.wrapper.execute()
         try:
             ret = func(self, *args, **kw)
-            self.con.commit()
+            self.wrapper.commit()
             return ret
         except:
-            self.con.rollback()
+            self.wrapper.rollback()
             raise
     return fn
 
@@ -74,87 +86,98 @@ def backend_method(func=None, autocommit=1):
 class ModularBackend(BaseBackend):
     """A modular backend.
     
-    Uses SQLite for storage.
+    Uses modules for SQL functions and storage.
     """
     
-    # TODO: Create account if not present in all functions.
-    
-    def __init__(self, db):
+    def __init__(self, db_module=None, db_connection=None,
+                 block_module=None, block_path=None,
+                 queue_module=None, queue_connection=None):
+        db_module = db_module or DEFAULT_DB_MODULE
+        db_connection = db_connection or DEFAULT_DB_CONNECTION
+        block_module = block_module or DEFAULT_BLOCK_MODULE
+        block_path = block_path or DEFAULT_BLOCK_PATH
+        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
+        #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
+        
         self.hash_algorithm = 'sha256'
         self.block_size = 4 * 1024 * 1024 # 4MB
         
-        self.default_policy = {'quota': 0, 'versioning': 'auto'}
-        
-        basepath = os.path.split(db)[0]
-        if basepath and not os.path.exists(basepath):
-            os.makedirs(basepath)
-        if not os.path.isdir(basepath):
-            raise RuntimeError("Cannot open database at '%s'" % (db,))
-        
-        self.con = sqlite3.connect(basepath + '/db', check_same_thread=False)        
-        
-        params = {'blocksize': self.block_size,
-                  'blockpath': basepath + '/blocks',
-                  'hashtype': self.hash_algorithm}
-        self.blocker = Blocker(**params)
-        
-        params = {'mappath': basepath + '/maps',
-                  'namelen': self.blocker.hashlen}
-        self.mapper = Mapper(**params)
-        
-        params = {'connection': self.con,
-                  'cursor': self.con.cursor()}
-        self.permissions = Permissions(**params)
-        self.policy = Policy(**params)
-        self.node = Node(**params)
-        
-        self.con.commit()
+        self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
+        
+        def load_module(m):
+            __import__(m)
+            return sys.modules[m]
+        
+        self.db_module = load_module(db_module)
+        self.wrapper = self.db_module.DBWrapper(db_connection)
+        params = {'wrapper': self.wrapper}
+        self.permissions = self.db_module.Permissions(**params)
+        for x in ['READ', 'WRITE']:
+            setattr(self, x, getattr(self.db_module, x))
+        self.node = self.db_module.Node(**params)
+        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'MTIME', 'MUSER', 'UUID', 'CLUSTER']:
+            setattr(self, x, getattr(self.db_module, x))
+        
+        self.block_module = load_module(block_module)
+        params = {'path': block_path,
+                  'block_size': self.block_size,
+                  'hash_algorithm': self.hash_algorithm}
+        self.store = self.block_module.Store(**params)
+
+        if queue_module and queue_connection:
+            self.queue_module = load_module(queue_module)
+            params = {'exchange': queue_connection,
+                      'message_key': QUEUE_MESSAGE_KEY,
+                      'client_id': QUEUE_CLIENT_ID}
+            self.queue = self.queue_module.Queue(**params)
+        else:
+            class NoQueue:
+                def send(self, *args):
+                    pass
+            
+            self.queue = NoQueue()
+    
+    def close(self):
+        self.wrapper.close()
     
     @backend_method
     def list_accounts(self, user, marker=None, limit=10000):
         """Return a list of accounts the user can access."""
         
+        logger.debug("list_accounts: %s %s %s", user, marker, limit)
         allowed = self._allowed_accounts(user)
         start, limit = self._list_limits(allowed, marker, limit)
         return allowed[start:start + limit]
     
     @backend_method
-    def get_account_meta(self, user, account, until=None):
-        """Return a dictionary with the account metadata."""
+    def get_account_meta(self, user, account, domain, until=None):
+        """Return a dictionary with the account metadata for the domain."""
         
-        logger.debug("get_account_meta: %s %s", account, until)
-        node = self._lookup_account(account, user == account)
+        logger.debug("get_account_meta: %s %s %s", account, domain, until)
+        path, node = self._lookup_account(account, user == account)
         if user != account:
             if until or node is None or account not in self._allowed_accounts(user):
                 raise NotAllowedError
         try:
             props = self._get_properties(node, until)
-            version_id = props[SERIAL]
-            mtime = props[MTIME]
+            mtime = props[self.MTIME]
         except NameError:
-            # Account does not exist before until.
-            version_id = None
+            props = None
             mtime = until
-        object_count, bytes, tstamp = self._get_statistics(node, account, until)
-        if mtime > tstamp:
-            tstamp = mtime
+        count, bytes, tstamp = self._get_statistics(node, until)
+        tstamp = max(tstamp, mtime)
         if until is None:
             modified = tstamp
         else:
-            modified = self._get_statistics(node, account)[2] # Overall last modification.
-            if mtime > modified:
-                modified = mtime
-        
-        # Proper count.
-        count = self.node.node_children(node)
-        object_count -= count
+            modified = self._get_statistics(node)[2] # Overall last modification.
+            modified = max(modified, mtime)
         
         if user != account:
             meta = {'name': account}
         else:
             meta = {}
-            if version_id is not None:
-                meta.update(dict(self.node.attribute_get(version_id)))
+            if props is not None:
+                meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
             if until is not None:
                 meta.update({'until_timestamp': tstamp})
             meta.update({'name': account, 'count': count, 'bytes': bytes})
@@ -162,14 +185,14 @@ class ModularBackend(BaseBackend):
         return meta
     
     @backend_method
-    def update_account_meta(self, user, account, meta, replace=False):
-        """Update the metadata associated with the account."""
+    def update_account_meta(self, user, account, domain, meta, replace=False):
+        """Update the metadata associated with the account for the domain."""
         
-        logger.debug("update_account_meta: %s %s %s", account, meta, replace)
+        logger.debug("update_account_meta: %s %s %s %s", account, domain, meta, replace)
         if user != account:
             raise NotAllowedError
-        node = self._lookup_account(account, True)
-        self._put_metadata(user, node, meta, replace, False)
+        path, node = self._lookup_account(account, True)
+        self._put_metadata(user, node, domain, meta, replace)
     
     @backend_method
     def get_account_groups(self, user, account):
@@ -201,29 +224,42 @@ class ModularBackend(BaseBackend):
                 self.permissions.group_addmany(account, k, v)
     
     @backend_method
-    def put_account(self, user, account):
+    def get_account_policy(self, user, account):
+        """Return a dictionary with the account policy."""
+        
+        logger.debug("get_account_policy: %s", account)
+        if user != account:
+            if account not in self._allowed_accounts(user):
+                raise NotAllowedError
+            return {}
+        path, node = self._lookup_account(account, True)
+        return self._get_policy(node)
+    
+    @backend_method
+    def update_account_policy(self, user, account, policy, replace=False):
+        """Update the policy associated with the account."""
+        
+        logger.debug("update_account_policy: %s %s %s", account, policy, replace)
+        if user != account:
+            raise NotAllowedError
+        path, node = self._lookup_account(account, True)
+        self._check_policy(policy)
+        self._put_policy(node, policy, replace)
+    
+    @backend_method
+    def put_account(self, user, account, policy={}):
         """Create a new account with the given name."""
         
-        logger.debug("put_account: %s", account)
+        logger.debug("put_account: %s %s", account, policy)
         if user != account:
             raise NotAllowedError
         node = self.node.node_lookup(account)
         if node is not None:
             raise NameError('Account already exists')
-        node = self.node.node_create(ROOTNODE, account)
-        self.node.version_create(node, 0, None, account, CLUSTER_NORMAL)
-    
-    
-    
-    
-    
-    
-    
-    
-    
-    
-    
-    
+        if policy:
+            self._check_policy(policy)
+        node = self._put_path(user, self.ROOTNODE, account)
+        self._put_policy(node, policy, True)
     
     @backend_method
     def delete_account(self, user, account):
@@ -232,68 +268,70 @@ class ModularBackend(BaseBackend):
         logger.debug("delete_account: %s", account)
         if user != account:
             raise NotAllowedError
-        count = self._get_pathstats(account)[0]
-        if count > 0:
+        node = self.node.node_lookup(account)
+        if node is None:
+            return
+        if not self.node.node_remove(node):
             raise IndexError('Account is not empty')
-        sql = 'delete from versions where name = ?'
-        self.con.execute(sql, (account,))
         self.permissions.group_destroy(account)
     
     @backend_method
     def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None):
         """Return a list of containers existing under an account."""
         
-        logger.debug("list_containers: %s %s %s %s", account, marker, limit, until)
+        logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until)
         if user != account:
             if until or account not in self._allowed_accounts(user):
                 raise NotAllowedError
             allowed = self._allowed_containers(user, account)
             start, limit = self._list_limits(allowed, marker, limit)
             return allowed[start:start + limit]
-        else:
-            if shared:
-                allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
-                start, limit = self._list_limits(allowed, marker, limit)
-                return allowed[start:start + limit]
-        return [x[0] for x in self._list_objects(account, '', '/', marker, limit, False, [], until)]
+        if shared:
+            allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]
+            allowed = list(set(allowed))
+            start, limit = self._list_limits(allowed, marker, limit)
+            return allowed[start:start + limit]
+        node = self.node.node_lookup(account)
+        return [x[0] for x in self._list_objects(node, account, '', '/', marker, limit, False, None, [], until)]
     
     @backend_method
-    def get_container_meta(self, user, account, container, until=None):
-        """Return a dictionary with the container metadata."""
+    def get_container_meta(self, user, account, container, domain, until=None):
+        """Return a dictionary with the container metadata for the domain."""
         
-        logger.debug("get_container_meta: %s %s %s", account, container, until)
+        logger.debug("get_container_meta: %s %s %s %s", account, container, domain, until)
         if user != account:
             if until or container not in self._allowed_containers(user, account):
                 raise NotAllowedError
-        path, version_id, mtime = self._get_containerinfo(account, container, until)
-        count, bytes, tstamp = self._get_pathstats(path, until)
-        if mtime > tstamp:
-            tstamp = mtime
+        path, node = self._lookup_container(account, container)
+        props = self._get_properties(node, until)
+        mtime = props[self.MTIME]
+        count, bytes, tstamp = self._get_statistics(node, until)
+        tstamp = max(tstamp, mtime)
         if until is None:
             modified = tstamp
         else:
-            modified = self._get_pathstats(path)[2] # Overall last modification
-            if mtime > modified:
-                modified = mtime
+            modified = self._get_statistics(node)[2] # Overall last modification.
+            modified = max(modified, mtime)
         
         if user != account:
-            meta = {'name': container, 'modified': modified}
+            meta = {'name': container}
         else:
-            meta = self._get_metadata(path, version_id)
-            meta.update({'name': container, 'count': count, 'bytes': bytes, 'modified': modified})
+            meta = dict(self.node.attribute_get(props[self.SERIAL], domain))
             if until is not None:
                 meta.update({'until_timestamp': tstamp})
+            meta.update({'name': container, 'count': count, 'bytes': bytes})
+        meta.update({'modified': modified})
         return meta
     
     @backend_method
-    def update_container_meta(self, user, account, container, meta, replace=False):
-        """Update the metadata associated with the container."""
+    def update_container_meta(self, user, account, container, domain, meta, replace=False):
+        """Update the metadata associated with the container for the domain."""
         
-        logger.debug("update_container_meta: %s %s %s %s", account, container, meta, replace)
+        logger.debug("update_container_meta: %s %s %s %s %s", account, container, domain, meta, replace)
         if user != account:
             raise NotAllowedError
-        path, version_id, mtime = self._get_containerinfo(account, container)
-        self._put_metadata(user, path, meta, replace, False)
+        path, node = self._lookup_container(account, container)
+        self._put_metadata(user, node, domain, meta, replace)
     
     @backend_method
     def get_container_policy(self, user, account, container):
@@ -304,33 +342,29 @@ class ModularBackend(BaseBackend):
             if container not in self._allowed_containers(user, account):
                 raise NotAllowedError
             return {}
-        path = self._get_containerinfo(account, container)[0]
-        return self.policy.policy_get(path)
+        path, node = self._lookup_container(account, container)
+        return self._get_policy(node)
     
     @backend_method
     def update_container_policy(self, user, account, container, policy, replace=False):
-        """Update the policy associated with the account."""
+        """Update the policy associated with the container."""
         
         logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
         if user != account:
             raise NotAllowedError
-        path = self._get_containerinfo(account, container)[0]
+        path, node = self._lookup_container(account, container)
         self._check_policy(policy)
-        if replace:
-            for k, v in self.default_policy.iteritems():
-                if k not in policy:
-                    policy[k] = v
-        self.policy.policy_set(path, policy)
+        self._put_policy(node, policy, replace)
     
     @backend_method
-    def put_container(self, user, account, container, policy=None):
+    def put_container(self, user, account, container, policy={}):
         """Create a new container with the given name."""
         
         logger.debug("put_container: %s %s %s", account, container, policy)
         if user != account:
             raise NotAllowedError
         try:
-            path, version_id, mtime = self._get_containerinfo(account, container)
+            path, node = self._lookup_container(account, container)
         except NameError:
             pass
         else:
@@ -338,11 +372,8 @@ class ModularBackend(BaseBackend):
         if policy:
             self._check_policy(policy)
         path = '/'.join((account, container))
-        version_id = self._put_version(path, user)[0]
-        for k, v in self.default_policy.iteritems():
-            if k not in policy:
-                policy[k] = v
-        self.policy.policy_set(path, policy)
+        node = self._put_path(user, self._lookup_account(account, True)[1], path)
+        self._put_policy(node, policy, True)
     
     @backend_method
     def delete_container(self, user, account, container, until=None):
@@ -351,30 +382,32 @@ class ModularBackend(BaseBackend):
         logger.debug("delete_container: %s %s %s", account, container, until)
         if user != account:
             raise NotAllowedError
-        path, version_id, mtime = self._get_containerinfo(account, container)
+        path, node = self._lookup_container(account, container)
         
         if until is not None:
-            sql = '''select version_id from versions where name like ? and tstamp <= ?
-                        and version_id not in (select version_id from (%s))'''
-            sql = sql % self._sql_until() # Do not delete current versions.
-            c = self.con.execute(sql, (path + '/%', until))
-            for v in [x[0] for x in c.fetchall()]:
-                self._del_version(v)
+            hashes = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
+            for h in hashes:
+                self.store.map_delete(h)
+            self.node.node_purge_children(node, until, CLUSTER_DELETED)
+            self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
             return
         
-        count = self._get_pathstats(path)[0]
-        if count > 0:
+        if self._get_statistics(node)[0] > 0:
             raise IndexError('Container is not empty')
-        sql = 'delete from versions where name = ? or name like ?' # May contain hidden items.
-        self.con.execute(sql, (path, path + '/%',))
-        self.policy.policy_unset(path)
-        self._copy_version(user, account, account, True, False) # New account version (for timestamp update).
+        hashes = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
+        for h in hashes:
+            self.store.map_delete(h)
+        self.node.node_purge_children(node, inf, CLUSTER_DELETED)
+        self.node.node_remove(node)
+        self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
+    
+    # XXX: Up to here...
     
     @backend_method
-    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], shared=False, until=None):
+    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None):
         """Return a list of objects existing under a container."""
         
-        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, keys, shared, until)
+        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until)
         allowed = []
         if user != account:
             if until:
@@ -385,14 +418,16 @@ class ModularBackend(BaseBackend):
         else:
             if shared:
                 allowed = self.permissions.access_list_shared('/'.join((account, container)))
-        path, version_id, mtime = self._get_containerinfo(account, container, until)
-        return self._list_objects(path, prefix, delimiter, marker, limit, virtual, keys, until, allowed)
+                if not allowed:
+                    return []
+        path, node = self._lookup_container(account, container)
+        return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed)
     
     @backend_method
-    def list_object_meta(self, user, account, container, until=None):
-        """Return a list with all the container's object meta keys."""
+    def list_object_meta(self, user, account, container, domain, until=None):
+        """Return a list with all the container's object meta keys for the domain."""
         
-        logger.debug("list_object_meta: %s %s %s", account, container, until)
+        logger.debug("list_object_meta: %s %s %s %s", account, container, domain, until)
         allowed = []
         if user != account:
             if until:
@@ -400,54 +435,64 @@ class ModularBackend(BaseBackend):
             allowed = self.permissions.access_list_paths(user, '/'.join((account, container)))
             if not allowed:
                 raise NotAllowedError
-        path, version_id, mtime = self._get_containerinfo(account, container, until)
-        sql = '''select distinct m.key from (%s) o, metadata m
-                    where m.version_id = o.version_id and o.name like ?'''
-        sql = sql % self._sql_until(until)
-        param = (path + '/%',)
-        if allowed:
-            for x in allowed:
-                sql += ' and o.name like ?'
-                param += (x,)
-        c = self.con.execute(sql, param)
-        return [x[0] for x in c.fetchall()]
-    
-    @backend_method
-    def get_object_meta(self, user, account, container, name, version=None):
-        """Return a dictionary with the object metadata."""
-        
-        logger.debug("get_object_meta: %s %s %s %s", account, container, name, version)
+        path, node = self._lookup_container(account, container)
+        before = until if until is not None else inf
+        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
+    
+    @backend_method
+    def get_object_meta(self, user, account, container, name, domain, version=None):
+        """Return a dictionary with the object metadata for the domain."""
+        
+        logger.debug("get_object_meta: %s %s %s %s %s", account, container, name, domain, version)
         self._can_read(user, account, container, name)
-        path, version_id, muser, mtime, size = self._get_objectinfo(account, container, name, version)
+        path, node = self._lookup_object(account, container, name)
+        props = self._get_version(node, version)
         if version is None:
-            modified = mtime
+            modified = props[self.MTIME]
         else:
-            modified = self._get_version(path, version)[2] # Overall last modification
-        
-        meta = self._get_metadata(path, version_id)
-        meta.update({'name': name, 'bytes': size})
-        meta.update({'version': version_id, 'version_timestamp': mtime})
-        meta.update({'modified': modified, 'modified_by': muser})
+            try:
+                modified = self._get_version(node)[self.MTIME] # Overall last modification.
+            except NameError: # Object may be deleted.
+                del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED)
+                if del_props is None:
+                    raise NameError('Object does not exist')
+                modified = del_props[self.MTIME]
+        
+        meta = dict(self.node.attribute_get(props[self.SERIAL], domain))
+        meta.update({'name': name, 'bytes': props[self.SIZE], 'hash':props[self.HASH]})
+        meta.update({'version': props[self.SERIAL], 'version_timestamp': props[self.MTIME]})
+        meta.update({'modified': modified, 'modified_by': props[self.MUSER], 'uuid': props[self.UUID]})
         return meta
     
     @backend_method
-    def update_object_meta(self, user, account, container, name, meta, replace=False):
-        """Update the metadata associated with the object."""
+    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
+        """Update the metadata associated with the object for the domain and return the new version."""
         
-        logger.debug("update_object_meta: %s %s %s %s %s", account, container, name, meta, replace)
+        logger.debug("update_object_meta: %s %s %s %s %s %s", account, container, name, domain, meta, replace)
         self._can_write(user, account, container, name)
-        path, version_id, muser, mtime, size = self._get_objectinfo(account, container, name)
-        self._put_metadata(user, path, meta, replace)
+        path, node = self._lookup_object(account, container, name)
+        src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
+        self._apply_versioning(account, container, src_version_id)
+        return dest_version_id
     
     @backend_method
     def get_object_permissions(self, user, account, container, name):
-        """Return the path from which this object gets its permissions from,\
+        """Return the action allowed on the object, the path
+        from which the object gets its permissions from,
         along with a dictionary containing the permissions."""
         
         logger.debug("get_object_permissions: %s %s %s", account, container, name)
-        self._can_read(user, account, container, name)
-        path = self._get_objectinfo(account, container, name)[0]
-        return self.permissions.access_inherit(path)
+        allowed = 'write'
+        if user != account:
+            path = '/'.join((account, container, name))
+            if self.permissions.access_check(path, self.WRITE, user):
+                allowed = 'write'
+            elif self.permissions.access_check(path, self.READ, user):
+                allowed = 'read'
+            else:
+                raise NotAllowedError
+        path = self._lookup_object(account, container, name)[0]
+        return (allowed,) + self.permissions.access_inherit(path)
     
     @backend_method
     def update_object_permissions(self, user, account, container, name, permissions):
@@ -456,20 +501,21 @@ class ModularBackend(BaseBackend):
         logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions)
         if user != account:
             raise NotAllowedError
-        path = self._get_objectinfo(account, container, name)[0]
+        path = self._lookup_object(account, container, name)[0]
         self._check_permissions(path, permissions)
         self.permissions.access_set(path, permissions)
     
     @backend_method
     def get_object_public(self, user, account, container, name):
-        """Return the public URL of the object if applicable."""
+        """Return the public id of the object if applicable."""
         
         logger.debug("get_object_public: %s %s %s", account, container, name)
         self._can_read(user, account, container, name)
-        path = self._get_objectinfo(account, container, name)[0]
-        if self.permissions.public_check(path):
-            return '/public/' + path
-        return None
+        path = self._lookup_object(account, container, name)[0]
+        p = self.permissions.public_get(path)
+        if p is not None:
+            p += ULTIMATE_ANSWER
+        return p
     
     @backend_method
     def update_object_public(self, user, account, container, name, public):
@@ -477,7 +523,7 @@ class ModularBackend(BaseBackend):
         
         logger.debug("update_object_public: %s %s %s %s", account, container, name, public)
         self._can_write(user, account, container, name)
-        path = self._get_objectinfo(account, container, name)[0]
+        path = self._lookup_object(account, container, name)[0]
         if not public:
             self.permissions.public_unset(path)
         else:
@@ -489,125 +535,187 @@ class ModularBackend(BaseBackend):
         
         logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version)
         self._can_read(user, account, container, name)
-        path, version_id, muser, mtime, size = self._get_objectinfo(account, container, name, version)
-        hashmap = self.mapper.map_retr(version_id)
-        return size, [binascii.hexlify(x) for x in hashmap]
+        path, node = self._lookup_object(account, container, name)
+        props = self._get_version(node, version)
+        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
+        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
     
-    @backend_method
-    def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None):
-        """Create/update an object with the specified size and partial hashes."""
-        
-        logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
+    def _update_object_hash(self, user, account, container, name, size, hash, permissions, src_node=None, is_copy=False):
         if permissions is not None and user != account:
             raise NotAllowedError
         self._can_write(user, account, container, name)
-        missing = self.blocker.block_ping([binascii.unhexlify(x) for x in hashmap])
-        if missing:
-            ie = IndexError()
-            ie.data = missing
-            raise ie
-        path = self._get_containerinfo(account, container)[0]
-        path = '/'.join((path, name))
         if permissions is not None:
+            path = '/'.join((account, container, name))
             self._check_permissions(path, permissions)
-        src_version_id, dest_version_id = self._copy_version(user, path, path, not replace_meta, False)
-        sql = 'update versions set size = ? where version_id = ?'
-        self.con.execute(sql, (size, dest_version_id))
-        self.mapper.map_stor(dest_version_id, [binascii.unhexlify(x) for x in hashmap])
-        for k, v in meta.iteritems():
-            sql = 'insert or replace into metadata (version_id, key, value) values (?, ?, ?)'
-            self.con.execute(sql, (dest_version_id, k, v))
+        
+        account_path, account_node = self._lookup_account(account, True)
+        container_path, container_node = self._lookup_container(account, container)
+        path, node = self._put_object_node(container_path, container_node, name)
+        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, hash=hash, is_copy=is_copy)
+        
+        # Check quota.
+        versioning = self._get_policy(container_node)['versioning']
+        if versioning != 'auto':
+            size_delta = size - 0 # TODO: Get previous size.
+        else:
+            size_delta = size
+        if size_delta > 0:
+            account_quota = long(self._get_policy(account_node)['quota'])
+            container_quota = long(self._get_policy(container_node)['quota'])
+            if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
+               (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
+                # This must be executed in a transaction, so the version is never created if it fails.
+                raise QuotaError
+        
         if permissions is not None:
             self.permissions.access_set(path, permissions)
+        self._apply_versioning(account, container, pre_version_id)
+        return pre_version_id, dest_version_id
     
     @backend_method
-    def copy_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
-        """Copy an object's data and metadata."""
+    def update_object_hashmap(self, user, account, container, name, size, hashmap, domain, meta={}, replace_meta=False, permissions=None):
+        """Create/update an object with the specified size and partial hashes."""
         
-        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
-        if permissions is not None and user != account:
-            raise NotAllowedError
-        self._can_read(user, account, src_container, src_name)
-        self._can_write(user, account, dest_container, dest_name)
-        self._get_containerinfo(account, src_container)
-        if src_version is None:
-            src_path = self._get_objectinfo(account, src_container, src_name)[0]
-        else:
-            src_path = '/'.join((account, src_container, src_name))
-        dest_path = self._get_containerinfo(account, dest_container)[0]
-        dest_path = '/'.join((dest_path, dest_name))
-        if permissions is not None:
-            self._check_permissions(dest_path, permissions)
-        src_version_id, dest_version_id = self._copy_version(user, src_path, dest_path, not replace_meta, True, src_version)
-        for k, v in dest_meta.iteritems():
-            sql = 'insert or replace into metadata (version_id, key, value) values (?, ?, ?)'
-            self.con.execute(sql, (dest_version_id, k, v))
-        if permissions is not None:
-            self.permissions.access_set(dest_path, permissions)
+        logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
+        if size == 0: # No such thing as an empty hashmap.
+            hashmap = [self.put_block('')]
+        map = HashMap(self.block_size, self.hash_algorithm)
+        map.extend([binascii.unhexlify(x) for x in hashmap])
+        missing = self.store.block_search(map)
+        if missing:
+            ie = IndexError()
+            ie.data = [binascii.hexlify(x) for x in missing]
+            raise ie
+        
+        hash = map.hash()
+        pre_version_id, dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(hash), permissions)
+        self._put_metadata_duplicate(pre_version_id, dest_version_id, domain, meta, replace_meta)
+        self.store.map_put(hash, map)
+        self.queue.send(user, 'diskspace', 0, {'action': 'add', 'version': dest_version_id, 'total': 0})
+        return dest_version_id
+    
+    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False):
+        self._can_read(user, src_account, src_container, src_name)
+        path, node = self._lookup_object(src_account, src_container, src_name)
+        # TODO: Will do another fetch of the properties in duplicate version...
+        props = self._get_version(node, src_version) # Check to see if source exists.
+        src_version_id = props[self.SERIAL]
+        hash = props[self.HASH]
+        size = props[self.SIZE]
+        
+        is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid.
+        pre_version_id, dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, hash, permissions, src_node=node, is_copy=is_copy)
+        self._put_metadata_duplicate(src_version_id, dest_version_id, dest_domain, dest_meta, replace_meta)
+        return dest_version_id
     
     @backend_method
-    def move_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None):
-        """Move an object's data and metadata."""
+    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta={}, replace_meta=False, permissions=None, src_version=None):
+        """Copy an object's data and metadata."""
         
-        logger.debug("move_object: %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions)
-        self.copy_object(user, account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, None)
-        self.delete_object(user, account, src_container, src_name)
+        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version)
+        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version, False)
+        self.queue.send(user, 'diskspace', 0, {'action': 'add', 'version': dest_version_id, 'total': 0})
+        return dest_version_id
     
     @backend_method
-    def delete_object(self, user, account, container, name, until=None):
-        """Delete/purge an object."""
+    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta={}, replace_meta=False, permissions=None):
+        """Move an object's data and metadata."""
         
-        logger.debug("delete_object: %s %s %s %s", account, container, name, until)
+        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions)
+        if user != src_account:
+            raise NotAllowedError
+        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, None, True)
+        if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
+            self._delete_object(user, src_account, src_container, src_name)
+        self.queue.send(user, 'diskspace', 0, {'action': 'add', 'version': dest_version_id, 'total': 0})
+        return dest_version_id
+    
+    def _delete_object(self, user, account, container, name, until=None):
         if user != account:
             raise NotAllowedError
         
         if until is not None:
             path = '/'.join((account, container, name))
-            sql = '''select version_id from versions where name = ? and tstamp <= ?'''
-            c = self.con.execute(sql, (path, until))
-            for v in [x[0] in c.fetchall()]:
-                self._del_version(v)
+            node = self.node.node_lookup(path)
+            if node is None:
+                return
+            hashes = self.node.node_purge(node, until, CLUSTER_NORMAL)
+            hashes += self.node.node_purge(node, until, CLUSTER_HISTORY)
+            for h in hashes:
+                self.store.map_delete(h)
+            self.node.node_purge(node, until, CLUSTER_DELETED)
             try:
-                version_id = self._get_version(path)[0]
+                props = self._get_version(node)
             except NameError:
-                pass
-            else:
                 self.permissions.access_clear(path)
+            self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
             return
         
-        path = self._get_objectinfo(account, container, name)[0]
-        self._put_version(path, user, 0, 1)
+        path, node = self._lookup_object(account, container, name)
+        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, hash=None, cluster=CLUSTER_DELETED)
+        self._apply_versioning(account, container, src_version_id)
         self.permissions.access_clear(path)
     
     @backend_method
+    def delete_object(self, user, account, container, name, until=None):
+        """Delete/purge an object."""
+        
+        logger.debug("delete_object: %s %s %s %s", account, container, name, until)
+        self._delete_object(user, account, container, name, until)
+    
+    @backend_method
     def list_versions(self, user, account, container, name):
         """Return a list of all (version, version_timestamp) tuples for an object."""
         
         logger.debug("list_versions: %s %s %s", account, container, name)
         self._can_read(user, account, container, name)
-        # This will even show deleted versions.
-        path = '/'.join((account, container, name))
-        sql = '''select distinct version_id, tstamp from versions where name = ? and hide = 0'''
-        c = self.con.execute(sql, (path,))
-        return [(int(x[0]), int(x[1])) for x in c.fetchall()]
+        path, node = self._lookup_object(account, container, name)
+        versions = self.node.node_get_versions(node)
+        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
+    
+    @backend_method
+    def get_uuid(self, user, uuid):
+        """Return the (account, container, name) for the UUID given."""
+        
+        logger.debug("get_uuid: %s", uuid)
+        info = self.node.latest_uuid(uuid)
+        if info is None:
+            raise NameError
+        path, serial = info
+        account, container, name = path.split('/', 2)
+        self._can_read(user, account, container, name)
+        return (account, container, name)
+    
+    @backend_method
+    def get_public(self, user, public):
+        """Return the (account, container, name) for the public id given."""
+        
+        logger.debug("get_public: %s", public)
+        if public is None or public < ULTIMATE_ANSWER:
+            raise NameError
+        path = self.permissions.public_path(public - ULTIMATE_ANSWER)
+        if path is None:
+            raise NameError
+        account, container, name = path.split('/', 2)
+        self._can_read(user, account, container, name)
+        return (account, container, name)
     
     @backend_method(autocommit=0)
     def get_block(self, hash):
         """Return a block's data."""
         
         logger.debug("get_block: %s", hash)
-        blocks = self.blocker.block_retr((binascii.unhexlify(hash),))
-        if not blocks:
+        block = self.store.block_get(binascii.unhexlify(hash))
+        if not block:
             raise NameError('Block does not exist')
-        return blocks[0]
+        return block
     
     @backend_method(autocommit=0)
     def put_block(self, data):
-        """Create a block and return the hash."""
+        """Store a block and return the hash."""
         
         logger.debug("put_block: %s", len(data))
-        hashes, absent = self.blocker.block_stor((data,))
-        return binascii.hexlify(hashes[0])
+        return binascii.hexlify(self.store.block_put(data))
     
     @backend_method(autocommit=0)
     def update_block(self, hash, data, offset=0):
@@ -616,201 +724,67 @@ class ModularBackend(BaseBackend):
         logger.debug("update_block: %s %s %s", hash, len(data), offset)
         if offset == 0 and len(data) == self.block_size:
             return self.put_block(data)
-        h, e = self.blocker.block_delta(binascii.unhexlify(hash), ((offset, data),))
+        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
         return binascii.hexlify(h)
     
+    # Path functions.
     
+    def _generate_uuid(self):
+        return str(uuidlib.uuid4())
     
-    
-    
-    
-    
-    
-    
-    
-    
-    
-    
-    
-    
-    
-    
-    def _sql_until(self, until=None):
-        """Return the sql to get the latest versions until the timestamp given."""
-        if until is None:
-            until = int(time.time())
-        sql = '''select version_id, name, tstamp, size from versions v
-                    where version_id = (select max(version_id) from versions
-                                        where v.name = name and tstamp <= %s)
-                    and hide = 0'''
-        return sql % (until,)
-    
-    def _put_version(self, path, user, size=0, hide=0):
-        tstamp = int(time.time())
-        sql = 'insert into versions (name, user, tstamp, size, hide) values (?, ?, ?, ?, ?)'
-        id = self.con.execute(sql, (path, user, tstamp, size, hide)).lastrowid
-        return str(id), tstamp
-    
-    def _get_versioninfo(self, account, container, name, until=None):
-        """Return path, latest version, associated timestamp and size until the timestamp given."""
-        
-        p = (account, container, name)
-        try:
-            p = p[:p.index(None)]
-        except ValueError:
-            pass
-        path = '/'.join(p)
+    def _put_object_node(self, path, parent, name):
+        path = '/'.join((path, name))
         node = self.node.node_lookup(path)
-        if node is not None:
-            props = self.node.version_lookup(node, until, CLUSTER_NORMAL)
-            # TODO: Do one lookup.
-            if props is None and until is not None:
-                props = self.node.version_lookup(node, until, CLUSTER_HISTORY)
-            if props is not None:
-                return path, props[SERIAL], props[MTIME], props[SIZE]
-        raise NameError('Path does not exist')
-    
-    def _get_accountinfo(self, account, until=None):
-        try:
-            path, version_id, mtime, size = self._get_versioninfo(account, None, None, until)
-            return version_id, mtime
-        except:
-            raise NameError('Account does not exist')
-    
-    def _get_containerinfo(self, account, container, until=None):
-        try:
-            path, version_id, mtime, size = self._get_versioninfo(account, container, None, until)
-            return path, version_id, mtime
-        except:
-            raise NameError('Container does not exist')
-    
-    def _get_objectinfo(self, account, container, name, version=None):
-        path = '/'.join((account, container, name))
-        version_id, muser, mtime, size = self._get_version(path, version)
-        return path, version_id, muser, mtime, size
-    
-    def _create_account(self, user, account):
-        try:
-            self._get_accountinfo(account)
-        except NameError:
-            self._put_version(account, user)
-    
-    def _check_policy(self, policy):
-        for k in policy.keys():
-            if policy[k] == '':
-                policy[k] = self.default_policy.get(k)
-        for k, v in policy.iteritems():
-            if k == 'quota':
-                q = int(v) # May raise ValueError.
-                if q < 0:
-                    raise ValueError
-            elif k == 'versioning':
-                if v not in ['auto', 'manual', 'none']:
-                    raise ValueError
-            else:
-                raise ValueError
-    
-    def _list_limits(self, listing, marker, limit):
-        start = 0
-        if marker:
-            try:
-                start = listing.index(marker) + 1
-            except ValueError:
-                pass
-        if not limit or limit > 10000:
-            limit = 10000
-        return start, limit
-    
-    def _list_objects(self, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None, allowed=[]):
-        cont_prefix = path + '/'
-        if keys and len(keys) > 0:
-            sql = '''select distinct o.name, o.version_id from (%s) o, metadata m where o.name like ? and
-                        m.version_id = o.version_id and m.key in (%s)'''
-            sql = sql % (self._sql_until(until), ', '.join('?' * len(keys)))
-            param = (cont_prefix + prefix + '%',) + tuple(keys)
-            if allowed:
-                sql += ' and (' + ' or '.join(('o.name like ?',) * len(allowed)) + ')'
-                param += tuple([x + '%' for x in allowed])
-            sql += ' order by o.name'
-        else:
-            sql = 'select name, version_id from (%s) where name like ?'
-            sql = sql % self._sql_until(until)
-            param = (cont_prefix + prefix + '%',)
-            if allowed:
-                sql += ' and (' + ' or '.join(('name like ?',) * len(allowed)) + ')'
-                param += tuple([x + '%' for x in allowed])
-            sql += ' order by name'
-        c = self.con.execute(sql, param)
-        objects = [(x[0][len(cont_prefix):], x[1]) for x in c.fetchall()]
-        if delimiter:
-            pseudo_objects = []
-            for x in objects:
-                pseudo_name = x[0]
-                i = pseudo_name.find(delimiter, len(prefix))
-                if not virtual:
-                    # If the delimiter is not found, or the name ends
-                    # with the delimiter's first occurence.
-                    if i == -1 or len(pseudo_name) == i + len(delimiter):
-                        pseudo_objects.append(x)
-                else:
-                    # If the delimiter is found, keep up to (and including) the delimiter.
-                    if i != -1:
-                        pseudo_name = pseudo_name[:i + len(delimiter)]
-                    if pseudo_name not in [y[0] for y in pseudo_objects]:
-                        if pseudo_name == x[0]:
-                            pseudo_objects.append(x)
-                        else:
-                            pseudo_objects.append((pseudo_name, None))
-            objects = pseudo_objects
-        
-        start, limit = self._list_limits([x[0] for x in objects], marker, limit)
-        return objects[start:start + limit]
-    
-    def _del_version(self, version):
-        self.mapper.map_remv(version)
-        sql = 'delete from versions where version_id = ?'
-        self.con.execute(sql, (version,))
+        if node is None:
+            node = self.node.node_create(parent, path)
+        return path, node
     
-    # Path functions.
+    def _put_path(self, user, parent, path):
+        node = self.node.node_create(parent, path)
+        self.node.version_create(node, None, 0, None, user, self._generate_uuid(), CLUSTER_NORMAL)
+        return node
     
     def _lookup_account(self, account, create=True):
         node = self.node.node_lookup(account)
         if node is None and create:
-            node = self.node.node_create(ROOTNODE, account)
-            self.node.version_create(node, 0, None, account, CLUSTER_NORMAL)
-        return node
+            node = self._put_path(account, self.ROOTNODE, account) # User is account.
+        return account, node
     
     def _lookup_container(self, account, container):
-        node = self.node.node_lookup('/'.join((account, container)))
+        path = '/'.join((account, container))
+        node = self.node.node_lookup(path)
         if node is None:
             raise NameError('Container does not exist')
-        return node
+        return path, node
     
     def _lookup_object(self, account, container, name):
-        node = self.node.node_lookup('/'.join((account, container, name)))
+        path = '/'.join((account, container, name))
+        node = self.node.node_lookup(path)
         if node is None:
             raise NameError('Object does not exist')
-        return node
+        return path, node
     
     def _get_properties(self, node, until=None):
         """Return properties until the timestamp given."""
         
         before = until if until is not None else inf
         props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
-        # TODO: Do one lookup.
         if props is None and until is not None:
             props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
         if props is None:
             raise NameError('Path does not exist')
         return props
     
-    def _get_statistics(self, node, path, until=None):
-        """Return count, sum of size and latest timestamp of everything under node/path."""
+    def _get_statistics(self, node, until=None):
+        """Return count, sum of size and latest timestamp of everything under node."""
         
         if until is None:
-            return self.node.node_statistics(node, CLUSTER_NORMAL)
+            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
         else:
-            return self.node.path_statistics(path + '/', until, CLUSTER_DELETED)
+            stats = self.node.statistics_latest(node, until, CLUSTER_DELETED)
+        if stats is None:
+            stats = (0, 0, 0)
+        return stats
     
     def _get_version(self, node, version=None):
         if version is None:
@@ -818,61 +792,127 @@ class ModularBackend(BaseBackend):
             if props is None:
                 raise NameError('Object does not exist')
         else:
+            try:
+                version = int(version)
+            except ValueError:
+                raise IndexError('Version does not exist')
             props = self.node.version_get_properties(version)
-            if props is None or props[CLUSTER] == CLUSTER_DELETE:
+            if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
                 raise IndexError('Version does not exist')
         return props
     
-    def _copy_version(self, user, src_node, dest_node, copy_meta=True, copy_data=True, src_version=None):
-        # Get source serial and size.
-        if src_version is not None:
-            src_props = self._get_version(src_node, src_version)
-            src_version_id = src_props[SERIAL]
-            size = src_props[SIZE]
+    def _put_version_duplicate(self, user, node, src_node=None, size=None, hash=None, cluster=CLUSTER_NORMAL, is_copy=False):
+        """Create a new version of the node."""
+        
+        props = self.node.version_lookup(node if src_node is None else src_node, inf, CLUSTER_NORMAL)
+        if props is not None:
+            src_version_id = props[self.SERIAL]
+            src_hash = props[self.HASH]
+            src_size = props[self.SIZE]
         else:
-            # Latest or create from scratch.
-            try:
-                src_props = self._get_version(src_node)
-                src_version_id = src_props[SERIAL]
-                size = src_props[SIZE]
-            except NameError:
-                src_version_id = None
-                size = 0
-        if not copy_data:
-            size = 0
-        
-        # Move the latest version at destination to CLUSTER_HISTORY and create new.
-        if src_node == dest_node and src_version is None and src_version_id is not None:
-            self.node.version_recluster(src_version_id, CLUSTER_HISTORY)
+            src_version_id = None
+            src_hash = None
+            src_size = 0
+        if size is None:
+            hash = src_hash # This way hash can be set to None.
+            size = src_size
+        uuid = self._generate_uuid() if (is_copy or src_version_id is None) else props[self.UUID]
+        
+        if src_node is None:
+            pre_version_id = src_version_id
         else:
-            dest_props = self.node.version_lookup(dest_node, inf, CLUSTER_NORMAL)
-            if dest_props is not None:
-                self.node.version_recluster(dest_props[SERIAL], CLUSTER_HISTORY)
-        dest_version_id, mtime = self.node.version_create(dest_node, size, src_version_id, user, CLUSTER_NORMAL)
-        
-        # Copy meta and data.
-        if copy_meta and src_version_id is not None:
-            self.attribute_copy(src_version_id, dest_version_id)
-        if copy_data and src_version_id is not None:
-            hashmap = self.mapper.map_retr(src_version_id)
-            self.mapper.map_stor(dest_version_id, hashmap)
+            pre_version_id = None
+            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
+            if props is not None:
+                pre_version_id = props[self.SERIAL]
+        if pre_version_id is not None:
+            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
         
-        return src_version_id, dest_version_id
+        dest_version_id, mtime = self.node.version_create(node, hash, size, src_version_id, user, uuid, cluster)
+        return pre_version_id, dest_version_id
     
-    def _get_metadata(self, version):
-        if version is None:
-            return {}
-        return dict(self.node.attribute_get(version))
+    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
+        if src_version_id is not None:
+            self.node.attribute_copy(src_version_id, dest_version_id)
+        if not replace:
+            self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
+            self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
+        else:
+            self.node.attribute_del(dest_version_id, domain)
+            self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
     
-    def _put_metadata(self, user, node, meta, replace=False, copy_data=True):
+    def _put_metadata(self, user, node, domain, meta, replace=False):
         """Create a new version and store metadata."""
         
-        src_version_id, dest_version_id = self._copy_version(user, node, node, not replace, copy_data)
-        if not replace:
-            self.node.attribute_del(dest_version_id, (k for k, v in meta.iteritems() if v == ''))
-            self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems() if v != ''))
-        else:
-            self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
+        src_version_id, dest_version_id = self._put_version_duplicate(user, node)
+        self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
+        return src_version_id, dest_version_id
+    
+    def _list_limits(self, listing, marker, limit):
+        start = 0
+        if marker:
+            try:
+                start = listing.index(marker) + 1
+            except ValueError:
+                pass
+        if not limit or limit > 10000:
+            limit = 10000
+        return start, limit
+    
+    def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[]):
+        cont_prefix = path + '/'
+        prefix = cont_prefix + prefix
+        start = cont_prefix + marker if marker else None
+        before = until if until is not None else inf
+        filterq = keys if domain else []
+        sizeq = size_range
+        
+        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq)
+        objects.extend([(p, None) for p in prefixes] if virtual else [])
+        objects.sort(key=lambda x: x[0])
+        objects = [(x[0][len(cont_prefix):], x[1]) for x in objects]
+        
+        start, limit = self._list_limits([x[0] for x in objects], marker, limit)
+        return objects[start:start + limit]
+    
+    # Policy functions.
+    
+    def _check_policy(self, policy):
+        for k in policy.keys():
+            if policy[k] == '':
+                policy[k] = self.default_policy.get(k)
+        for k, v in policy.iteritems():
+            if k == 'quota':
+                q = int(v) # May raise ValueError.
+                if q < 0:
+                    raise ValueError
+            elif k == 'versioning':
+                if v not in ['auto', 'none']:
+                    raise ValueError
+            else:
+                raise ValueError
+    
+    def _put_policy(self, node, policy, replace):
+        if replace:
+            for k, v in self.default_policy.iteritems():
+                if k not in policy:
+                    policy[k] = v
+        self.node.policy_set(node, policy)
+    
+    def _get_policy(self, node):
+        policy = self.default_policy.copy()
+        policy.update(self.node.policy_get(node))
+        return policy
+    
+    def _apply_versioning(self, account, container, version_id):
+        if version_id is None:
+            return
+        path, node = self._lookup_container(account, container)
+        versioning = self._get_policy(node)['versioning']
+        if versioning != 'auto':
+            hash = self.node.version_remove(version_id)
+            self.store.map_delete(hash)
+            self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
     
     # Access control functions.
     
@@ -882,26 +922,43 @@ class ModularBackend(BaseBackend):
     
     def _check_permissions(self, path, permissions):
         # raise ValueError('Bad characters in permissions')
-        
-        # Check for existing permissions.
-        paths = self.permissions.access_list(path)
-        if paths:
-            ae = AttributeError()
-            ae.data = paths
-            raise ae
+        pass
+    
+    def _get_permissions_path(self, account, container, name):
+        path = '/'.join((account, container, name))
+        permission_paths = self.permissions.access_inherit(path)
+        permission_paths.sort()
+        permission_paths.reverse()
+        for p in permission_paths:
+            if p == path:
+                return p
+            else:
+                try:
+                    parts = p.split('/', 2)
+                    if len(parts) != 3:
+                        return None
+                    path, node = self._lookup_object(*p.split('/', 2))
+                    props = self._get_version(node)
+                    # XXX: Put type in properties...
+                    meta = dict(self.node.attribute_get(props[self.SERIAL], 'pithos'))
+                    if meta['Content-Type'] == 'application/directory':
+                        return p
+                except NameError:
+                    pass
+        return None
     
     def _can_read(self, user, account, container, name):
         if user == account:
             return True
-        path = '/'.join((account, container, name))
-        if not self.permissions.access_check(path, READ, user) and not self.permissions.access_check(path, WRITE, user):
+        path = self._get_permissions_path(account, container, name)
+        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
             raise NotAllowedError
     
     def _can_write(self, user, account, container, name):
         if user == account:
             return True
         path = '/'.join((account, container, name))
-        if not self.permissions.access_check(path, WRITE, user):
+        if not self.permissions.access_check(path, self.WRITE, user):
             raise NotAllowedError
     
     def _allowed_accounts(self, user):