1 # Copyright 2011 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
35 from sqlalchemy import Table, Integer, BigInteger, DECIMAL, Column, String, MetaData, ForeignKey
36 from sqlalchemy.types import Text
37 from sqlalchemy.schema import Index, Sequence
38 from sqlalchemy.sql import func, and_, or_, not_, null, select, bindparam, text
39 from sqlalchemy.ext.compiler import compiles
40 from sqlalchemy.engine.reflection import Inspector
42 from dbworker import DBWorker
44 from pithos.lib.filter import parse_filters
49 ( SERIAL, NODE, HASH, SIZE, SOURCE, MTIME, MUSER, UUID, CLUSTER ) = range(9)
54 def strnextling(prefix):
55 """Return the first unicode string
56 greater than but not starting with given prefix.
57 strnextling('hello') -> 'hellp'
60 ## all strings start with the null string,
61 ## therefore we have to approximate strnextling('')
62 ## with the last unicode character supported by python
63 ## 0x10ffff for wide (32-bit unicode) python builds
64 ## 0x00ffff for narrow (16-bit unicode) python builds
65 ## We will not autodetect. 0xffff is safe enough.
74 def strprevling(prefix):
75 """Return an approximation of the last unicode string
76 less than but not starting with given prefix.
77 strprevling(u'hello') -> u'helln\\xffff'
80 ## There is no prevling for the null string
85 s += unichr(c-1) + unichr(0xffff)
101 class Node(DBWorker):
102 """Nodes store path organization and have multiple versions.
103 Versions store object history and have multiple attributes.
104 Attributes store metadata.
107 # TODO: Provide an interface for included and excluded clusters.
109 def __init__(self, **params):
110 DBWorker.__init__(self, **params)
111 metadata = MetaData()
115 columns.append(Column('node', Integer, primary_key=True))
116 columns.append(Column('parent', Integer,
117 ForeignKey('nodes.node',
120 autoincrement=False))
122 columns.append(Column('path', String(path_length), default='', nullable=False))
123 self.nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
127 columns.append(Column('node', Integer,
128 ForeignKey('nodes.node',
132 columns.append(Column('key', String(255), primary_key=True))
133 columns.append(Column('value', String(255)))
134 self.policies = Table('policy', metadata, *columns, mysql_engine='InnoDB')
136 #create statistics table
138 columns.append(Column('node', Integer,
139 ForeignKey('nodes.node',
143 columns.append(Column('population', Integer, nullable=False, default=0))
144 columns.append(Column('size', BigInteger, nullable=False, default=0))
145 columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
146 columns.append(Column('cluster', Integer, nullable=False, default=0,
147 primary_key=True, autoincrement=False))
148 self.statistics = Table('statistics', metadata, *columns, mysql_engine='InnoDB')
150 #create versions table
152 columns.append(Column('serial', Integer, primary_key=True))
153 columns.append(Column('node', Integer,
154 ForeignKey('nodes.node',
156 onupdate='CASCADE')))
157 columns.append(Column('hash', String(255)))
158 columns.append(Column('size', BigInteger, nullable=False, default=0))
159 columns.append(Column('source', Integer))
160 columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
161 columns.append(Column('muser', String(255), nullable=False, default=''))
162 columns.append(Column('uuid', String(64), nullable=False, default=''))
163 columns.append(Column('cluster', Integer, nullable=False, default=0))
164 self.versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
165 Index('idx_versions_node_mtime', self.versions.c.node, self.versions.c.mtime)
166 Index('idx_versions_node_uuid', self.versions.c.uuid)
168 #create attributes table
170 columns.append(Column('serial', Integer,
171 ForeignKey('versions.serial',
175 columns.append(Column('domain', String(255), primary_key=True))
176 columns.append(Column('key', String(255), primary_key=True))
177 columns.append(Column('value', String(255)))
178 self.attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')
180 metadata.create_all(self.engine)
182 # the following code creates an index of specific length
183 # this can be accompliced in sqlalchemy >= 0.7.3
184 # providing mysql_length option during index creation
185 insp = Inspector.from_engine(self.engine)
186 indexes = [elem['name'] for elem in insp.get_indexes('nodes')]
187 if 'idx_nodes_path' not in indexes:
188 explicit_length = '(%s)' %path_length if self.engine.name == 'mysql' else ''
189 s = text('CREATE INDEX idx_nodes_path ON nodes (path%s)' %explicit_length)
190 self.conn.execute(s).close()
192 s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
193 self.nodes.c.parent == ROOTNODE))
194 rp = self.conn.execute(s)
198 s = self.nodes.insert().values(node=ROOTNODE, parent=ROOTNODE)
201 def node_create(self, parent, path):
202 """Create a new node from the given properties.
203 Return the node identifier of the new node.
205 #TODO catch IntegrityError?
206 s = self.nodes.insert().values(parent=parent, path=path)
207 r = self.conn.execute(s)
208 inserted_primary_key = r.inserted_primary_key[0]
210 return inserted_primary_key
212 def node_lookup(self, path):
213 """Lookup the current node of the given path.
214 Return None if the path is not found.
217 # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
218 s = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path), escape='\\'))
219 r = self.conn.execute(s)
226 def node_get_properties(self, node):
227 """Return the node's (parent, path).
228 Return None if the node is not found.
231 s = select([self.nodes.c.parent, self.nodes.c.path])
232 s = s.where(self.nodes.c.node == node)
233 r = self.conn.execute(s)
238 def node_get_versions(self, node, keys=(), propnames=_propnames):
239 """Return the properties of all versions at node.
240 If keys is empty, return all properties in the order
241 (serial, node, hash, size, source, mtime, muser, uuid, cluster).
244 s = select([self.versions.c.serial,
245 self.versions.c.node,
246 self.versions.c.hash,
247 self.versions.c.size,
248 self.versions.c.source,
249 self.versions.c.mtime,
250 self.versions.c.muser,
251 self.versions.c.uuid,
252 self.versions.c.cluster], self.versions.c.node == node)
253 s = s.order_by(self.versions.c.serial)
254 r = self.conn.execute(s)
263 return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]
265 def node_count_children(self, node):
266 """Return node's child count."""
268 s = select([func.count(self.nodes.c.node)])
269 s = s.where(and_(self.nodes.c.parent == node,
270 self.nodes.c.node != ROOTNODE))
271 r = self.conn.execute(s)
276 def node_purge_children(self, parent, before=inf, cluster=0):
277 """Delete all versions with the specified
278 parent and cluster, and return
279 the hashes of versions deleted.
280 Clears out nodes with no remaining versions.
283 c1 = select([self.nodes.c.node],
284 self.nodes.c.parent == parent)
285 where_clause = and_(self.versions.c.node.in_(c1),
286 self.versions.c.cluster == cluster)
287 s = select([func.count(self.versions.c.serial),
288 func.sum(self.versions.c.size)])
289 s = s.where(where_clause)
291 s = s.where(self.versions.c.mtime <= before)
292 r = self.conn.execute(s)
297 nr, size = row[0], -row[1] if row[1] else 0
299 self.statistics_update(parent, -nr, size, mtime, cluster)
300 self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)
302 s = select([self.versions.c.hash])
303 s = s.where(where_clause)
304 r = self.conn.execute(s)
305 hashes = [row[0] for row in r.fetchall()]
309 s = self.versions.delete().where(where_clause)
310 r = self.conn.execute(s)
314 s = select([self.nodes.c.node],
315 and_(self.nodes.c.parent == parent,
316 select([func.count(self.versions.c.serial)],
317 self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
318 rp = self.conn.execute(s)
319 nodes = [r[0] for r in rp.fetchall()]
321 s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
322 self.conn.execute(s).close()
326 def node_purge(self, node, before=inf, cluster=0):
327 """Delete all versions with the specified
328 node and cluster, and return
329 the hashes of versions deleted.
330 Clears out the node if it has no remaining versions.
334 s = select([func.count(self.versions.c.serial),
335 func.sum(self.versions.c.size)])
336 where_clause = and_(self.versions.c.node == node,
337 self.versions.c.cluster == cluster)
338 s = s.where(where_clause)
340 s = s.where(self.versions.c.mtime <= before)
341 r = self.conn.execute(s)
343 nr, size = row[0], row[1]
348 self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
350 s = select([self.versions.c.hash])
351 s = s.where(where_clause)
352 r = self.conn.execute(s)
353 hashes = [r[0] for r in r.fetchall()]
357 s = self.versions.delete().where(where_clause)
358 r = self.conn.execute(s)
362 s = select([self.nodes.c.node],
363 and_(self.nodes.c.node == node,
364 select([func.count(self.versions.c.serial)],
365 self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
366 r = self.conn.execute(s)
369 s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
370 self.conn.execute(s).close()
374 def node_remove(self, node):
375 """Remove the node specified.
376 Return false if the node has children or is not found.
379 if self.node_count_children(node):
383 s = select([func.count(self.versions.c.serial),
384 func.sum(self.versions.c.size),
385 self.versions.c.cluster])
386 s = s.where(self.versions.c.node == node)
387 s = s.group_by(self.versions.c.cluster)
388 r = self.conn.execute(s)
389 for population, size, cluster in r.fetchall():
390 self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
393 s = self.nodes.delete().where(self.nodes.c.node == node)
394 self.conn.execute(s).close()
397 def policy_get(self, node):
398 s = select([self.policies.c.key, self.policies.c.value],
399 self.policies.c.node==node)
400 r = self.conn.execute(s)
401 d = dict(r.fetchall())
405 def policy_set(self, node, policy):
407 for k, v in policy.iteritems():
408 s = self.policies.update().where(and_(self.policies.c.node == node,
409 self.policies.c.key == k))
410 s = s.values(value = v)
411 rp = self.conn.execute(s)
414 s = self.policies.insert()
415 values = {'node':node, 'key':k, 'value':v}
416 r = self.conn.execute(s, values)
419 def statistics_get(self, node, cluster=0):
420 """Return population, total size and last mtime
421 for all versions under node that belong to the cluster.
424 s = select([self.statistics.c.population,
425 self.statistics.c.size,
426 self.statistics.c.mtime])
427 s = s.where(and_(self.statistics.c.node == node,
428 self.statistics.c.cluster == cluster))
429 r = self.conn.execute(s)
434 def statistics_update(self, node, population, size, mtime, cluster=0):
435 """Update the statistics of the given node.
436 Statistics keep track the population, total
437 size of objects and mtime in the node's namespace.
438 May be zero or positive or negative numbers.
440 s = select([self.statistics.c.population, self.statistics.c.size],
441 and_(self.statistics.c.node == node,
442 self.statistics.c.cluster == cluster))
443 rp = self.conn.execute(s)
447 prepopulation, presize = (0, 0)
449 prepopulation, presize = r
450 population += prepopulation
455 u = self.statistics.update().where(and_(self.statistics.c.node==node,
456 self.statistics.c.cluster==cluster))
457 u = u.values(population=population, size=size, mtime=mtime)
458 rp = self.conn.execute(u)
461 ins = self.statistics.insert()
462 ins = ins.values(node=node, population=population, size=size,
463 mtime=mtime, cluster=cluster)
464 self.conn.execute(ins).close()
466 def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
467 """Update the statistics of the given node's parent.
468 Then recursively update all parents up to the root.
469 Population is not recursive.
475 props = self.node_get_properties(node)
479 self.statistics_update(parent, population, size, mtime, cluster)
481 population = 0 # Population isn't recursive
483 def statistics_latest(self, node, before=inf, except_cluster=0):
484 """Return population, total size and last mtime
485 for all latest versions under node that
486 do not belong to the cluster.
490 props = self.node_get_properties(node)
495 # The latest version.
496 s = select([self.versions.c.serial,
497 self.versions.c.node,
498 self.versions.c.hash,
499 self.versions.c.size,
500 self.versions.c.source,
501 self.versions.c.mtime,
502 self.versions.c.muser,
503 self.versions.c.uuid,
504 self.versions.c.cluster])
505 filtered = select([func.max(self.versions.c.serial)],
506 self.versions.c.node == node)
508 filtered = filtered.where(self.versions.c.mtime < before)
509 s = s.where(and_(self.versions.c.cluster != except_cluster,
510 self.versions.c.serial == filtered))
511 r = self.conn.execute(s)
518 # First level, just under node (get population).
519 v = self.versions.alias('v')
520 s = select([func.count(v.c.serial),
522 func.max(v.c.mtime)])
523 c1 = select([func.max(self.versions.c.serial)])
525 c1 = c1.where(self.versions.c.mtime < before)
526 c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
527 s = s.where(and_(v.c.serial == c1.where(self.versions.c.node == v.c.node),
528 v.c.cluster != except_cluster,
530 rp = self.conn.execute(s)
536 mtime = max(mtime, r[2])
540 # All children (get size and mtime).
541 # XXX: This is why the full path is stored.
542 s = select([func.count(v.c.serial),
544 func.max(v.c.mtime)])
545 c1 = select([func.max(self.versions.c.serial)],
546 self.versions.c.node == v.c.node)
548 c1 = c1.where(self.versions.c.mtime < before)
549 c2 = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path) + '%', escape='\\'))
550 s = s.where(and_(v.c.serial == c1,
551 v.c.cluster != except_cluster,
553 rp = self.conn.execute(s)
558 size = r[1] - props[SIZE]
559 mtime = max(mtime, r[2])
560 return (count, size, mtime)
562 def version_create(self, node, hash, size, source, muser, uuid, cluster=0):
563 """Create a new version from the given properties.
564 Return the (serial, mtime) of the new version.
568 s = self.versions.insert().values(node=node, hash=hash, size=size, source=source,
569 mtime=mtime, muser=muser, uuid=uuid, cluster=cluster)
570 serial = self.conn.execute(s).inserted_primary_key[0]
571 self.statistics_update_ancestors(node, 1, size, mtime, cluster)
574 def version_lookup(self, node, before=inf, cluster=0):
575 """Lookup the current version of the given node.
576 Return a list with its properties:
577 (serial, node, hash, size, source, mtime, muser, uuid, cluster)
578 or None if the current version is not found in the given cluster.
581 v = self.versions.alias('v')
582 s = select([v.c.serial, v.c.node, v.c.hash,
583 v.c.size, v.c.source, v.c.mtime,
584 v.c.muser, v.c.uuid, v.c.cluster])
585 c = select([func.max(self.versions.c.serial)],
586 self.versions.c.node == node)
588 c = c.where(self.versions.c.mtime < before)
589 s = s.where(and_(v.c.serial == c,
590 v.c.cluster == cluster))
591 r = self.conn.execute(s)
598 def version_get_properties(self, serial, keys=(), propnames=_propnames):
599 """Return a sequence of values for the properties of
600 the version specified by serial and the keys, in the order given.
601 If keys is empty, return all properties in the order
602 (serial, node, hash, size, source, mtime, muser, uuid, cluster).
605 v = self.versions.alias()
606 s = select([v.c.serial, v.c.node, v.c.hash,
607 v.c.size, v.c.source, v.c.mtime,
608 v.c.muser, v.c.uuid, v.c.cluster], v.c.serial == serial)
609 rp = self.conn.execute(s)
617 return [r[propnames[k]] for k in keys if k in propnames]
619 def version_recluster(self, serial, cluster):
620 """Move the version into another cluster."""
622 props = self.version_get_properties(serial)
627 oldcluster = props[CLUSTER]
628 if cluster == oldcluster:
632 self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
633 self.statistics_update_ancestors(node, 1, size, mtime, cluster)
635 s = self.versions.update()
636 s = s.where(self.versions.c.serial == serial)
637 s = s.values(cluster = cluster)
638 self.conn.execute(s).close()
640 def version_remove(self, serial):
641 """Remove the serial specified."""
643 props = self.version_get_properties(serial)
649 cluster = props[CLUSTER]
652 self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
654 s = self.versions.delete().where(self.versions.c.serial == serial)
655 self.conn.execute(s).close()
658 def attribute_get(self, serial, domain, keys=()):
659 """Return a list of (key, value) pairs of the version specified by serial.
660 If keys is empty, return all attributes.
661 Othwerise, return only those specified.
665 attrs = self.attributes.alias()
666 s = select([attrs.c.key, attrs.c.value])
667 s = s.where(and_(attrs.c.key.in_(keys),
668 attrs.c.serial == serial,
669 attrs.c.domain == domain))
671 attrs = self.attributes.alias()
672 s = select([attrs.c.key, attrs.c.value])
673 s = s.where(and_(attrs.c.serial == serial,
674 attrs.c.domain == domain))
675 r = self.conn.execute(s)
680 def attribute_set(self, serial, domain, items):
681 """Set the attributes of the version specified by serial.
682 Receive attributes as an iterable of (key, value) pairs.
687 s = self.attributes.update()
688 s = s.where(and_(self.attributes.c.serial == serial,
689 self.attributes.c.domain == domain,
690 self.attributes.c.key == k))
691 s = s.values(value = v)
692 rp = self.conn.execute(s)
695 s = self.attributes.insert()
696 s = s.values(serial=serial, domain=domain, key=k, value=v)
697 self.conn.execute(s).close()
699 def attribute_del(self, serial, domain, keys=()):
700 """Delete attributes of the version specified by serial.
701 If keys is empty, delete all attributes.
702 Otherwise delete those specified.
706 #TODO more efficient way to do this?
708 s = self.attributes.delete()
709 s = s.where(and_(self.attributes.c.serial == serial,
710 self.attributes.c.domain == domain,
711 self.attributes.c.key == key))
712 self.conn.execute(s).close()
714 s = self.attributes.delete()
715 s = s.where(and_(self.attributes.c.serial == serial,
716 self.attributes.c.domain == domain))
717 self.conn.execute(s).close()
719 def attribute_copy(self, source, dest):
720 s = select([dest, self.attributes.c.domain, self.attributes.c.key, self.attributes.c.value],
721 self.attributes.c.serial == source)
722 rp = self.conn.execute(s)
723 attributes = rp.fetchall()
725 for dest, domain, k, v in attributes:
727 s = self.attributes.update().where(and_(
728 self.attributes.c.serial == dest,
729 self.attributes.c.domain == domain,
730 self.attributes.c.key == k))
731 rp = self.conn.execute(s, value=v)
734 s = self.attributes.insert()
735 values = {'serial':dest, 'domain':domain, 'key':k, 'value':v}
736 self.conn.execute(s, values).close()
738 def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=[]):
739 """Return a list with all keys pairs defined
740 for all latest versions under parent that
741 do not belong to the cluster.
744 # TODO: Use another table to store before=inf results.
745 a = self.attributes.alias('a')
746 v = self.versions.alias('v')
747 n = self.nodes.alias('n')
748 s = select([a.c.key]).distinct()
749 filtered = select([func.max(self.versions.c.serial)])
751 filtered = filtered.where(self.versions.c.mtime < before)
752 s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
753 s = s.where(v.c.cluster != except_cluster)
754 s = s.where(v.c.node.in_(select([self.nodes.c.node],
755 self.nodes.c.parent == parent)))
756 s = s.where(a.c.serial == v.c.serial)
757 s = s.where(a.c.domain == domain)
758 s = s.where(n.c.node == v.c.node)
761 conj.append(n.c.path.like(self.escape_like(x) + '%', escape='\\'))
763 s = s.where(or_(*conj))
764 rp = self.conn.execute(s)
767 return [r[0] for r in rows]
769 def latest_version_list(self, parent, prefix='', delimiter=None,
770 start='', limit=10000, before=inf,
771 except_cluster=0, pathq=[], domain=None, filterq=[]):
772 """Return a (list of (path, serial) tuples, list of common prefixes)
773 for the current versions of the paths with the given parent,
774 matching the following criteria.
776 The property tuple for a version is returned if all
777 of these conditions are true:
783 c. path starts with prefix (and paths in pathq)
785 d. version is the max up to before
787 e. version is not in cluster
789 f. the path does not have the delimiter occuring
790 after the prefix, or ends with the delimiter
792 g. serial matches the attribute filter query.
794 A filter query is a comma-separated list of
795 terms in one of these three forms:
798 an attribute with this key must exist
801 an attribute with this key must not exist
804 the attribute with this key satisfies the value
805 where ?op is one of ==, != <=, >=, <, >.
807 The list of common prefixes includes the prefixes
808 matching up to the first delimiter after prefix,
809 and are reported only once, as "virtual directories".
810 The delimiter is included in the prefixes.
812 If arguments are None, then the corresponding matching rule
815 Limit applies to the first list of tuples returned.
818 if not start or start < prefix:
819 start = strprevling(prefix)
820 nextling = strnextling(prefix)
822 a = self.attributes.alias('a')
823 v = self.versions.alias('v')
824 n = self.nodes.alias('n')
825 s = select([n.c.path, v.c.serial]).distinct()
826 filtered = select([func.max(self.versions.c.serial)])
828 filtered = filtered.where(self.versions.c.mtime < before)
829 s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
830 s = s.where(v.c.cluster != except_cluster)
831 s = s.where(v.c.node.in_(select([self.nodes.c.node],
832 self.nodes.c.parent == parent)))
833 if domain and filterq:
834 s = s.where(a.c.serial == v.c.serial)
835 s = s.where(a.c.domain == domain)
837 s = s.where(n.c.node == v.c.node)
838 s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
841 conj.append(n.c.path.like(self.escape_like(x) + '%', escape='\\'))
844 s = s.where(or_(*conj))
846 if domain and filterq:
847 included, excluded, opers = parse_filters(filterq)
849 s = s.where(a.c.key.in_(x for x in included))
851 s = s.where(not_(a.c.key.in_(x for x in excluded)))
853 for k, o, v in opers:
854 s = s.where(or_(a.c.key == k and a.c.value.op(o)(v)))
856 s = s.order_by(n.c.path)
860 rp = self.conn.execute(s, start=start)
869 pappend = prefixes.append
871 mappend = matches.append
873 rp = self.conn.execute(s, start=start)
875 props = rp.fetchone()
879 idx = path.find(delimiter, pfz)
888 if idx + dz == len(path):
891 continue # Get one more, in case there is a path.
897 rp = self.conn.execute(s, start=strnextling(pf)) # New start.
900 return matches, prefixes
902 def latest_uuid(self, uuid):
903 """Return a (path, serial) tuple, for the latest version of the given uuid."""
905 v = self.versions.alias('v')
906 n = self.nodes.alias('n')
907 s = select([n.c.path, v.c.serial])
908 filtered = select([func.max(self.versions.c.serial)])
909 s = s.where(v.c.serial == filtered.where(self.versions.c.uuid == uuid))
910 s = s.where(n.c.node == v.c.node)
912 r = self.conn.execute(s)