from sqlalchemy import Table, Integer, BigInteger, DECIMAL, Column, String, MetaData, ForeignKey
from sqlalchemy.types import Text
from sqlalchemy.schema import Index, Sequence
-from sqlalchemy.sql import func, and_, or_, null, select, bindparam, text
+from sqlalchemy.sql import func, and_, or_, not_, null, select, bindparam, text, exists
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.engine.reflection import Inspector
from dbworker import DBWorker
+from pithos.lib.filter import parse_filters
+
+
ROOTNODE = 0
-( SERIAL, NODE, HASH, SIZE, SOURCE, MTIME, MUSER, CLUSTER ) = range(8)
+( SERIAL, NODE, HASH, SIZE, SOURCE, MTIME, MUSER, UUID, CLUSTER ) = range(9)
inf = float('inf')
s += unichr(c-1) + unichr(0xffff)
return s
-
_propnames = {
'serial' : 0,
'node' : 1,
'source' : 4,
'mtime' : 5,
'muser' : 6,
- 'cluster' : 7,
+ 'uuid' : 7,
+ 'cluster' : 8
}
columns.append(Column('source', Integer))
columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
columns.append(Column('muser', String(255), nullable=False, default=''))
+ columns.append(Column('uuid', String(64), nullable=False, default=''))
columns.append(Column('cluster', Integer, nullable=False, default=0))
self.versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
- Index('idx_versions_node_mtime', self.versions.c.node,
- self.versions.c.mtime)
+ Index('idx_versions_node_mtime', self.versions.c.node, self.versions.c.mtime)
+ Index('idx_versions_node_uuid', self.versions.c.uuid)
#create attributes table
columns = []
ondelete='CASCADE',
onupdate='CASCADE'),
primary_key=True))
+ columns.append(Column('domain', String(255), primary_key=True))
columns.append(Column('key', String(255), primary_key=True))
columns.append(Column('value', String(255)))
self.attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')
Return None if the path is not found.
"""
- s = select([self.nodes.c.node], self.nodes.c.path.like(path))
+ # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
+ s = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path), escape='\\'))
r = self.conn.execute(s)
row = r.fetchone()
r.close()
def node_get_versions(self, node, keys=(), propnames=_propnames):
"""Return the properties of all versions at node.
If keys is empty, return all properties in the order
- (serial, node, size, source, mtime, muser, cluster).
+ (serial, node, hash, size, source, mtime, muser, uuid, cluster).
"""
s = select([self.versions.c.serial,
self.versions.c.source,
self.versions.c.mtime,
self.versions.c.muser,
+ self.versions.c.uuid,
self.versions.c.cluster], self.versions.c.node == node)
s = s.order_by(self.versions.c.serial)
r = self.conn.execute(s)
self.statistics_update(parent, -nr, size, mtime, cluster)
self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)
- s = select([self.versions.c.serial])
+ s = select([self.versions.c.hash])
s = s.where(where_clause)
r = self.conn.execute(s)
- hashes = [row[HASH] for row in r.fetchall()]
+ hashes = [row[0] for row in r.fetchall()]
r.close()
#delete versions
mtime = time()
self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
- s = select([self.versions.c.serial])
+ s = select([self.versions.c.hash])
s = s.where(where_clause)
r = self.conn.execute(s)
- hashes = [r[HASH] for r in r.fetchall()]
+ hashes = [r[0] for r in r.fetchall()]
r.close()
#delete versions
self.versions.c.source,
self.versions.c.mtime,
self.versions.c.muser,
+ self.versions.c.uuid,
self.versions.c.cluster])
filtered = select([func.max(self.versions.c.serial)],
self.versions.c.node == node)
self.versions.c.node == v.c.node)
if before != inf:
c1 = c1.where(self.versions.c.mtime < before)
- c2 = select([self.nodes.c.node], self.nodes.c.path.like(path + '%'))
+ c2 = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path) + '%', escape='\\'))
s = s.where(and_(v.c.serial == c1,
v.c.cluster != except_cluster,
v.c.node.in_(c2)))
mtime = max(mtime, r[2])
return (count, size, mtime)
- def version_create(self, node, hash, size, source, muser, cluster=0):
+ def version_create(self, node, hash, size, source, muser, uuid, cluster=0):
"""Create a new version from the given properties.
Return the (serial, mtime) of the new version.
"""
mtime = time()
s = self.versions.insert().values(node=node, hash=hash, size=size, source=source,
- mtime=mtime, muser=muser, cluster=cluster)
+ mtime=mtime, muser=muser, uuid=uuid, cluster=cluster)
serial = self.conn.execute(s).inserted_primary_key[0]
self.statistics_update_ancestors(node, 1, size, mtime, cluster)
return serial, mtime
def version_lookup(self, node, before=inf, cluster=0):
"""Lookup the current version of the given node.
Return a list with its properties:
- (serial, node, hash, size, source, mtime, muser, cluster)
+ (serial, node, hash, size, source, mtime, muser, uuid, cluster)
or None if the current version is not found in the given cluster.
"""
v = self.versions.alias('v')
- s = select([v.c.serial, v.c.node, v.c.hash, v.c.size,
- v.c.source, v.c.mtime, v.c.muser, v.c.cluster])
+ s = select([v.c.serial, v.c.node, v.c.hash,
+ v.c.size, v.c.source, v.c.mtime,
+ v.c.muser, v.c.uuid, v.c.cluster])
c = select([func.max(self.versions.c.serial)],
self.versions.c.node == node)
if before != inf:
"""Return a sequence of values for the properties of
the version specified by serial and the keys, in the order given.
If keys is empty, return all properties in the order
- (serial, node, hash, size, source, mtime, muser, cluster).
+ (serial, node, hash, size, source, mtime, muser, uuid, cluster).
"""
v = self.versions.alias()
- s = select([v.c.serial, v.c.node, v.c.hash, v.c.size,
- v.c.source, v.c.mtime, v.c.muser, v.c.cluster], v.c.serial == serial)
+ s = select([v.c.serial, v.c.node, v.c.hash,
+ v.c.size, v.c.source, v.c.mtime,
+ v.c.muser, v.c.uuid, v.c.cluster], v.c.serial == serial)
rp = self.conn.execute(s)
r = rp.fetchone()
rp.close()
def version_remove(self, serial):
"""Remove the serial specified."""
- props = self.node_get_properties(serial)
+ props = self.version_get_properties(serial)
if not props:
return
node = props[NODE]
+ hash = props[HASH]
size = props[SIZE]
cluster = props[CLUSTER]
s = self.versions.delete().where(self.versions.c.serial == serial)
self.conn.execute(s).close()
- return True
+ return hash
- def attribute_get(self, serial, keys=()):
+ def attribute_get(self, serial, domain, keys=()):
"""Return a list of (key, value) pairs of the version specified by serial.
If keys is empty, return all attributes.
Otherwise, return only those specified.
attrs = self.attributes.alias()
s = select([attrs.c.key, attrs.c.value])
s = s.where(and_(attrs.c.key.in_(keys),
- attrs.c.serial == serial))
+ attrs.c.serial == serial,
+ attrs.c.domain == domain))
else:
attrs = self.attributes.alias()
s = select([attrs.c.key, attrs.c.value])
- s = s.where(attrs.c.serial == serial)
+ s = s.where(and_(attrs.c.serial == serial,
+ attrs.c.domain == domain))
r = self.conn.execute(s)
l = r.fetchall()
r.close()
return l
- def attribute_set(self, serial, items):
+ def attribute_set(self, serial, domain, items):
"""Set the attributes of the version specified by serial.
Receive attributes as an iterable of (key, value) pairs.
"""
for k, v in items:
s = self.attributes.update()
s = s.where(and_(self.attributes.c.serial == serial,
+ self.attributes.c.domain == domain,
self.attributes.c.key == k))
s = s.values(value = v)
rp = self.conn.execute(s)
rp.close()
if rp.rowcount == 0:
s = self.attributes.insert()
- s = s.values(serial=serial, key=k, value=v)
+ s = s.values(serial=serial, domain=domain, key=k, value=v)
self.conn.execute(s).close()
- def attribute_del(self, serial, keys=()):
+ def attribute_del(self, serial, domain, keys=()):
"""Delete attributes of the version specified by serial.
If keys is empty, delete all attributes.
Otherwise delete those specified.
for key in keys:
s = self.attributes.delete()
s = s.where(and_(self.attributes.c.serial == serial,
+ self.attributes.c.domain == domain,
self.attributes.c.key == key))
self.conn.execute(s).close()
else:
s = self.attributes.delete()
- s = s.where(self.attributes.c.serial == serial)
+ s = s.where(and_(self.attributes.c.serial == serial,
+ self.attributes.c.domain == domain))
self.conn.execute(s).close()
def attribute_copy(self, source, dest):
- s = select([dest, self.attributes.c.key, self.attributes.c.value],
+ s = select([dest, self.attributes.c.domain, self.attributes.c.key, self.attributes.c.value],
self.attributes.c.serial == source)
rp = self.conn.execute(s)
attributes = rp.fetchall()
rp.close()
- for dest, k, v in attributes:
+ for dest, domain, k, v in attributes:
#insert or replace
s = self.attributes.update().where(and_(
self.attributes.c.serial == dest,
+ self.attributes.c.domain == domain,
self.attributes.c.key == k))
rp = self.conn.execute(s, value=v)
rp.close()
if rp.rowcount == 0:
s = self.attributes.insert()
- values = {'serial':dest, 'key':k, 'value':v}
+ values = {'serial':dest, 'domain':domain, 'key':k, 'value':v}
self.conn.execute(s, values).close()
- def latest_attribute_keys(self, parent, before=inf, except_cluster=0, pathq=[]):
+ def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=[]):
"""Return a list with all keys pairs defined
for all latest versions under parent that
do not belong to the cluster.
s = s.where(v.c.node.in_(select([self.nodes.c.node],
self.nodes.c.parent == parent)))
s = s.where(a.c.serial == v.c.serial)
+ s = s.where(a.c.domain == domain)
s = s.where(n.c.node == v.c.node)
conj = []
for x in pathq:
- conj.append(n.c.path.like(x + '%'))
+ conj.append(n.c.path.like(self.escape_like(x) + '%', escape='\\'))
if conj:
s = s.where(or_(*conj))
rp = self.conn.execute(s)
def latest_version_list(self, parent, prefix='', delimiter=None,
start='', limit=10000, before=inf,
- except_cluster=0, pathq=[], filterq=None):
+ except_cluster=0, pathq=[], domain=None, filterq=[]):
"""Return a (list of (path, serial) tuples, list of common prefixes)
for the current versions of the paths with the given parent,
matching the following criteria.
start = strprevling(prefix)
nextling = strnextling(prefix)
- a = self.attributes.alias('a')
v = self.versions.alias('v')
n = self.nodes.alias('n')
s = select([n.c.path, v.c.serial]).distinct()
s = s.where(v.c.cluster != except_cluster)
s = s.where(v.c.node.in_(select([self.nodes.c.node],
self.nodes.c.parent == parent)))
- if filterq:
- s = s.where(a.c.serial == v.c.serial)
s = s.where(n.c.node == v.c.node)
s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
conj = []
for x in pathq:
- conj.append(n.c.path.like(x + '%'))
+ conj.append(n.c.path.like(self.escape_like(x) + '%', escape='\\'))
if conj:
s = s.where(or_(*conj))
- if filterq:
- s = s.where(a.c.key.in_(filterq.split(',')))
+        if domain and filterq:
+            # Attribute filters are only meaningful inside a domain. Each kind of
+            # filter becomes a correlated EXISTS on the attributes table, tied to
+            # the outer versions alias through the serial.
+            a = self.attributes.alias('a')
+            included, excluded, opers = parse_filters(filterq)
+            if included:
+                # Keep versions that carry at least one of the included keys.
+                subs = select([1])
+                subs = subs.where(a.c.serial == v.c.serial).correlate(v)
+                subs = subs.where(a.c.domain == domain)
+                subs = subs.where(or_(*[a.c.key.op('=')(x) for x in included]))
+                s = s.where(exists(subs))
+            if excluded:
+                # Drop versions that carry any of the excluded keys.
+                subs = select([1])
+                subs = subs.where(a.c.serial == v.c.serial).correlate(v)
+                subs = subs.where(a.c.domain == domain)
+                subs = subs.where(or_(*[a.c.key.op('=')(x) for x in excluded]))
+                s = s.where(not_(exists(subs)))
+            if opers:
+                # One EXISTS per (key, operator, value) filter. Folding all of
+                # them into a single subquery would require one attribute row to
+                # match every key at once, which can never hold for two
+                # different keys. The loop variable is named `val` so it does
+                # not rebind `v`, the versions alias used for correlation.
+                for k, o, val in opers:
+                    subs = select([1])
+                    subs = subs.where(a.c.serial == v.c.serial).correlate(v)
+                    subs = subs.where(a.c.domain == domain)
+                    subs = subs.where(and_(a.c.key.op('=')(k), a.c.value.op(o)(val)))
+                    s = s.where(exists(subs))
s = s.order_by(n.c.path)
rp.close()
return matches, prefixes
+
+    def latest_uuid(self, uuid):
+        """Look up the newest version that carries the given uuid.
+
+        Return the (path, serial) row for that version, i.e. whatever
+        fetchone() yields for the query below.
+        """
+
+        ver = self.versions.alias('v')
+        nod = self.nodes.alias('n')
+
+        # Scalar subquery: the highest serial among versions with this uuid.
+        newest = select([func.max(self.versions.c.serial)])
+        newest = newest.where(self.versions.c.uuid == uuid)
+
+        query = select([nod.c.path, ver.c.serial])
+        query = query.where(ver.c.serial == newest)
+        query = query.where(nod.c.node == ver.c.node)
+
+        result = self.conn.execute(query)
+        row = result.fetchone()
+        result.close()
+        return row