Modular backend progress.
authorAntony Chazapis <chazapis@gmail.com>
Tue, 2 Aug 2011 16:43:25 +0000 (19:43 +0300)
committerAntony Chazapis <chazapis@gmail.com>
Tue, 2 Aug 2011 16:43:25 +0000 (19:43 +0300)
pithos/backends/lib/node.py
pithos/backends/modular.py

index 6e31e1b..ef9f555 100644 (file)
@@ -38,7 +38,7 @@ from dbworker import DBWorker
 
 ROOTNODE  = 0
 
-( SERIAL, NODE, SIZE, SOURCE, MTIME, CLUSTER ) = range(6)
+( SERIAL, NODE, SIZE, SOURCE, MTIME, MUSER, CLUSTER ) = range(7)
 
 inf = float('inf')
 
@@ -87,13 +87,14 @@ _propnames = {
     'size'      : 2,
     'source'    : 3,
     'mtime'     : 4,
-    'cluster'   : 5,
+    'muser'     : 5,
+    'cluster'   : 6,
 }
 
 
 class Node(DBWorker):
     """Nodes store path organization.
-       Versions store obejct history.
+       Versions store object history.
        Attributes store metadata.
     """
     
@@ -105,7 +106,7 @@ class Node(DBWorker):
         execute(""" create table if not exists nodes
                           ( node       integer primary key,
                             parent     integer not null default 0,
-                            path       text    not null default ''
+                            path       text    not null default '',
                             foreign key (parent)
                             references nodes(node)
                             on update cascade
@@ -118,6 +119,7 @@ class Node(DBWorker):
                             population integer not null default 0,
                             size       integer not null default 0,
                             mtime      integer,
+                            muser      text    not null default '',
                             cluster    integer not null default 0,
                             primary key (node, cluster)
                             foreign key (node)
@@ -154,6 +156,28 @@ class Node(DBWorker):
         q = "insert or ignore into nodes(node, parent) values (?, ?)"
         execute(q, (ROOTNODE, ROOTNODE))
     
+    def node_create(self, parent, path):
+        """Create a new node from the given properties.
+           Return the node identifier of the new node.
+        """
+        
+        q = ("insert into nodes (parent, path) "
+             "values (?, ?)")
+        props = (parent, path)
+        return self.execute(q, props).lastrowid
+    
+    def node_lookup(self, path):
+        """Lookup the current node of the given path.
+           Return None if the path is not found.
+        """
+        
+        q = ("select node from nodes where path = ?")
+        self.execute(q, (path,))
+        r = self.fetchone()
+        if r is not None:
+            return r[0]
+        return None
+    
     def node_update_ancestors(self, node, population, size, mtime, cluster=0):
         """Update the population properties of the given node.
            Population properties keep track the population and total
@@ -202,18 +226,15 @@ class Node(DBWorker):
             return (0, 0, 0)
         return r
     
-    def version_create(self, node, size, source, cluster=0):
-        """Create a new version from the given properties.
-           Return the (serial, mtime) of the new version.
-        """
+    def node_children(self, node):
+        """Return node's child count."""
         
-        q = ("insert into nodes (node, size, source, mtime, cluster) "
-             "values (?, ?, ?, ?, ?)")
-        mtime = time()
-        props = (node, path, size, source, mtime, cluster)
-        serial = self.execute(q, props).lastrowid
-        self.node_update_ancestors(node, 1, size, mtime, cluster)
-        return serial, mtime
+        q = "select count(node) from nodes where parent = ?"
+        self.execute(q, (node,))
+        r = fetchone()
+        if r is None:
+            return 0
+        return r
     
 #     def node_remove(self, serial, recursive=0):
 #         """Remove the node specified by serial.
@@ -236,14 +257,46 @@ class Node(DBWorker):
 #         self.node_update_ancestors(parent, -pop-1, -size-popsize)
 #         return True
     
+    def version_create(self, node, size, source, muser, cluster=0):
+        """Create a new version from the given properties.
+           Return the (serial, mtime) of the new version.
+        """
+        
+        q = ("insert into nodes (node, size, source, mtime, muser, cluster) "
+             "values (?, ?, ?, ?, ?)")
+        mtime = time()
+        props = (node, path, size, source, mtime, muser, cluster)
+        serial = self.execute(q, props).lastrowid
+        self.node_update_ancestors(node, 1, size, mtime, cluster)
+        return serial, mtime
+    
+    def version_lookup(self, node, before=inf, cluster=0):
+        """Lookup the current version of the given node.
+           Return a list with its properties:
+           (serial, node, size, source, mtime, muser, cluster)
+           or None if the current version is not found in the given cluster.
+        """
+        
+        q = ("select serial, node, size, source, mtime, muser, cluster "
+             "from versions "
+             "where serial = (select max(serial) "
+                             "from versions "
+                             "where node = ? and mtime < ?) "
+             "and cluster = ?")
+        self.execute(q, (node, before, cluster))
+        props = self.fetchone()
+        if props is not None:
+            return props
+        return None
+    
     def version_get_properties(self, serial, keys=(), propnames=_propnames):
         """Return a sequence of values for the properties of
            the version specified by serial and the keys, in the order given.
            If keys is empty, return all properties in the order
-           (serial, node, size, source, mtime, cluster).
+           (serial, node, size, source, mtime, muser, cluster).
         """
         
-        q = ("select serial, node, path, size, source, mtime, cluster "
+        q = ("select serial, node, path, size, source, mtime, muser, cluster "
              "from nodes "
              "where serial = ?")
         self.execute(q, (serial,))
@@ -273,45 +326,59 @@ class Node(DBWorker):
 #         vals += (serial,)
 #         self.execute(q, vals)
     
-    def version_lookup(self, node, before=inf, cluster=0):
-        """Lookup the current version of the given node.
-           Return a list with its properties:
-           (serial, node, size, source, mtime, cluster)
-           or None if the version is not found.
-        """
+    def version_recluster(self, serial, cluster):
+        """Move the version into another cluster."""
         
-        q = ("select serial, node, size, source, mtime, cluster "
-             "from nodes "
-             "where serial = (select max(serial) "
-                             "from nodes "
-                             "where node = ? and cluster = ? and mtime < ?)")
-        self.execute(q, (node, cluster, before))
-        props = self.fetchone()
-        if props is not None:
-            return props
-        return None
-    
-    def node_lookup(self, path):
-        """Lookup the current node of the given path.
-           Return None if the path is not found.
-        """
+        props = self.node_get_properties(source)
+        node = props[NODE]
+        size = props[SIZE]
+        mtime = props[MTIME]
+        oldcluster = props[CLUSTER]
+        if cluster == oldcluster:
+            return
         
-        q = ("select node from nodes where path = ?")
-        self.execute(q, (path,))
-        r = self.fetchone()
-        if r is not None:
-            return r[0]
-        return None
+        self.node_update_ancestors(node, -1, -size, mtime, oldcluster)
+        self.node_update_ancestors(node, 1, size, mtime, cluster)
+
+        q = "update nodes set parent = ?, path = ? where serial = ?"
+        self.execute(q, (parent, path, source))
     
-    def node_create(self, parent, path):
-        """Create a new node from the given properties.
-           Return the node identifier of the new node.
+#     def version_copy(self, serial, node, muser, copy_attr=True):
+#         """Copy the version specified by serial into
+#            a new version of node. Optionally copy attributes.
+#            Return the (serial, mtime) of the new version.
+#         """
+#         
+#         props = self.version_get_properties(serial)
+#         if props is None:
+#             return None
+#         size = props[SIZE]
+#         cluster = props[CLUSTER]
+#         new_serial, mtime = self.version_create(node, size, serial, muser, cluster)
+#         if copy_attr:
+#             self.attribute_copy(serial, new_serial)
+#         return (new_serial, mtime)
+    
+    def path_statistics(self, prefix, before=inf, except_cluster=0):
+        """Return population, total size and last mtime
+           for all latest versions under prefix that
+           do not belong to the cluster.
         """
         
-        q = ("insert into nodes (parent, path) "
-             "values (?, ?)")
-        props = (parent, path)
-        return self.execute(q, props).lastrowid
+        q = ("select count(serial), sum(size), max(mtime) "
+             "from versions v "
+             "where serial = (select max(serial) "
+                             "from versions "
+                             "where node = v.node and mtime < ?) "
+             "and cluster != ? "
+             "and node in (select node "
+                          "from nodes "
+                          "where path like ?")
+        self.execute(q, (before, except_cluster, prefix + '%'))
+        r = fetchone()
+        if r is None:
+            return (0, 0, 0)
+        return r
     
     def parse_filters(self, filterq):
         preterms = filterq.split(',')
@@ -538,44 +605,9 @@ class Node(DBWorker):
 #              "and path = ? and mtime between ? and ?")
 #         execute(q, args)
 #         return serials
-
-#     def node_move(self, source, parent, path):
-#         """Move the source node into another path,
-#            possibly, in another parent's namespace.
-#            The node is moved with its namespace.
-#         """
-#         props = self.node_get_properties(source)
-# 
-#         oldparent = props[PARENT]
-#         size = props[SIZE]
-#         population = props[POPULATION]
-#         popsize = props[POPSIZE]
-# 
-#         sizedelta = size + popsize
-#         popdelta = population + 1
-#         node_update_ancestors = self.node_update_ancestors
-#         node_update_ancestors(oldparent, -popdelta, -sizedelta)
-#         node_update_ancestors(parent, popdelta, sizedelta)
-# 
-#         q = "update nodes set parent = ?, path = ? where serial = ?"
-#         self.execute(q, (parent, path, source))
-
-    def version_copy(self, serial, node=None, copy_attr=True):
-        """Copy the version specified by serial into
-           a new version of node. Optionally copy attributes.
-           Return the (serial, mtime) of the new version.
-        """
-        
-        props = self.version_get_properties(serial)
-        size = props[SIZE]
-        cluster = props[CLUSTER]
-        new_serial, mtime = self.version_create(node, path, size, serial, cluster)
-        if copy_attr:
-            self.attr_copy(serial, new_serial)
-        return (new_serial, mtime)
     
-    def node_get_attributes(self, serial, keys=()):
-        """Return a list of (key, value) pairs of the node specified by serial.
+    def attribute_get(self, serial, keys=()):
+        """Return a list of (key, value) pairs of the version specified by serial.
            If keys is empty, return all attributes.
            Othwerise, return only those specified.
         """
@@ -591,8 +623,8 @@ class Node(DBWorker):
             execute(q, (serial,))
         return self.fetchall()
     
-    def attr_set(self, serial, items):
-        """Set the attributes of the node specified by serial.
+    def attribute_set(self, serial, items):
+        """Set the attributes of the version specified by serial.
            Receive attributes as an iterable of (key, value) pairs.
         """
         
@@ -600,8 +632,8 @@ class Node(DBWorker):
              "values (?, ?, ?)")
         self.executemany(q, ((serial, k, v) for k, v in items))
     
-    def attr_del(self, serial, keys=()):
-        """Delete attributes of the node specified by serial.
+    def attribute_del(self, serial, keys=()):
+        """Delete attributes of the version specified by serial.
            If keys is empty, delete all attributes.
            Otherwise delete those specified.
         """
@@ -623,7 +655,7 @@ class Node(DBWorker):
 #         self.execute(q, (parent,))
 #         return [r[0] for r in self.fetchall()]
     
-    def attr_copy(self, source, dest):
+    def attribute_copy(self, source, dest):
         q = ("insert or replace into attributes "
              "select ?, key, value from attributes "
              "where serial = ?")
index ea68234..94da72c 100644 (file)
@@ -39,10 +39,15 @@ import hashlib
 import binascii
 
 from base import NotAllowedError, BaseBackend
+from lib.node import Node, ROOTNODE, SERIAL, SIZE, MTIME, MUSER, CLUSTER
 from lib.permissions import Permissions, READ, WRITE
 from lib.policy import Policy
 from lib.hashfiler import Mapper, Blocker
 
+( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
+
+inf = float('inf')
+
 
 logger = logging.getLogger(__name__)
 
@@ -86,29 +91,7 @@ class ModularBackend(BaseBackend):
         if not os.path.isdir(basepath):
             raise RuntimeError("Cannot open database at '%s'" % (db,))
         
-        self.con = sqlite3.connect(basepath + '/db', check_same_thread=False)
-        
-        sql = '''pragma foreign_keys = on'''
-        self.con.execute(sql)
-        
-        sql = '''create table if not exists versions (
-                    version_id integer primary key,
-                    name text,
-                    user text,
-                    tstamp integer not null,
-                    size integer default 0,
-                    hide integer default 0)'''
-        self.con.execute(sql)
-        sql = '''create table if not exists metadata (
-                    version_id integer,
-                    key text,
-                    value text,
-                    primary key (version_id, key)
-                    foreign key (version_id) references versions(version_id)
-                    on delete cascade)'''
-        self.con.execute(sql)
-        
-        self.con.commit()
+        self.con = sqlite3.connect(basepath + '/db', check_same_thread=False)        
         
         params = {'blocksize': self.block_size,
                   'blockpath': basepath + '/blocks',
@@ -123,6 +106,9 @@ class ModularBackend(BaseBackend):
                   'cursor': self.con.cursor()}
         self.permissions = Permissions(**params)
         self.policy = Policy(**params)
+        self.node = Node(**params)
+        
+        self.con.commit()
     
     @backend_method
     def list_accounts(self, user, marker=None, limit=10000):
@@ -137,41 +123,41 @@ class ModularBackend(BaseBackend):
         """Return a dictionary with the account metadata."""
         
         logger.debug("get_account_meta: %s %s", account, until)
+        node = self._lookup_account(account, user == account)
         if user != account:
-            if until or account not in self._allowed_accounts(user):
+            if until or node is None or account not in self._allowed_accounts(user):
                 raise NotAllowedError
-        else:
-            self._create_account(user, account)
         try:
-            version_id, mtime = self._get_accountinfo(account, until)
+            props = self._get_properties(node, until)
+            version_id = props[SERIAL]
+            mtime = props[MTIME]
         except NameError:
             # Account does not exist before until.
             version_id = None
             mtime = until
-        count, bytes, tstamp = self._get_pathstats(account, until)
+        object_count, bytes, tstamp = self._get_statistics(node, account, until)
         if mtime > tstamp:
             tstamp = mtime
         if until is None:
             modified = tstamp
         else:
-            modified = self._get_pathstats(account)[2] # Overall last modification
+            modified = self._get_statistics(node, account)[2] # Overall last modification.
             if mtime > modified:
                 modified = mtime
         
         # Proper count.
-        sql = 'select count(name) from (%s) where name glob ? and not name glob ?'
-        sql = sql % self._sql_until(until)
-        c = self.con.execute(sql, (account + '/*', account + '/*/*'))
-        row = c.fetchone()
-        count = row[0]
+        count = self.node.node_children(node)
+        object_count -= count
         
         if user != account:
             meta = {'name': account}
         else:
-            meta = self._get_metadata(account, version_id)
-            meta.update({'name': account, 'count': count, 'bytes': bytes})
+            meta = {}
+            if version_id is not None:
+                meta.update(dict(self.node.attribute_get(version_id)))
             if until is not None:
                 meta.update({'until_timestamp': tstamp})
+            meta.update({'name': account, 'count': count, 'bytes': bytes})
         meta.update({'modified': modified})
         return meta
     
@@ -182,7 +168,8 @@ class ModularBackend(BaseBackend):
         logger.debug("update_account_meta: %s %s %s", account, meta, replace)
         if user != account:
             raise NotAllowedError
-        self._put_metadata(user, account, meta, replace, False)
+        node = self._lookup_account(account, True)
+        self._put_metadata(user, node, meta, replace, False)
     
     @backend_method
     def get_account_groups(self, user, account):
@@ -193,7 +180,7 @@ class ModularBackend(BaseBackend):
             if account not in self._allowed_accounts(user):
                 raise NotAllowedError
             return {}
-        self._create_account(user, account)
+        self._lookup_account(account, True)
         return self.permissions.group_dict(account)
     
     @backend_method
@@ -203,7 +190,7 @@ class ModularBackend(BaseBackend):
         logger.debug("update_account_groups: %s %s %s", account, groups, replace)
         if user != account:
             raise NotAllowedError
-        self._create_account(user, account)
+        self._lookup_account(account, True)
         self._check_groups(groups)
         if replace:
             self.permissions.group_destroy(account)
@@ -220,13 +207,23 @@ class ModularBackend(BaseBackend):
         logger.debug("put_account: %s", account)
         if user != account:
             raise NotAllowedError
-        try:
-            version_id, mtime = self._get_accountinfo(account)
-        except NameError:
-            pass
-        else:
+        node = self.node.node_lookup(account)
+        if node is not None:
             raise NameError('Account already exists')
-        self._put_version(account, user)
+        node = self.node.node_create(ROOTNODE, account)
+        self.node.version_create(node, 0, None, account, CLUSTER_NORMAL)
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
     
     @backend_method
     def delete_account(self, user, account):
@@ -308,7 +305,7 @@ class ModularBackend(BaseBackend):
                 raise NotAllowedError
             return {}
         path = self._get_containerinfo(account, container)[0]
-        return self._get_policy(path)
+        return self.policy.policy_get(path)
     
     @backend_method
     def update_container_policy(self, user, account, container, policy, replace=False):
@@ -622,6 +619,22 @@ class ModularBackend(BaseBackend):
         h, e = self.blocker.block_delta(binascii.unhexlify(hash), ((offset, data),))
         return binascii.hexlify(h)
     
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
     def _sql_until(self, until=None):
         """Return the sql to get the latest versions until the timestamp given."""
         if until is None:
@@ -632,63 +645,12 @@ class ModularBackend(BaseBackend):
                     and hide = 0'''
         return sql % (until,)
     
-    def _get_pathstats(self, path, until=None):
-        """Return count and sum of size of everything under path and latest timestamp."""
-        
-        sql = 'select count(version_id), total(size), max(tstamp) from (%s) where name like ?'
-        sql = sql % self._sql_until(until)
-        c = self.con.execute(sql, (path + '/%',))
-        row = c.fetchone()
-        tstamp = row[2] if row[2] is not None else 0
-        return int(row[0]), int(row[1]), int(tstamp)
-    
-    def _get_version(self, path, version=None):
-        if version is None:
-            sql = '''select version_id, user, tstamp, size, hide from versions where name = ?
-                        order by version_id desc limit 1'''
-            c = self.con.execute(sql, (path,))
-            row = c.fetchone()
-            if not row or int(row[4]):
-                raise NameError('Object does not exist')
-        else:
-            # The database (sqlite) will not complain if the version is not an integer.
-            sql = '''select version_id, user, tstamp, size from versions where name = ?
-                        and version_id = ?'''
-            c = self.con.execute(sql, (path, version))
-            row = c.fetchone()
-            if not row:
-                raise IndexError('Version does not exist')
-        return str(row[0]), str(row[1]), int(row[2]), int(row[3])
-    
     def _put_version(self, path, user, size=0, hide=0):
         tstamp = int(time.time())
         sql = 'insert into versions (name, user, tstamp, size, hide) values (?, ?, ?, ?, ?)'
         id = self.con.execute(sql, (path, user, tstamp, size, hide)).lastrowid
         return str(id), tstamp
     
-    def _copy_version(self, user, src_path, dest_path, copy_meta=True, copy_data=True, src_version=None):
-        if src_version is not None:
-            src_version_id, muser, mtime, size = self._get_version(src_path, src_version)
-        else:
-            # Latest or create from scratch.
-            try:
-                src_version_id, muser, mtime, size = self._get_version(src_path)
-            except NameError:
-                src_version_id = None
-                size = 0
-        if not copy_data:
-            size = 0
-        dest_version_id = self._put_version(dest_path, user, size)[0]
-        if copy_meta and src_version_id is not None:
-            sql = 'insert into metadata select %s, key, value from metadata where version_id = ?'
-            sql = sql % dest_version_id
-            self.con.execute(sql, (src_version_id,))
-        if copy_data and src_version_id is not None:
-            # TODO: Copy properly.
-            hashmap = self.mapper.map_retr(src_version_id)
-            self.mapper.map_stor(dest_version_id, hashmap)
-        return src_version_id, dest_version_id
-    
     def _get_versioninfo(self, account, container, name, until=None):
         """Return path, latest version, associated timestamp and size until the timestamp given."""
         
@@ -698,13 +660,15 @@ class ModularBackend(BaseBackend):
         except ValueError:
             pass
         path = '/'.join(p)
-        sql = '''select version_id, tstamp, size from (%s) where name = ?'''
-        sql = sql % self._sql_until(until)
-        c = self.con.execute(sql, (path,))
-        row = c.fetchone()
-        if row is None:
-            raise NameError('Path does not exist')
-        return path, str(row[0]), int(row[1]), int(row[2])
+        node = self.node.node_lookup(path)
+        if node is not None:
+            props = self.node.version_lookup(node, until, CLUSTER_NORMAL)
+            # TODO: Do one lookup.
+            if props is None and until is not None:
+                props = self.node.version_lookup(node, until, CLUSTER_HISTORY)
+            if props is not None:
+                return path, props[SERIAL], props[MTIME], props[SIZE]
+        raise NameError('Path does not exist')
     
     def _get_accountinfo(self, account, until=None):
         try:
@@ -731,23 +695,6 @@ class ModularBackend(BaseBackend):
         except NameError:
             self._put_version(account, user)
     
-    def _get_metadata(self, path, version):
-        sql = 'select key, value from metadata where version_id = ?'
-        c = self.con.execute(sql, (version,))
-        return dict(c.fetchall())
-    
-    def _put_metadata(self, user, path, meta, replace=False, copy_data=True):
-        """Create a new version and store metadata."""
-        
-        src_version_id, dest_version_id = self._copy_version(user, path, path, not replace, copy_data)
-        for k, v in meta.iteritems():
-            if not replace and v == '':
-                sql = 'delete from metadata where version_id = ? and key = ?'
-                self.con.execute(sql, (dest_version_id, k))
-            else:
-                sql = 'insert or replace into metadata (version_id, key, value) values (?, ?, ?)'
-                self.con.execute(sql, (dest_version_id, k, v))
-    
     def _check_policy(self, policy):
         for k in policy.keys():
             if policy[k] == '':
@@ -763,9 +710,6 @@ class ModularBackend(BaseBackend):
             else:
                 raise ValueError
     
-    def _get_policy(self, path):
-        return self.policy.policy_get(path)
-    
     def _list_limits(self, listing, marker, limit):
         start = 0
         if marker:
@@ -827,6 +771,109 @@ class ModularBackend(BaseBackend):
         sql = 'delete from versions where version_id = ?'
         self.con.execute(sql, (version,))
     
+    # Path functions.
+    
+    def _lookup_account(self, account, create=True):
+        node = self.node.node_lookup(account)
+        if node is None and create:
+            node = self.node.node_create(ROOTNODE, account)
+            self.node.version_create(node, 0, None, account, CLUSTER_NORMAL)
+        return node
+    
+    def _lookup_container(self, account, container):
+        node = self.node.node_lookup('/'.join((account, container)))
+        if node is None:
+            raise NameError('Container does not exist')
+        return node
+    
+    def _lookup_object(self, account, container, name):
+        node = self.node.node_lookup('/'.join((account, container, name)))
+        if node is None:
+            raise NameError('Object does not exist')
+        return node
+    
+    def _get_properties(self, node, until=None):
+        """Return properties until the timestamp given."""
+        
+        before = until if until is not None else inf
+        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
+        # TODO: Do one lookup.
+        if props is None and until is not None:
+            props = self.node.version_lookup(node, before, CLUSTER_HISTORY)
+        if props is None:
+            raise NameError('Path does not exist')
+        return props
+    
+    def _get_statistics(self, node, path, until=None):
+        """Return count, sum of size and latest timestamp of everything under node/path."""
+        
+        if until is None:
+            return self.node.node_statistics(node, CLUSTER_NORMAL)
+        else:
+            return self.node.path_statistics(path + '/', until, CLUSTER_DELETED)
+    
+    def _get_version(self, node, version=None):
+        if version is None:
+            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
+            if props is None:
+                raise NameError('Object does not exist')
+        else:
+            props = self.node.version_get_properties(version)
+            if props is None or props[CLUSTER] == CLUSTER_DELETE:
+                raise IndexError('Version does not exist')
+        return props
+    
+    def _copy_version(self, user, src_node, dest_node, copy_meta=True, copy_data=True, src_version=None):
+        # Get source serial and size.
+        if src_version is not None:
+            src_props = self._get_version(src_node, src_version)
+            src_version_id = src_props[SERIAL]
+            size = src_props[SIZE]
+        else:
+            # Latest or create from scratch.
+            try:
+                src_props = self._get_version(src_node)
+                src_version_id = src_props[SERIAL]
+                size = src_props[SIZE]
+            except NameError:
+                src_version_id = None
+                size = 0
+        if not copy_data:
+            size = 0
+        
+        # Move the latest version at destination to CLUSTER_HISTORY and create new.
+        if src_node == dest_node and src_version is None and src_version_id is not None:
+            self.node.version_recluster(src_version_id, CLUSTER_HISTORY)
+        else:
+            dest_props = self.node.version_lookup(dest_node, inf, CLUSTER_NORMAL)
+            if dest_props is not None:
+                self.node.version_recluster(dest_props[SERIAL], CLUSTER_HISTORY)
+        dest_version_id, mtime = self.node.version_create(dest_node, size, src_version_id, user, CLUSTER_NORMAL)
+        
+        # Copy meta and data.
+        if copy_meta and src_version_id is not None:
+            self.attribute_copy(src_version_id, dest_version_id)
+        if copy_data and src_version_id is not None:
+            hashmap = self.mapper.map_retr(src_version_id)
+            self.mapper.map_stor(dest_version_id, hashmap)
+        
+        return src_version_id, dest_version_id
+    
+    def _get_metadata(self, version):
+        if version is None:
+            return {}
+        return dict(self.node.attribute_get(version))
+    
+    def _put_metadata(self, user, node, meta, replace=False, copy_data=True):
+        """Create a new version and store metadata."""
+        
+        src_version_id, dest_version_id = self._copy_version(user, node, node, not replace, copy_data)
+        if not replace:
+            self.node.attribute_del(dest_version_id, (k for k, v in meta.iteritems() if v == ''))
+            self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems() if v != ''))
+        else:
+            self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
+    
     # Access control functions.
     
     def _check_groups(self, groups):