1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
35 from sqlalchemy import Table, Integer, BigInteger, DECIMAL, Column, String, MetaData, ForeignKey
36 from sqlalchemy.types import Text
37 from sqlalchemy.schema import Index, Sequence
38 from sqlalchemy.sql import func, and_, or_, not_, null, select, bindparam, text, exists
39 from sqlalchemy.ext.compiler import compiles
40 from sqlalchemy.engine.reflection import Inspector
42 from dbworker import DBWorker
44 from pithos.lib.filter import parse_filters
49 ( SERIAL, NODE, HASH, SIZE, SOURCE, MTIME, MUSER, UUID, CLUSTER ) = range(9)
54 def strnextling(prefix):
55 """Return the first unicode string
56 greater than but not starting with given prefix.
57 strnextling('hello') -> 'hellp'
60 ## all strings start with the null string,
61 ## therefore we have to approximate strnextling('')
62 ## with the last unicode character supported by python
63 ## 0x10ffff for wide (32-bit unicode) python builds
64 ## 0x00ffff for narrow (16-bit unicode) python builds
65 ## We will not autodetect. 0xffff is safe enough.
74 def strprevling(prefix):
75 """Return an approximation of the last unicode string
76 less than but not starting with given prefix.
77 strprevling(u'hello') -> u'helln\\xffff'
80 ## There is no prevling for the null string
85 s += unichr(c-1) + unichr(0xffff)
101 class Node(DBWorker):
102 """Nodes store path organization and have multiple versions.
103 Versions store object history and have multiple attributes.
104 Attributes store metadata.
107 # TODO: Provide an interface for included and excluded clusters.
109 def __init__(self, **params):
110 DBWorker.__init__(self, **params)
111 metadata = MetaData()
115 columns.append(Column('node', Integer, primary_key=True))
116 columns.append(Column('parent', Integer,
117 ForeignKey('nodes.node',
120 autoincrement=False))
122 columns.append(Column('path', String(path_length), default='', nullable=False))
123 self.nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
124 Index('idx_nodes_path', self.nodes.c.path, unique=True)
128 columns.append(Column('node', Integer,
129 ForeignKey('nodes.node',
133 columns.append(Column('key', String(255), primary_key=True))
134 columns.append(Column('value', String(255)))
135 self.policies = Table('policy', metadata, *columns, mysql_engine='InnoDB')
137 #create statistics table
139 columns.append(Column('node', Integer,
140 ForeignKey('nodes.node',
144 columns.append(Column('population', Integer, nullable=False, default=0))
145 columns.append(Column('size', BigInteger, nullable=False, default=0))
146 columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
147 columns.append(Column('cluster', Integer, nullable=False, default=0,
148 primary_key=True, autoincrement=False))
149 self.statistics = Table('statistics', metadata, *columns, mysql_engine='InnoDB')
151 #create versions table
153 columns.append(Column('serial', Integer, primary_key=True))
154 columns.append(Column('node', Integer,
155 ForeignKey('nodes.node',
157 onupdate='CASCADE')))
158 columns.append(Column('hash', String(255)))
159 columns.append(Column('size', BigInteger, nullable=False, default=0))
160 columns.append(Column('source', Integer))
161 columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
162 columns.append(Column('muser', String(255), nullable=False, default=''))
163 columns.append(Column('uuid', String(64), nullable=False, default=''))
164 columns.append(Column('cluster', Integer, nullable=False, default=0))
165 self.versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
166 Index('idx_versions_node_mtime', self.versions.c.node, self.versions.c.mtime)
167 Index('idx_versions_node_uuid', self.versions.c.uuid)
169 #create attributes table
171 columns.append(Column('serial', Integer,
172 ForeignKey('versions.serial',
176 columns.append(Column('domain', String(255), primary_key=True))
177 columns.append(Column('key', String(255), primary_key=True))
178 columns.append(Column('value', String(255)))
179 self.attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')
181 metadata.create_all(self.engine)
183 s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
184 self.nodes.c.parent == ROOTNODE))
185 rp = self.conn.execute(s)
189 s = self.nodes.insert().values(node=ROOTNODE, parent=ROOTNODE)
192 def node_create(self, parent, path):
193 """Create a new node from the given properties.
194 Return the node identifier of the new node.
196 #TODO catch IntegrityError?
197 s = self.nodes.insert().values(parent=parent, path=path)
198 r = self.conn.execute(s)
199 inserted_primary_key = r.inserted_primary_key[0]
201 return inserted_primary_key
203 def node_lookup(self, path):
204 """Lookup the current node of the given path.
205 Return None if the path is not found.
208 # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
209 s = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path), escape='\\'))
210 r = self.conn.execute(s)
217 def node_get_properties(self, node):
218 """Return the node's (parent, path).
219 Return None if the node is not found.
222 s = select([self.nodes.c.parent, self.nodes.c.path])
223 s = s.where(self.nodes.c.node == node)
224 r = self.conn.execute(s)
229 def node_get_versions(self, node, keys=(), propnames=_propnames):
230 """Return the properties of all versions at node.
231 If keys is empty, return all properties in the order
232 (serial, node, hash, size, source, mtime, muser, uuid, cluster).
235 s = select([self.versions.c.serial,
236 self.versions.c.node,
237 self.versions.c.hash,
238 self.versions.c.size,
239 self.versions.c.source,
240 self.versions.c.mtime,
241 self.versions.c.muser,
242 self.versions.c.uuid,
243 self.versions.c.cluster], self.versions.c.node == node)
244 s = s.order_by(self.versions.c.serial)
245 r = self.conn.execute(s)
254 return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]
256 def node_count_children(self, node):
257 """Return node's child count."""
259 s = select([func.count(self.nodes.c.node)])
260 s = s.where(and_(self.nodes.c.parent == node,
261 self.nodes.c.node != ROOTNODE))
262 r = self.conn.execute(s)
267 def node_purge_children(self, parent, before=inf, cluster=0):
268 """Delete all versions with the specified
269 parent and cluster, and return
270 the hashes of versions deleted.
271 Clears out nodes with no remaining versions.
274 c1 = select([self.nodes.c.node],
275 self.nodes.c.parent == parent)
276 where_clause = and_(self.versions.c.node.in_(c1),
277 self.versions.c.cluster == cluster)
278 s = select([func.count(self.versions.c.serial),
279 func.sum(self.versions.c.size)])
280 s = s.where(where_clause)
282 s = s.where(self.versions.c.mtime <= before)
283 r = self.conn.execute(s)
288 nr, size = row[0], -row[1] if row[1] else 0
290 self.statistics_update(parent, -nr, size, mtime, cluster)
291 self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)
293 s = select([self.versions.c.hash])
294 s = s.where(where_clause)
295 r = self.conn.execute(s)
296 hashes = [row[0] for row in r.fetchall()]
300 s = self.versions.delete().where(where_clause)
301 r = self.conn.execute(s)
305 s = select([self.nodes.c.node],
306 and_(self.nodes.c.parent == parent,
307 select([func.count(self.versions.c.serial)],
308 self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
309 rp = self.conn.execute(s)
310 nodes = [r[0] for r in rp.fetchall()]
312 s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
313 self.conn.execute(s).close()
317 def node_purge(self, node, before=inf, cluster=0):
318 """Delete all versions with the specified
319 node and cluster, and return
320 the hashes of versions deleted.
321 Clears out the node if it has no remaining versions.
325 s = select([func.count(self.versions.c.serial),
326 func.sum(self.versions.c.size)])
327 where_clause = and_(self.versions.c.node == node,
328 self.versions.c.cluster == cluster)
329 s = s.where(where_clause)
331 s = s.where(self.versions.c.mtime <= before)
332 r = self.conn.execute(s)
334 nr, size = row[0], row[1]
339 self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
341 s = select([self.versions.c.hash])
342 s = s.where(where_clause)
343 r = self.conn.execute(s)
344 hashes = [r[0] for r in r.fetchall()]
348 s = self.versions.delete().where(where_clause)
349 r = self.conn.execute(s)
353 s = select([self.nodes.c.node],
354 and_(self.nodes.c.node == node,
355 select([func.count(self.versions.c.serial)],
356 self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
357 r = self.conn.execute(s)
360 s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
361 self.conn.execute(s).close()
365 def node_remove(self, node):
366 """Remove the node specified.
367 Return false if the node has children or is not found.
370 if self.node_count_children(node):
374 s = select([func.count(self.versions.c.serial),
375 func.sum(self.versions.c.size),
376 self.versions.c.cluster])
377 s = s.where(self.versions.c.node == node)
378 s = s.group_by(self.versions.c.cluster)
379 r = self.conn.execute(s)
380 for population, size, cluster in r.fetchall():
381 self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
384 s = self.nodes.delete().where(self.nodes.c.node == node)
385 self.conn.execute(s).close()
388 def policy_get(self, node):
389 s = select([self.policies.c.key, self.policies.c.value],
390 self.policies.c.node==node)
391 r = self.conn.execute(s)
392 d = dict(r.fetchall())
396 def policy_set(self, node, policy):
398 for k, v in policy.iteritems():
399 s = self.policies.update().where(and_(self.policies.c.node == node,
400 self.policies.c.key == k))
401 s = s.values(value = v)
402 rp = self.conn.execute(s)
405 s = self.policies.insert()
406 values = {'node':node, 'key':k, 'value':v}
407 r = self.conn.execute(s, values)
410 def statistics_get(self, node, cluster=0):
411 """Return population, total size and last mtime
412 for all versions under node that belong to the cluster.
415 s = select([self.statistics.c.population,
416 self.statistics.c.size,
417 self.statistics.c.mtime])
418 s = s.where(and_(self.statistics.c.node == node,
419 self.statistics.c.cluster == cluster))
420 r = self.conn.execute(s)
425 def statistics_update(self, node, population, size, mtime, cluster=0):
426 """Update the statistics of the given node.
427 Statistics keep track of the population, total
428 size of objects and mtime in the node's namespace.
429 May be zero or positive or negative numbers.
431 s = select([self.statistics.c.population, self.statistics.c.size],
432 and_(self.statistics.c.node == node,
433 self.statistics.c.cluster == cluster))
434 rp = self.conn.execute(s)
438 prepopulation, presize = (0, 0)
440 prepopulation, presize = r
441 population += prepopulation
446 u = self.statistics.update().where(and_(self.statistics.c.node==node,
447 self.statistics.c.cluster==cluster))
448 u = u.values(population=population, size=size, mtime=mtime)
449 rp = self.conn.execute(u)
452 ins = self.statistics.insert()
453 ins = ins.values(node=node, population=population, size=size,
454 mtime=mtime, cluster=cluster)
455 self.conn.execute(ins).close()
457 def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
458 """Update the statistics of the given node's parent.
459 Then recursively update all parents up to the root.
460 Population is not recursive.
466 props = self.node_get_properties(node)
470 self.statistics_update(parent, population, size, mtime, cluster)
472 population = 0 # Population isn't recursive
474 def statistics_latest(self, node, before=inf, except_cluster=0):
475 """Return population, total size and last mtime
476 for all latest versions under node that
477 do not belong to the cluster.
481 props = self.node_get_properties(node)
486 # The latest version.
487 s = select([self.versions.c.serial,
488 self.versions.c.node,
489 self.versions.c.hash,
490 self.versions.c.size,
491 self.versions.c.source,
492 self.versions.c.mtime,
493 self.versions.c.muser,
494 self.versions.c.uuid,
495 self.versions.c.cluster])
496 filtered = select([func.max(self.versions.c.serial)],
497 self.versions.c.node == node)
499 filtered = filtered.where(self.versions.c.mtime < before)
500 s = s.where(and_(self.versions.c.cluster != except_cluster,
501 self.versions.c.serial == filtered))
502 r = self.conn.execute(s)
509 # First level, just under node (get population).
510 v = self.versions.alias('v')
511 s = select([func.count(v.c.serial),
513 func.max(v.c.mtime)])
514 c1 = select([func.max(self.versions.c.serial)])
516 c1 = c1.where(self.versions.c.mtime < before)
517 c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
518 s = s.where(and_(v.c.serial == c1.where(self.versions.c.node == v.c.node),
519 v.c.cluster != except_cluster,
521 rp = self.conn.execute(s)
527 mtime = max(mtime, r[2])
531 # All children (get size and mtime).
532 # XXX: This is why the full path is stored.
533 s = select([func.count(v.c.serial),
535 func.max(v.c.mtime)])
536 c1 = select([func.max(self.versions.c.serial)],
537 self.versions.c.node == v.c.node)
539 c1 = c1.where(self.versions.c.mtime < before)
540 c2 = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path) + '%', escape='\\'))
541 s = s.where(and_(v.c.serial == c1,
542 v.c.cluster != except_cluster,
544 rp = self.conn.execute(s)
549 size = r[1] - props[SIZE]
550 mtime = max(mtime, r[2])
551 return (count, size, mtime)
553 def version_create(self, node, hash, size, source, muser, uuid, cluster=0):
554 """Create a new version from the given properties.
555 Return the (serial, mtime) of the new version.
559 s = self.versions.insert().values(node=node, hash=hash, size=size, source=source,
560 mtime=mtime, muser=muser, uuid=uuid, cluster=cluster)
561 serial = self.conn.execute(s).inserted_primary_key[0]
562 self.statistics_update_ancestors(node, 1, size, mtime, cluster)
565 def version_lookup(self, node, before=inf, cluster=0):
566 """Lookup the current version of the given node.
567 Return a list with its properties:
568 (serial, node, hash, size, source, mtime, muser, uuid, cluster)
569 or None if the current version is not found in the given cluster.
572 v = self.versions.alias('v')
573 s = select([v.c.serial, v.c.node, v.c.hash,
574 v.c.size, v.c.source, v.c.mtime,
575 v.c.muser, v.c.uuid, v.c.cluster])
576 c = select([func.max(self.versions.c.serial)],
577 self.versions.c.node == node)
579 c = c.where(self.versions.c.mtime < before)
580 s = s.where(and_(v.c.serial == c,
581 v.c.cluster == cluster))
582 r = self.conn.execute(s)
589 def version_get_properties(self, serial, keys=(), propnames=_propnames):
590 """Return a sequence of values for the properties of
591 the version specified by serial and the keys, in the order given.
592 If keys is empty, return all properties in the order
593 (serial, node, hash, size, source, mtime, muser, uuid, cluster).
596 v = self.versions.alias()
597 s = select([v.c.serial, v.c.node, v.c.hash,
598 v.c.size, v.c.source, v.c.mtime,
599 v.c.muser, v.c.uuid, v.c.cluster], v.c.serial == serial)
600 rp = self.conn.execute(s)
608 return [r[propnames[k]] for k in keys if k in propnames]
610 def version_recluster(self, serial, cluster):
611 """Move the version into another cluster."""
613 props = self.version_get_properties(serial)
618 oldcluster = props[CLUSTER]
619 if cluster == oldcluster:
623 self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
624 self.statistics_update_ancestors(node, 1, size, mtime, cluster)
626 s = self.versions.update()
627 s = s.where(self.versions.c.serial == serial)
628 s = s.values(cluster = cluster)
629 self.conn.execute(s).close()
631 def version_remove(self, serial):
632 """Remove the serial specified."""
634 props = self.version_get_properties(serial)
640 cluster = props[CLUSTER]
643 self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
645 s = self.versions.delete().where(self.versions.c.serial == serial)
646 self.conn.execute(s).close()
649 def attribute_get(self, serial, domain, keys=()):
650 """Return a list of (key, value) pairs of the version specified by serial.
651 If keys is empty, return all attributes.
652 Otherwise, return only those specified.
656 attrs = self.attributes.alias()
657 s = select([attrs.c.key, attrs.c.value])
658 s = s.where(and_(attrs.c.key.in_(keys),
659 attrs.c.serial == serial,
660 attrs.c.domain == domain))
662 attrs = self.attributes.alias()
663 s = select([attrs.c.key, attrs.c.value])
664 s = s.where(and_(attrs.c.serial == serial,
665 attrs.c.domain == domain))
666 r = self.conn.execute(s)
671 def attribute_set(self, serial, domain, items):
672 """Set the attributes of the version specified by serial.
673 Receive attributes as an iterable of (key, value) pairs.
678 s = self.attributes.update()
679 s = s.where(and_(self.attributes.c.serial == serial,
680 self.attributes.c.domain == domain,
681 self.attributes.c.key == k))
682 s = s.values(value = v)
683 rp = self.conn.execute(s)
686 s = self.attributes.insert()
687 s = s.values(serial=serial, domain=domain, key=k, value=v)
688 self.conn.execute(s).close()
690 def attribute_del(self, serial, domain, keys=()):
691 """Delete attributes of the version specified by serial.
692 If keys is empty, delete all attributes.
693 Otherwise delete those specified.
697 #TODO more efficient way to do this?
699 s = self.attributes.delete()
700 s = s.where(and_(self.attributes.c.serial == serial,
701 self.attributes.c.domain == domain,
702 self.attributes.c.key == key))
703 self.conn.execute(s).close()
705 s = self.attributes.delete()
706 s = s.where(and_(self.attributes.c.serial == serial,
707 self.attributes.c.domain == domain))
708 self.conn.execute(s).close()
710 def attribute_copy(self, source, dest):
711 s = select([dest, self.attributes.c.domain, self.attributes.c.key, self.attributes.c.value],
712 self.attributes.c.serial == source)
713 rp = self.conn.execute(s)
714 attributes = rp.fetchall()
716 for dest, domain, k, v in attributes:
718 s = self.attributes.update().where(and_(
719 self.attributes.c.serial == dest,
720 self.attributes.c.domain == domain,
721 self.attributes.c.key == k))
722 rp = self.conn.execute(s, value=v)
725 s = self.attributes.insert()
726 values = {'serial':dest, 'domain':domain, 'key':k, 'value':v}
727 self.conn.execute(s, values).close()
729 def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=[]):
730 """Return a list with all attribute keys defined
731 for all latest versions under parent that
732 do not belong to the cluster.
735 # TODO: Use another table to store before=inf results.
736 a = self.attributes.alias('a')
737 v = self.versions.alias('v')
738 n = self.nodes.alias('n')
739 s = select([a.c.key]).distinct()
740 filtered = select([func.max(self.versions.c.serial)])
742 filtered = filtered.where(self.versions.c.mtime < before)
743 s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
744 s = s.where(v.c.cluster != except_cluster)
745 s = s.where(v.c.node.in_(select([self.nodes.c.node],
746 self.nodes.c.parent == parent)))
747 s = s.where(a.c.serial == v.c.serial)
748 s = s.where(a.c.domain == domain)
749 s = s.where(n.c.node == v.c.node)
752 conj.append(n.c.path.like(self.escape_like(x) + '%', escape='\\'))
754 s = s.where(or_(*conj))
755 rp = self.conn.execute(s)
758 return [r[0] for r in rows]
760 def latest_version_list(self, parent, prefix='', delimiter=None,
761 start='', limit=10000, before=inf,
762 except_cluster=0, pathq=[], domain=None, filterq=[], sizeq=None):
763 """Return a (list of (path, serial) tuples, list of common prefixes)
764 for the current versions of the paths with the given parent,
765 matching the following criteria.
767 The property tuple for a version is returned if all
768 of these conditions are true:
774 c. path starts with prefix (and paths in pathq)
776 d. version is the max up to before
778 e. version is not in cluster
780 f. the path does not have the delimiter occurring
781 after the prefix, or ends with the delimiter
783 g. serial matches the attribute filter query.
785 A filter query is a comma-separated list of
786 terms in one of these three forms:
789 an attribute with this key must exist
792 an attribute with this key must not exist
795 the attribute with this key satisfies the value
796 where ?op is one of ==, !=, <=, >=, <, >.
798 h. the size is in the range set by sizeq
800 The list of common prefixes includes the prefixes
801 matching up to the first delimiter after prefix,
802 and are reported only once, as "virtual directories".
803 The delimiter is included in the prefixes.
805 If arguments are None, then the corresponding matching rule
808 Limit applies to the first list of tuples returned.
811 if not start or start < prefix:
812 start = strprevling(prefix)
813 nextling = strnextling(prefix)
815 v = self.versions.alias('v')
816 n = self.nodes.alias('n')
817 s = select([n.c.path, v.c.serial]).distinct()
818 filtered = select([func.max(self.versions.c.serial)])
820 filtered = filtered.where(self.versions.c.mtime < before)
821 s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
822 s = s.where(v.c.cluster != except_cluster)
823 s = s.where(v.c.node.in_(select([self.nodes.c.node],
824 self.nodes.c.parent == parent)))
826 s = s.where(n.c.node == v.c.node)
827 s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
830 conj.append(n.c.path.like(self.escape_like(x) + '%', escape='\\'))
832 s = s.where(or_(*conj))
834 if sizeq and len(sizeq) == 2:
836 s = s.where(v.c.size >= sizeq[0])
838 s = s.where(v.c.size < sizeq[1])
840 if domain and filterq:
841 a = self.attributes.alias('a')
842 included, excluded, opers = parse_filters(filterq)
845 subs = subs.where(a.c.serial == v.c.serial).correlate(v)
846 subs = subs.where(a.c.domain == domain)
847 subs = subs.where(or_(*[a.c.key.op('=')(x) for x in included]))
848 s = s.where(exists(subs))
851 subs = subs.where(a.c.serial == v.c.serial).correlate(v)
852 subs = subs.where(a.c.domain == domain)
853 subs = subs.where(or_(*[a.c.key.op('=')(x) for x in excluded]))
854 s = s.where(not_(exists(subs)))
856 for k, o, val in opers:
858 subs = subs.where(a.c.serial == v.c.serial).correlate(v)
859 subs = subs.where(a.c.domain == domain)
860 subs = subs.where(and_(a.c.key.op('=')(k), a.c.value.op(o)(val)))
861 s = s.where(exists(subs))
863 s = s.order_by(n.c.path)
867 rp = self.conn.execute(s, start=start)
876 pappend = prefixes.append
878 mappend = matches.append
880 rp = self.conn.execute(s, start=start)
882 props = rp.fetchone()
886 idx = path.find(delimiter, pfz)
895 if idx + dz == len(path):
898 continue # Get one more, in case there is a path.
904 rp = self.conn.execute(s, start=strnextling(pf)) # New start.
907 return matches, prefixes
909 def latest_uuid(self, uuid):
910 """Return a (path, serial) tuple, for the latest version of the given uuid."""
912 v = self.versions.alias('v')
913 n = self.nodes.alias('n')
914 s = select([n.c.path, v.c.serial])
915 filtered = select([func.max(self.versions.c.serial)])
916 s = s.where(v.c.serial == filtered.where(self.versions.c.uuid == uuid))
917 s = s.where(n.c.node == v.c.node)
919 r = self.conn.execute(s)