-# Copyright 2011 GRNET S.A. All rights reserved.
+# Copyright 2011-2012 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
from dbworker import DBWorker
-from pithos.lib.filter import parse_filters
+from pithos.backends.filter import parse_filters
ROOTNODE = 0
-( SERIAL, NODE, HASH, SIZE, SOURCE, MTIME, MUSER, UUID, CLUSTER ) = range(9)
+( SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM, CLUSTER ) = range(11)
+
+( MATCH_PREFIX, MATCH_EXACT ) = range(2)
inf = float('inf')
'node' : 1,
'hash' : 2,
'size' : 3,
- 'source' : 4,
- 'mtime' : 5,
- 'muser' : 6,
- 'uuid' : 7,
- 'cluster' : 8
+ 'type' : 4,
+ 'source' : 5,
+ 'mtime' : 6,
+ 'muser' : 7,
+ 'uuid' : 8,
+ 'checksum' : 9,
+ 'cluster' : 10
}
( node integer primary key,
parent integer default 0,
path text not null default '',
+ latest_version integer,
foreign key (parent)
references nodes(node)
on update cascade
on delete cascade ) """)
execute(""" create unique index if not exists idx_nodes_path
on nodes(path) """)
+ execute(""" create index if not exists idx_nodes_parent
+ on nodes(parent) """)
execute(""" create table if not exists policy
( node integer,
node integer,
hash text,
size integer not null default 0,
+ type text not null default '',
source integer,
mtime integer,
muser text not null default '',
uuid text not null default '',
+ checksum text not null default '',
cluster integer not null default 0,
foreign key (node)
references nodes(node)
return r[0]
return None
+ def node_lookup_bulk(self, paths):
+ """Lookup the current nodes for the given paths.
+ Return () if the path is not found.
+ """
+
+ placeholders = ','.join('?' for path in paths)
+ q = "select node from nodes where path in (%s)" % placeholders
+ self.execute(q, paths)
+ r = self.fetchall()
+ if r is not None:
+ return [row[0] for row in r]
+ return None
+
def node_get_properties(self, node):
"""Return the node's (parent, path).
Return None if the node is not found.
def node_get_versions(self, node, keys=(), propnames=_propnames):
"""Return the properties of all versions at node.
If keys is empty, return all properties in the order
- (serial, node, hash, size, source, mtime, muser, uuid, cluster).
+ (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
"""
- q = ("select serial, node, hash, size, source, mtime, muser, uuid, cluster "
+ q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
"from versions "
"where node = ?")
self.execute(q, (node,))
def node_purge_children(self, parent, before=inf, cluster=0):
"""Delete all versions with the specified
parent and cluster, and return
- the hashes of versions deleted.
+ the hashes and size of versions deleted.
Clears out nodes with no remaining versions.
"""
execute(q, args)
nr, size = self.fetchone()
if not nr:
- return ()
+ return (), 0
mtime = time()
self.statistics_update(parent, -nr, -size, mtime, cluster)
self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster)
"where node = n.node) = 0 "
"and parent = ?)")
execute(q, (parent,))
- return hashes
+ return hashes, size
def node_purge(self, node, before=inf, cluster=0):
"""Delete all versions with the specified
node and cluster, and return
- the hashes of versions deleted.
+ the hashes and size of versions deleted.
Clears out the node if it has no remaining versions.
"""
execute(q, args)
nr, size = self.fetchone()
if not nr:
- return ()
+ return (), 0
mtime = time()
self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
"where node = n.node) = 0 "
"and node = ?)")
execute(q, (node,))
- return hashes
+ return hashes, size
def node_remove(self, node):
"""Remove the node specified.
parent, path = props
# The latest version.
- q = ("select serial, node, hash, size, source, mtime, muser, uuid, cluster "
- "from versions "
- "where serial = (select max(serial) "
- "from versions "
- "where node = ? and mtime < ?) "
+ q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
+ "from versions v "
+ "where serial = %s "
"and cluster != ?")
- execute(q, (node, before, except_cluster))
+ subq, args = self._construct_latest_version_subquery(node=node, before=before)
+ execute(q % subq, args + [except_cluster])
props = fetchone()
if props is None:
return None
# First level, just under node (get population).
q = ("select count(serial), sum(size), max(mtime) "
"from versions v "
- "where serial = (select max(serial) "
- "from versions "
- "where node = v.node and mtime < ?) "
+ "where serial = %s "
"and cluster != ? "
"and node in (select node "
"from nodes "
"where parent = ?)")
- execute(q, (before, except_cluster, node))
+ subq, args = self._construct_latest_version_subquery(node=None, before=before)
+ execute(q % subq, args + [except_cluster, node])
r = fetchone()
if r is None:
return None
return (0, 0, mtime)
# All children (get size and mtime).
- # XXX: This is why the full path is stored.
+ # This is why the full path is stored.
q = ("select count(serial), sum(size), max(mtime) "
"from versions v "
- "where serial = (select max(serial) "
- "from versions "
- "where node = v.node and mtime < ?) "
+ "where serial = %s "
"and cluster != ? "
"and node in (select node "
"from nodes "
"where path like ? escape '\\')")
- execute(q, (before, except_cluster, self.escape_like(path) + '%'))
+ subq, args = self._construct_latest_version_subquery(node=None, before=before)
+ execute(q % subq, args + [except_cluster, self.escape_like(path) + '%'])
r = fetchone()
if r is None:
return None
mtime = max(mtime, r[2])
return (count, size, mtime)
- def version_create(self, node, hash, size, source, muser, uuid, cluster=0):
+ def nodes_set_latest_version(self, node, serial=None):
+ if not serial:
+ q = ("select serial from versions where node = ? order_by mtime desc limit 1")
+ self.execute(q, (node,))
+ r = self.fetchone()
+ serial = r[0] of r else None
+ q = ("update nodes set latest_version = ? where node = ?")
+ props = (serial, node)
+ self.execute(q, props)
+
+ def version_create(self, node, hash, size, type, source, muser, uuid, checksum, cluster=0):
"""Create a new version from the given properties.
Return the (serial, mtime) of the new version.
"""
- q = ("insert into versions (node, hash, size, source, mtime, muser, uuid, cluster) "
- "values (?, ?, ?, ?, ?, ?, ?, ?)")
+ q = ("insert into versions (node, hash, size, type, source, mtime, muser, uuid, checksum, cluster) "
+ "values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
mtime = time()
- props = (node, hash, size, source, mtime, muser, uuid, cluster)
+ props = (node, hash, size, type, source, mtime, muser, uuid, checksum, cluster)
serial = self.execute(q, props).lastrowid
self.statistics_update_ancestors(node, 1, size, mtime, cluster)
+
+ self.nodes_set_latest_version(node, serial)
+
return serial, mtime
- def version_lookup(self, node, before=inf, cluster=0):
+ def version_lookup(self, node, before=inf, cluster=0, all_props=True):
"""Lookup the current version of the given node.
Return a list with its properties:
- (serial, node, hash, size, source, mtime, muser, uuid, cluster)
+ (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster)
or None if the current version is not found in the given cluster.
"""
- q = ("select serial, node, hash, size, source, mtime, muser, uuid, cluster "
- "from versions "
- "where serial = (select max(serial) "
- "from versions "
- "where node = ? and mtime < ?) "
+ q = ("select %s "
+ "from versions v "
+ "where serial = %s "
"and cluster = ?")
- self.execute(q, (node, before, cluster))
+ subq, args = self._construct_latest_version_subquery(node=node, before=before)
+ if not all_props:
+ q = q % ("serial", subq)
+ else:
+ q = q % ("serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster", subq)
+
+ self.execute(q, args + [cluster])
props = self.fetchone()
if props is not None:
return props
return None
+
+ def version_lookup_bulk(self, nodes, before=inf, cluster=0, all_props=True):
+ """Lookup the current versions of the given nodes.
+ Return a list with their properties:
+ (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
+ """
+
+ if not nodes:
+ return ()
+ q = ("select %s "
+ "from versions "
+ "where serial in %s "
+ "and cluster = ? %s")
+ subq, args = self._construct_latest_versions_subquery(nodes=nodes, before = before)
+ if not all_props:
+ q = q % ("serial", subq, '')
+ else:
+ q = q % ("serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster", subq, 'order by node')
+
+ args += [cluster]
+ self.execute(q, args)
+ return self.fetchall()
def version_get_properties(self, serial, keys=(), propnames=_propnames):
"""Return a sequence of values for the properties of
the version specified by serial and the keys, in the order given.
If keys is empty, return all properties in the order
- (serial, node, hash, size, source, mtime, muser, uuid, cluster).
+ (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
"""
- q = ("select serial, node, hash, size, source, mtime, muser, uuid, cluster "
+ q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
"from versions "
"where serial = ?")
self.execute(q, (serial,))
return r
return [r[propnames[k]] for k in keys if k in propnames]
+ def version_put_property(self, serial, key, value):
+ """Set value for the property of version specified by key."""
+
+ if key not in _propnames:
+ return
+ q = "update versions set %s = ? where serial = ?" % key
+ self.execute(q, (value, serial))
+
def version_recluster(self, serial, cluster):
"""Move the version into another cluster."""
q = "delete from versions where serial = ?"
self.execute(q, (serial,))
- return hash
+
+ self.nodes_set_latest_version(node)
+
+ return hash, size
def attribute_get(self, serial, domain, keys=()):
"""Return a list of (key, value) pairs of the version specified by serial.
if not pathq:
return None, None
- subq = " and ("
- subq += ' or '.join(("n.path like ? escape '\\'" for x in pathq))
- subq += ")"
- args = tuple([self.escape_like(x) + '%' for x in pathq])
+ subqlist = []
+ args = []
+ for path, match in pathq:
+ if match == MATCH_PREFIX:
+ subqlist.append("n.path like ? escape '\\'")
+ args.append(self.escape_like(path) + '%')
+ elif match == MATCH_EXACT:
+ subqlist.append("n.path = ?")
+ args.append(path)
+
+ subq = ' and (' + ' or '.join(subqlist) + ')'
+ args = tuple(args)
return subq, args
return subq, args
+ def _construct_versions_nodes_latest_version_subquery(self, before=inf):
+ if before == inf:
+ q = ("n.latest_version ")
+ args = []
+ else:
+ q = ("(select max(serial) "
+ "from versions "
+ "where node = v.node and mtime < ?) ")
+ args = [before]
+ return q, args
+
+ def _construct_latest_version_subquery(self, node=None, before=inf):
+ where_cond = "node = v.node"
+ args = []
+ if node:
+ where_cond = "node = ? "
+ args = [node]
+
+ if before == inf:
+ q = ("(select latest_version "
+ "from nodes "
+ "where %s) ")
+ else:
+ q = ("(select max(serial) "
+ "from versions "
+ "where %s and mtime < ?) ")
+ args += [before]
+ return q % where_cond, args
+
+ def _construct_latest_versions_subquery(self, nodes=(), before=inf):
+ where_cond = ""
+ args = []
+ if nodes:
+ where_cond = "node in (%s) " % ','.join('?' for node in nodes)
+ args = nodes
+
+ if before == inf:
+ q = ("(select latest_version "
+ "from nodes "
+ "where %s ) ")
+ else:
+ q = ("(select max(serial) "
+ "from versions "
+ "where %s and mtime < ? group by node) ")
+ args += [before]
+ return q % where_cond, args
+
def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=[]):
"""Return a list with all keys pairs defined
for all latest versions under parent that
# TODO: Use another table to store before=inf results.
q = ("select distinct a.key "
"from attributes a, versions v, nodes n "
- "where v.serial = (select max(serial) "
- "from versions "
- "where node = v.node and mtime < ?) "
+ "where v.serial = %s "
"and v.cluster != ? "
"and v.node in (select node "
"from nodes "
"and a.serial = v.serial "
"and a.domain = ? "
"and n.node = v.node")
- args = (before, except_cluster, parent, domain)
+ subq, subargs = self._construct_latest_version_subquery(node=None, before=before)
+ args = subargs + [except_cluster, parent, domain]
+ q = q % subq
subq, subargs = self._construct_paths(pathq)
if subq is not None:
q += subq
def latest_version_list(self, parent, prefix='', delimiter=None,
start='', limit=10000, before=inf,
- except_cluster=0, pathq=[], domain=None, filterq=[], sizeq=None):
+ except_cluster=0, pathq=[], domain=None,
+ filterq=[], sizeq=None, all_props=False):
"""Return a (list of (path, serial) tuples, list of common prefixes)
for the current versions of the paths with the given parent,
matching the following criteria.
will always match.
Limit applies to the first list of tuples returned.
+
+ If all_props is True, return all properties after path, not just serial.
"""
execute = self.execute
start = strprevling(prefix)
nextling = strnextling(prefix)
- q = ("select distinct n.path, v.serial "
+ q = ("select distinct n.path, %s "
"from versions v, nodes n "
- "where v.serial = (select max(serial) "
- "from versions "
- "where node = v.node and mtime < ?) "
+ "where v.serial = %s "
"and v.cluster != ? "
"and v.node in (select node "
"from nodes "
"where parent = ?) "
"and n.node = v.node "
"and n.path > ? and n.path < ?")
- args = [before, except_cluster, parent, start, nextling]
+ subq, args = self._construct_versions_nodes_latest_version_subquery(before)
+ if not all_props:
+ q = q % ("v.serial", subq)
+ else:
+ q = q % ("v.serial, v.node, v.hash, v.size, v.type, v.source, v.mtime, v.muser, v.uuid, v.checksum, v.cluster", subq)
+ args += [except_cluster, parent, start, nextling]
+ start_index = len(args) - 2
subq, subargs = self._construct_paths(pathq)
if subq is not None:
props = fetchone()
if props is None:
break
- path, serial = props
+ path = props[0]
+ serial = props[1]
idx = path.find(delimiter, pfz)
if idx < 0:
if count >= limit:
break
- args[3] = strnextling(pf) # New start.
+ args[start_index] = strnextling(pf) # New start.
execute(q, args)
return matches, prefixes