In version_remove recompute nodes latest version
[pithos] / snf-pithos-backend / pithos / backends / lib / sqlite / node.py
index 752c3c0..0c69a31 100644 (file)
@@ -35,12 +35,14 @@ from time import time
 
 from dbworker import DBWorker
 
-from pithos.lib.filter import parse_filters
+from pithos.backends.filter import parse_filters
 
 
 ROOTNODE  = 0
 
-( SERIAL, NODE, HASH, SIZE, SOURCE, MTIME, MUSER, UUID, CLUSTER ) = range(9)
+( SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM, CLUSTER ) = range(11)
+
+( MATCH_PREFIX, MATCH_EXACT ) = range(2)
 
 inf = float('inf')
 
@@ -85,11 +87,13 @@ _propnames = {
     'node'      : 1,
     'hash'      : 2,
     'size'      : 3,
-    'source'    : 4,
-    'mtime'     : 5,
-    'muser'     : 6,
-    'uuid'      : 7,
-    'cluster'   : 8
+    'type'      : 4,
+    'source'    : 5,
+    'mtime'     : 6,
+    'muser'     : 7,
+    'uuid'      : 8,
+    'checksum'  : 9,
+    'cluster'   : 10
 }
 
 
@@ -111,12 +115,15 @@ class Node(DBWorker):
                           ( node       integer primary key,
                             parent     integer default 0,
                             path       text    not null default '',
+                            latest_version     integer,
                             foreign key (parent)
                             references nodes(node)
                             on update cascade
                             on delete cascade ) """)
         execute(""" create unique index if not exists idx_nodes_path
                     on nodes(path) """)
+        execute(""" create index if not exists idx_nodes_parent
+                    on nodes(parent) """)
         
         execute(""" create table if not exists policy
                           ( node   integer,
@@ -145,10 +152,12 @@ class Node(DBWorker):
                             node       integer,
                             hash       text,
                             size       integer not null default 0,
+                            type       text    not null default '',
                             source     integer,
                             mtime      integer,
                             muser      text    not null default '',
                             uuid       text    not null default '',
+                            checksum   text    not null default '',
                             cluster    integer not null default 0,
                             foreign key (node)
                             references nodes(node)
@@ -195,6 +204,19 @@ class Node(DBWorker):
             return r[0]
         return None
     
+    def node_lookup_bulk(self, paths):
+       """Lookup the current nodes for the given paths.
+           Return () if the path is not found.
+        """
+        
+        placeholders = ','.join('?' for path in paths)
+        q = "select node from nodes where path in (%s)" % placeholders
+        self.execute(q, paths)
+        r = self.fetchall()
+        if r is not None:
+               return [row[0] for row in r]
+        return None
+    
     def node_get_properties(self, node):
         """Return the node's (parent, path).
            Return None if the node is not found.
@@ -207,10 +229,10 @@ class Node(DBWorker):
     def node_get_versions(self, node, keys=(), propnames=_propnames):
         """Return the properties of all versions at node.
            If keys is empty, return all properties in the order
-           (serial, node, hash, size, source, mtime, muser, uuid, cluster).
+           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
         """
         
-        q = ("select serial, node, hash, size, source, mtime, muser, uuid, cluster "
+        q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
              "from versions "
              "where node = ?")
         self.execute(q, (node,))
@@ -235,7 +257,7 @@ class Node(DBWorker):
     def node_purge_children(self, parent, before=inf, cluster=0):
         """Delete all versions with the specified
            parent and cluster, and return
-           the hashes of versions deleted.
+           the hashes and size of versions deleted.
            Clears out nodes with no remaining versions.
         """
         
@@ -250,7 +272,7 @@ class Node(DBWorker):
         execute(q, args)
         nr, size = self.fetchone()
         if not nr:
-            return ()
+            return (), 0
         mtime = time()
         self.statistics_update(parent, -nr, -size, mtime, cluster)
         self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster)
@@ -277,12 +299,12 @@ class Node(DBWorker):
                                    "where node = n.node) = 0 "
                             "and parent = ?)")
         execute(q, (parent,))
-        return hashes
+        return hashes, size
     
     def node_purge(self, node, before=inf, cluster=0):
         """Delete all versions with the specified
            node and cluster, and return
-           the hashes of versions deleted.
+           the hashes and size of versions deleted.
            Clears out the node if it has no remaining versions.
         """
         
@@ -295,7 +317,7 @@ class Node(DBWorker):
         execute(q, args)
         nr, size = self.fetchone()
         if not nr:
-            return ()
+            return (), 0
         mtime = time()
         self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
         
@@ -317,7 +339,7 @@ class Node(DBWorker):
                                    "where node = n.node) = 0 "
                             "and node = ?)")
         execute(q, (node,))
-        return hashes
+        return hashes, size
     
     def node_remove(self, node):
         """Remove the node specified.
@@ -413,13 +435,12 @@ class Node(DBWorker):
         parent, path = props
         
         # The latest version.
-        q = ("select serial, node, hash, size, source, mtime, muser, uuid, cluster "
-             "from versions "
-             "where serial = (select max(serial) "
-                             "from versions "
-                             "where node = ? and mtime < ?) "
+        q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
+             "from versions v "
+             "where serial = %s "
              "and cluster != ?")
-        execute(q, (node, before, except_cluster))
+        subq, args = self._construct_latest_version_subquery(node=node, before=before)
+        execute(q % subq, args + [except_cluster])
         props = fetchone()
         if props is None:
             return None
@@ -428,14 +449,13 @@ class Node(DBWorker):
         # First level, just under node (get population).
         q = ("select count(serial), sum(size), max(mtime) "
              "from versions v "
-             "where serial = (select max(serial) "
-                             "from versions "
-                             "where node = v.node and mtime < ?) "
+             "where serial = %s "
              "and cluster != ? "
              "and node in (select node "
                           "from nodes "
                           "where parent = ?)")
-        execute(q, (before, except_cluster, node))
+        subq, args = self._construct_latest_version_subquery(node=None, before=before)
+        execute(q % subq, args + [except_cluster, node])
         r = fetchone()
         if r is None:
             return None
@@ -445,17 +465,16 @@ class Node(DBWorker):
             return (0, 0, mtime)
         
         # All children (get size and mtime).
-        # XXX: This is why the full path is stored.
+        # This is why the full path is stored.
         q = ("select count(serial), sum(size), max(mtime) "
              "from versions v "
-             "where serial = (select max(serial) "
-                             "from versions "
-                             "where node = v.node and mtime < ?) "
+             "where serial = %s "
              "and cluster != ? "
              "and node in (select node "
                           "from nodes "
                           "where path like ? escape '\\')")
-        execute(q, (before, except_cluster, self.escape_like(path) + '%'))
+        subq, args = self._construct_latest_version_subquery(node=None, before=before)
+        execute(q % subq, args + [except_cluster, self.escape_like(path) + '%'])
         r = fetchone()
         if r is None:
             return None
@@ -463,46 +482,85 @@ class Node(DBWorker):
         mtime = max(mtime, r[2])
         return (count, size, mtime)
     
-    def version_create(self, node, hash, size, source, muser, uuid, cluster=0):
+    def nodes_set_latest_version(self, node, serial=None):
+       if not serial:
+               q = ("select serial from versions where node = ? order_by mtime desc limit 1")
+               self.execute(q, (node,))
+               r = self.fetchone()
+                       serial = r[0] of r else None
+       q = ("update nodes set latest_version = ? where node = ?")
+        props = (serial, node)
+        self.execute(q, props)
+    
+    def version_create(self, node, hash, size, type, source, muser, uuid, checksum, cluster=0):
         """Create a new version from the given properties.
            Return the (serial, mtime) of the new version.
         """
         
-        q = ("insert into versions (node, hash, size, source, mtime, muser, uuid, cluster) "
-             "values (?, ?, ?, ?, ?, ?, ?, ?)")
+        q = ("insert into versions (node, hash, size, type, source, mtime, muser, uuid, checksum, cluster) "
+             "values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
         mtime = time()
-        props = (node, hash, size, source, mtime, muser, uuid, cluster)
+        props = (node, hash, size, type, source, mtime, muser, uuid, checksum, cluster)
         serial = self.execute(q, props).lastrowid
         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
+        
+        self.nodes_set_latest_version(node, serial)
+        
         return serial, mtime
     
-    def version_lookup(self, node, before=inf, cluster=0):
+    def version_lookup(self, node, before=inf, cluster=0, all_props=True):
         """Lookup the current version of the given node.
            Return a list with its properties:
-           (serial, node, hash, size, source, mtime, muser, uuid, cluster)
+           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster)
            or None if the current version is not found in the given cluster.
         """
         
-        q = ("select serial, node, hash, size, source, mtime, muser, uuid, cluster "
-             "from versions "
-             "where serial = (select max(serial) "
-                             "from versions "
-                             "where node = ? and mtime < ?) "
+        q = ("select %s "
+             "from versions v "
+             "where serial = %s "
              "and cluster = ?")
-        self.execute(q, (node, before, cluster))
+        subq, args = self._construct_latest_version_subquery(node=node, before=before)
+        if not all_props:
+            q = q % ("serial", subq)
+        else:
+            q = q % ("serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster", subq)
+        
+        self.execute(q, args + [cluster])
         props = self.fetchone()
         if props is not None:
             return props
         return None
+
+    def version_lookup_bulk(self, nodes, before=inf, cluster=0, all_props=True):
+        """Lookup the current versions of the given nodes.
+           Return a list with their properties:
+           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
+        """
+        
+        if not nodes:
+               return ()
+        q = ("select %s "
+             "from versions "
+             "where serial in %s "
+             "and cluster = ? %s")
+        subq, args = self._construct_latest_versions_subquery(nodes=nodes, before = before)
+        if not all_props:
+            q = q % ("serial", subq, '')
+        else:
+            q = q % ("serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster",  subq, 'order by node')
+        
+        args += [cluster]
+        self.execute(q, args)
+        return self.fetchall()
     
     def version_get_properties(self, serial, keys=(), propnames=_propnames):
         """Return a sequence of values for the properties of
            the version specified by serial and the keys, in the order given.
            If keys is empty, return all properties in the order
-           (serial, node, hash, size, source, mtime, muser, uuid, cluster).
+           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
         """
         
-        q = ("select serial, node, hash, size, source, mtime, muser, uuid, cluster "
+        q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
              "from versions "
              "where serial = ?")
         self.execute(q, (serial,))
@@ -514,6 +572,14 @@ class Node(DBWorker):
             return r
         return [r[propnames[k]] for k in keys if k in propnames]
     
+    def version_put_property(self, serial, key, value):
+        """Set value for the property of version specified by key."""
+        
+        if key not in _propnames:
+            return
+        q = "update versions set %s = ? where serial = ?" % key
+        self.execute(q, (value, serial))
+    
     def version_recluster(self, serial, cluster):
         """Move the version into another cluster."""
         
@@ -549,7 +615,10 @@ class Node(DBWorker):
         
         q = "delete from versions where serial = ?"
         self.execute(q, (serial,))
-        return hash
+        
+        self.nodes_set_latest_version(node)
+        
+        return hash, size
     
     def attribute_get(self, serial, domain, keys=()):
         """Return a list of (key, value) pairs of the version specified by serial.
@@ -640,10 +709,18 @@ class Node(DBWorker):
         if not pathq:
             return None, None
         
-        subq = " and ("
-        subq += ' or '.join(("n.path like ? escape '\\'" for x in pathq))
-        subq += ")"
-        args = tuple([self.escape_like(x) + '%' for x in pathq])
+        subqlist = []
+        args = []
+        for path, match in pathq:
+            if match == MATCH_PREFIX:
+                subqlist.append("n.path like ? escape '\\'")
+                args.append(self.escape_like(path) + '%')
+            elif match == MATCH_EXACT:
+                subqlist.append("n.path = ?")
+                args.append(path)
+        
+        subq = ' and (' + ' or '.join(subqlist) + ')'
+        args = tuple(args)
         
         return subq, args
     
@@ -662,6 +739,53 @@ class Node(DBWorker):
         
         return subq, args
     
+    def _construct_versions_nodes_latest_version_subquery(self, before=inf):
+        if before == inf:
+            q = ("n.latest_version ")
+            args = []
+        else:
+            q = ("(select max(serial) "
+                                  "from versions "
+                                  "where node = v.node and mtime < ?) ")
+            args = [before]
+        return q, args
+    
+    def _construct_latest_version_subquery(self, node=None, before=inf):
+        where_cond = "node = v.node"
+        args = []
+        if node:
+            where_cond = "node = ? "
+            args = [node]
+        
+        if before == inf:
+            q = ("(select latest_version "
+                   "from nodes "
+                   "where %s) ")
+        else:
+            q = ("(select max(serial) "
+                   "from versions "
+                   "where %s and mtime < ?) ")
+            args += [before]
+        return q % where_cond, args
+    
+    def _construct_latest_versions_subquery(self, nodes=(), before=inf):
+        where_cond = ""
+        args = []
+        if nodes:
+            where_cond = "node in (%s) " % ','.join('?' for node in nodes)
+            args = nodes
+        
+        if before == inf:
+            q = ("(select latest_version "
+                   "from nodes "
+                   "where %s ) ")
+        else:
+            q = ("(select max(serial) "
+                                "from versions "
+                                "where %s and mtime < ? group by node) ")
+            args += [before]
+        return q % where_cond, args
+    
     def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=[]):
         """Return a list with all keys pairs defined
            for all latest versions under parent that
@@ -671,9 +795,7 @@ class Node(DBWorker):
         # TODO: Use another table to store before=inf results.
         q = ("select distinct a.key "
              "from attributes a, versions v, nodes n "
-             "where v.serial = (select max(serial) "
-                               "from versions "
-                               "where node = v.node and mtime < ?) "
+             "where v.serial = %s "
              "and v.cluster != ? "
              "and v.node in (select node "
                             "from nodes "
@@ -681,7 +803,9 @@ class Node(DBWorker):
              "and a.serial = v.serial "
              "and a.domain = ? "
              "and n.node = v.node")
-        args = (before, except_cluster, parent, domain)
+        subq, subargs = self._construct_latest_version_subquery(node=None, before=before)
+        args = subargs + [except_cluster, parent, domain]
+        q = q % subq
         subq, subargs = self._construct_paths(pathq)
         if subq is not None:
             q += subq
@@ -691,7 +815,8 @@ class Node(DBWorker):
     
     def latest_version_list(self, parent, prefix='', delimiter=None,
                             start='', limit=10000, before=inf,
-                            except_cluster=0, pathq=[], domain=None, filterq=[], sizeq=None):
+                            except_cluster=0, pathq=[], domain=None,
+                            filterq=[], sizeq=None, all_props=False):
         """Return a (list of (path, serial) tuples, list of common prefixes)
            for the current versions of the paths with the given parent,
            matching the following criteria.
@@ -738,6 +863,8 @@ class Node(DBWorker):
            will always match.
            
            Limit applies to the first list of tuples returned.
+           
+           If all_props is True, return all properties after path, not just serial.
         """
         
         execute = self.execute
@@ -746,18 +873,22 @@ class Node(DBWorker):
             start = strprevling(prefix)
         nextling = strnextling(prefix)
         
-        q = ("select distinct n.path, v.serial "
+        q = ("select distinct n.path, %s "
              "from versions v, nodes n "
-             "where v.serial = (select max(serial) "
-                               "from versions "
-                               "where node = v.node and mtime < ?) "
+             "where v.serial = %s "
              "and v.cluster != ? "
              "and v.node in (select node "
                             "from nodes "
                             "where parent = ?) "
              "and n.node = v.node "
              "and n.path > ? and n.path < ?")
-        args = [before, except_cluster, parent, start, nextling]
+        subq, args = self._construct_versions_nodes_latest_version_subquery(before)
+        if not all_props:
+            q = q % ("v.serial", subq)
+        else:
+            q = q % ("v.serial, v.node, v.hash, v.size, v.type, v.source, v.mtime, v.muser, v.uuid, v.checksum, v.cluster", subq)
+        args += [except_cluster, parent, start, nextling]
+        start_index = len(args) - 2
         
         subq, subargs = self._construct_paths(pathq)
         if subq is not None:
@@ -796,7 +927,8 @@ class Node(DBWorker):
             props = fetchone()
             if props is None:
                 break
-            path, serial = props
+            path = props[0]
+            serial = props[1]
             idx = path.find(delimiter, pfz)
             
             if idx < 0:
@@ -815,7 +947,7 @@ class Node(DBWorker):
             if count >= limit: 
                 break
             
-            args[3] = strnextling(pf) # New start.
+            args[start_index] = strnextling(pf) # New start.
             execute(q, args)
         
         return matches, prefixes