1 # Copyright 2011 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
35 from sqlalchemy import Table, Integer, BigInteger, DECIMAL, Column, String, MetaData, ForeignKey
36 from sqlalchemy.types import Text
37 from sqlalchemy.schema import Index, Sequence
38 from sqlalchemy.sql import func, and_, or_, not_, null, select, bindparam, text
39 from sqlalchemy.ext.compiler import compiles
40 from sqlalchemy.engine.reflection import Inspector
42 from dbworker import DBWorker
44 from pithos.lib.filter import parse_filters, OPERATORS
48 ( SERIAL, NODE, HASH, SIZE, SOURCE, MTIME, MUSER, UUID, CLUSTER ) = range(9)
53 def strnextling(prefix):
54 """Return the first unicode string
55 greater than but not starting with given prefix.
56 strnextling('hello') -> 'hellp'
59 ## all strings start with the null string,
60 ## therefore we have to approximate strnextling('')
61 ## with the last unicode character supported by python
62 ## 0x10ffff for wide (32-bit unicode) python builds
63 ## 0x00ffff for narrow (16-bit unicode) python builds
64 ## We will not autodetect. 0xffff is safe enough.
73 def strprevling(prefix):
74 """Return an approximation of the last unicode string
75 less than but not starting with given prefix.
76 strprevling(u'hello') -> u'helln\\xffff'
79 ## There is no prevling for the null string
84 s += unichr(c-1) + unichr(0xffff)
100 class Node(DBWorker):
101 """Nodes store path organization and have multiple versions.
102 Versions store object history and have multiple attributes.
103 Attributes store metadata.
106 # TODO: Provide an interface for included and excluded clusters.
108 def __init__(self, **params):
109 DBWorker.__init__(self, **params)
110 metadata = MetaData()
114 columns.append(Column('node', Integer, primary_key=True))
115 columns.append(Column('parent', Integer,
116 ForeignKey('nodes.node',
119 autoincrement=False))
121 columns.append(Column('path', String(path_length), default='', nullable=False))
122 self.nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
126 columns.append(Column('node', Integer,
127 ForeignKey('nodes.node',
131 columns.append(Column('key', String(255), primary_key=True))
132 columns.append(Column('value', String(255)))
133 self.policies = Table('policy', metadata, *columns, mysql_engine='InnoDB')
135 #create statistics table
137 columns.append(Column('node', Integer,
138 ForeignKey('nodes.node',
142 columns.append(Column('population', Integer, nullable=False, default=0))
143 columns.append(Column('size', BigInteger, nullable=False, default=0))
144 columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
145 columns.append(Column('cluster', Integer, nullable=False, default=0,
146 primary_key=True, autoincrement=False))
147 self.statistics = Table('statistics', metadata, *columns, mysql_engine='InnoDB')
149 #create versions table
151 columns.append(Column('serial', Integer, primary_key=True))
152 columns.append(Column('node', Integer,
153 ForeignKey('nodes.node',
155 onupdate='CASCADE')))
156 columns.append(Column('hash', String(255)))
157 columns.append(Column('size', BigInteger, nullable=False, default=0))
158 columns.append(Column('source', Integer))
159 columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
160 columns.append(Column('muser', String(255), nullable=False, default=''))
161 columns.append(Column('uuid', String(64), nullable=False, default=''))
162 columns.append(Column('cluster', Integer, nullable=False, default=0))
163 self.versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
164 Index('idx_versions_node_mtime', self.versions.c.node, self.versions.c.mtime)
165 Index('idx_versions_node_uuid', self.versions.c.uuid)
167 #create attributes table
169 columns.append(Column('serial', Integer,
170 ForeignKey('versions.serial',
174 columns.append(Column('domain', String(255), primary_key=True))
175 columns.append(Column('key', String(255), primary_key=True))
176 columns.append(Column('value', String(255)))
177 self.attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')
179 metadata.create_all(self.engine)
181 # the following code creates an index of specific length
182 # this can be accompliced in sqlalchemy >= 0.7.3
183 # providing mysql_length option during index creation
184 insp = Inspector.from_engine(self.engine)
185 indexes = [elem['name'] for elem in insp.get_indexes('nodes')]
186 if 'idx_nodes_path' not in indexes:
187 explicit_length = '(%s)' %path_length if self.engine.name == 'mysql' else ''
188 s = text('CREATE INDEX idx_nodes_path ON nodes (path%s)' %explicit_length)
189 self.conn.execute(s).close()
191 s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
192 self.nodes.c.parent == ROOTNODE))
193 rp = self.conn.execute(s)
197 s = self.nodes.insert().values(node=ROOTNODE, parent=ROOTNODE)
200 def node_create(self, parent, path):
201 """Create a new node from the given properties.
202 Return the node identifier of the new node.
204 #TODO catch IntegrityError?
205 s = self.nodes.insert().values(parent=parent, path=path)
206 r = self.conn.execute(s)
207 inserted_primary_key = r.inserted_primary_key[0]
209 return inserted_primary_key
211 def node_lookup(self, path):
212 """Lookup the current node of the given path.
213 Return None if the path is not found.
216 # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
217 s = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path), escape='\\'))
218 r = self.conn.execute(s)
225 def node_get_properties(self, node):
226 """Return the node's (parent, path).
227 Return None if the node is not found.
230 s = select([self.nodes.c.parent, self.nodes.c.path])
231 s = s.where(self.nodes.c.node == node)
232 r = self.conn.execute(s)
237 def node_get_versions(self, node, keys=(), propnames=_propnames):
238 """Return the properties of all versions at node.
239 If keys is empty, return all properties in the order
240 (serial, node, hash, size, source, mtime, muser, uuid, cluster).
243 s = select([self.versions.c.serial,
244 self.versions.c.node,
245 self.versions.c.hash,
246 self.versions.c.size,
247 self.versions.c.source,
248 self.versions.c.mtime,
249 self.versions.c.muser,
250 self.versions.c.uuid,
251 self.versions.c.cluster], self.versions.c.node == node)
252 s = s.order_by(self.versions.c.serial)
253 r = self.conn.execute(s)
262 return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]
264 def node_count_children(self, node):
265 """Return node's child count."""
267 s = select([func.count(self.nodes.c.node)])
268 s = s.where(and_(self.nodes.c.parent == node,
269 self.nodes.c.node != ROOTNODE))
270 r = self.conn.execute(s)
275 def node_purge_children(self, parent, before=inf, cluster=0):
276 """Delete all versions with the specified
277 parent and cluster, and return
278 the hashes of versions deleted.
279 Clears out nodes with no remaining versions.
282 c1 = select([self.nodes.c.node],
283 self.nodes.c.parent == parent)
284 where_clause = and_(self.versions.c.node.in_(c1),
285 self.versions.c.cluster == cluster)
286 s = select([func.count(self.versions.c.serial),
287 func.sum(self.versions.c.size)])
288 s = s.where(where_clause)
290 s = s.where(self.versions.c.mtime <= before)
291 r = self.conn.execute(s)
296 nr, size = row[0], -row[1] if row[1] else 0
298 self.statistics_update(parent, -nr, size, mtime, cluster)
299 self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)
301 s = select([self.versions.c.hash])
302 s = s.where(where_clause)
303 r = self.conn.execute(s)
304 hashes = [row[0] for row in r.fetchall()]
308 s = self.versions.delete().where(where_clause)
309 r = self.conn.execute(s)
313 s = select([self.nodes.c.node],
314 and_(self.nodes.c.parent == parent,
315 select([func.count(self.versions.c.serial)],
316 self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
317 rp = self.conn.execute(s)
318 nodes = [r[0] for r in rp.fetchall()]
320 s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
321 self.conn.execute(s).close()
325 def node_purge(self, node, before=inf, cluster=0):
326 """Delete all versions with the specified
327 node and cluster, and return
328 the hashes of versions deleted.
329 Clears out the node if it has no remaining versions.
333 s = select([func.count(self.versions.c.serial),
334 func.sum(self.versions.c.size)])
335 where_clause = and_(self.versions.c.node == node,
336 self.versions.c.cluster == cluster)
337 s = s.where(where_clause)
339 s = s.where(self.versions.c.mtime <= before)
340 r = self.conn.execute(s)
342 nr, size = row[0], row[1]
347 self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
349 s = select([self.versions.c.hash])
350 s = s.where(where_clause)
351 r = self.conn.execute(s)
352 hashes = [r[0] for r in r.fetchall()]
356 s = self.versions.delete().where(where_clause)
357 r = self.conn.execute(s)
361 s = select([self.nodes.c.node],
362 and_(self.nodes.c.node == node,
363 select([func.count(self.versions.c.serial)],
364 self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
365 r = self.conn.execute(s)
368 s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
369 self.conn.execute(s).close()
373 def node_remove(self, node):
374 """Remove the node specified.
375 Return false if the node has children or is not found.
378 if self.node_count_children(node):
382 s = select([func.count(self.versions.c.serial),
383 func.sum(self.versions.c.size),
384 self.versions.c.cluster])
385 s = s.where(self.versions.c.node == node)
386 s = s.group_by(self.versions.c.cluster)
387 r = self.conn.execute(s)
388 for population, size, cluster in r.fetchall():
389 self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
392 s = self.nodes.delete().where(self.nodes.c.node == node)
393 self.conn.execute(s).close()
396 def policy_get(self, node):
397 s = select([self.policies.c.key, self.policies.c.value],
398 self.policies.c.node==node)
399 r = self.conn.execute(s)
400 d = dict(r.fetchall())
404 def policy_set(self, node, policy):
406 for k, v in policy.iteritems():
407 s = self.policies.update().where(and_(self.policies.c.node == node,
408 self.policies.c.key == k))
409 s = s.values(value = v)
410 rp = self.conn.execute(s)
413 s = self.policies.insert()
414 values = {'node':node, 'key':k, 'value':v}
415 r = self.conn.execute(s, values)
418 def statistics_get(self, node, cluster=0):
419 """Return population, total size and last mtime
420 for all versions under node that belong to the cluster.
423 s = select([self.statistics.c.population,
424 self.statistics.c.size,
425 self.statistics.c.mtime])
426 s = s.where(and_(self.statistics.c.node == node,
427 self.statistics.c.cluster == cluster))
428 r = self.conn.execute(s)
433 def statistics_update(self, node, population, size, mtime, cluster=0):
434 """Update the statistics of the given node.
435 Statistics keep track the population, total
436 size of objects and mtime in the node's namespace.
437 May be zero or positive or negative numbers.
439 s = select([self.statistics.c.population, self.statistics.c.size],
440 and_(self.statistics.c.node == node,
441 self.statistics.c.cluster == cluster))
442 rp = self.conn.execute(s)
446 prepopulation, presize = (0, 0)
448 prepopulation, presize = r
449 population += prepopulation
454 u = self.statistics.update().where(and_(self.statistics.c.node==node,
455 self.statistics.c.cluster==cluster))
456 u = u.values(population=population, size=size, mtime=mtime)
457 rp = self.conn.execute(u)
460 ins = self.statistics.insert()
461 ins = ins.values(node=node, population=population, size=size,
462 mtime=mtime, cluster=cluster)
463 self.conn.execute(ins).close()
465 def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
466 """Update the statistics of the given node's parent.
467 Then recursively update all parents up to the root.
468 Population is not recursive.
474 props = self.node_get_properties(node)
478 self.statistics_update(parent, population, size, mtime, cluster)
480 population = 0 # Population isn't recursive
482 def statistics_latest(self, node, before=inf, except_cluster=0):
483 """Return population, total size and last mtime
484 for all latest versions under node that
485 do not belong to the cluster.
489 props = self.node_get_properties(node)
494 # The latest version.
495 s = select([self.versions.c.serial,
496 self.versions.c.node,
497 self.versions.c.hash,
498 self.versions.c.size,
499 self.versions.c.source,
500 self.versions.c.mtime,
501 self.versions.c.muser,
502 self.versions.c.uuid,
503 self.versions.c.cluster])
504 filtered = select([func.max(self.versions.c.serial)],
505 self.versions.c.node == node)
507 filtered = filtered.where(self.versions.c.mtime < before)
508 s = s.where(and_(self.versions.c.cluster != except_cluster,
509 self.versions.c.serial == filtered))
510 r = self.conn.execute(s)
517 # First level, just under node (get population).
518 v = self.versions.alias('v')
519 s = select([func.count(v.c.serial),
521 func.max(v.c.mtime)])
522 c1 = select([func.max(self.versions.c.serial)])
524 c1 = c1.where(self.versions.c.mtime < before)
525 c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
526 s = s.where(and_(v.c.serial == c1.where(self.versions.c.node == v.c.node),
527 v.c.cluster != except_cluster,
529 rp = self.conn.execute(s)
535 mtime = max(mtime, r[2])
539 # All children (get size and mtime).
540 # XXX: This is why the full path is stored.
541 s = select([func.count(v.c.serial),
543 func.max(v.c.mtime)])
544 c1 = select([func.max(self.versions.c.serial)],
545 self.versions.c.node == v.c.node)
547 c1 = c1.where(self.versions.c.mtime < before)
548 c2 = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path) + '%', escape='\\'))
549 s = s.where(and_(v.c.serial == c1,
550 v.c.cluster != except_cluster,
552 rp = self.conn.execute(s)
557 size = r[1] - props[SIZE]
558 mtime = max(mtime, r[2])
559 return (count, size, mtime)
561 def version_create(self, node, hash, size, source, muser, uuid, cluster=0):
562 """Create a new version from the given properties.
563 Return the (serial, mtime) of the new version.
567 s = self.versions.insert().values(node=node, hash=hash, size=size, source=source,
568 mtime=mtime, muser=muser, uuid=uuid, cluster=cluster)
569 serial = self.conn.execute(s).inserted_primary_key[0]
570 self.statistics_update_ancestors(node, 1, size, mtime, cluster)
573 def version_lookup(self, node, before=inf, cluster=0):
574 """Lookup the current version of the given node.
575 Return a list with its properties:
576 (serial, node, hash, size, source, mtime, muser, uuid, cluster)
577 or None if the current version is not found in the given cluster.
580 v = self.versions.alias('v')
581 s = select([v.c.serial, v.c.node, v.c.hash,
582 v.c.size, v.c.source, v.c.mtime,
583 v.c.muser, v.c.uuid, v.c.cluster])
584 c = select([func.max(self.versions.c.serial)],
585 self.versions.c.node == node)
587 c = c.where(self.versions.c.mtime < before)
588 s = s.where(and_(v.c.serial == c,
589 v.c.cluster == cluster))
590 r = self.conn.execute(s)
597 def version_get_properties(self, serial, keys=(), propnames=_propnames):
598 """Return a sequence of values for the properties of
599 the version specified by serial and the keys, in the order given.
600 If keys is empty, return all properties in the order
601 (serial, node, hash, size, source, mtime, muser, uuid, cluster).
604 v = self.versions.alias()
605 s = select([v.c.serial, v.c.node, v.c.hash,
606 v.c.size, v.c.source, v.c.mtime,
607 v.c.muser, v.c.uuid, v.c.cluster], v.c.serial == serial)
608 rp = self.conn.execute(s)
616 return [r[propnames[k]] for k in keys if k in propnames]
618 def version_recluster(self, serial, cluster):
619 """Move the version into another cluster."""
621 props = self.version_get_properties(serial)
626 oldcluster = props[CLUSTER]
627 if cluster == oldcluster:
631 self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
632 self.statistics_update_ancestors(node, 1, size, mtime, cluster)
634 s = self.versions.update()
635 s = s.where(self.versions.c.serial == serial)
636 s = s.values(cluster = cluster)
637 self.conn.execute(s).close()
639 def version_remove(self, serial):
640 """Remove the serial specified."""
642 props = self.version_get_properties(serial)
648 cluster = props[CLUSTER]
651 self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
653 s = self.versions.delete().where(self.versions.c.serial == serial)
654 self.conn.execute(s).close()
657 def attribute_get(self, serial, domain, keys=()):
658 """Return a list of (key, value) pairs of the version specified by serial.
659 If keys is empty, return all attributes.
660 Othwerise, return only those specified.
664 attrs = self.attributes.alias()
665 s = select([attrs.c.key, attrs.c.value])
666 s = s.where(and_(attrs.c.key.in_(keys),
667 attrs.c.serial == serial,
668 attrs.c.domain == domain))
670 attrs = self.attributes.alias()
671 s = select([attrs.c.key, attrs.c.value])
672 s = s.where(and_(attrs.c.serial == serial,
673 attrs.c.domain == domain))
674 r = self.conn.execute(s)
679 def attribute_set(self, serial, domain, items):
680 """Set the attributes of the version specified by serial.
681 Receive attributes as an iterable of (key, value) pairs.
686 s = self.attributes.update()
687 s = s.where(and_(self.attributes.c.serial == serial,
688 self.attributes.c.domain == domain,
689 self.attributes.c.key == k))
690 s = s.values(value = v)
691 rp = self.conn.execute(s)
694 s = self.attributes.insert()
695 s = s.values(serial=serial, domain=domain, key=k, value=v)
696 self.conn.execute(s).close()
698 def attribute_del(self, serial, domain, keys=()):
699 """Delete attributes of the version specified by serial.
700 If keys is empty, delete all attributes.
701 Otherwise delete those specified.
705 #TODO more efficient way to do this?
707 s = self.attributes.delete()
708 s = s.where(and_(self.attributes.c.serial == serial,
709 self.attributes.c.domain == domain,
710 self.attributes.c.key == key))
711 self.conn.execute(s).close()
713 s = self.attributes.delete()
714 s = s.where(and_(self.attributes.c.serial == serial,
715 self.attributes.c.domain == domain))
716 self.conn.execute(s).close()
718 def attribute_copy(self, source, dest):
719 s = select([dest, self.attributes.c.domain, self.attributes.c.key, self.attributes.c.value],
720 self.attributes.c.serial == source)
721 rp = self.conn.execute(s)
722 attributes = rp.fetchall()
724 for dest, domain, k, v in attributes:
726 s = self.attributes.update().where(and_(
727 self.attributes.c.serial == dest,
728 self.attributes.c.domain == domain,
729 self.attributes.c.key == k))
730 rp = self.conn.execute(s, value=v)
733 s = self.attributes.insert()
734 values = {'serial':dest, 'domain':domain, 'key':k, 'value':v}
735 self.conn.execute(s, values).close()
737 def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=[]):
738 """Return a list with all keys pairs defined
739 for all latest versions under parent that
740 do not belong to the cluster.
743 # TODO: Use another table to store before=inf results.
744 a = self.attributes.alias('a')
745 v = self.versions.alias('v')
746 n = self.nodes.alias('n')
747 s = select([a.c.key]).distinct()
748 filtered = select([func.max(self.versions.c.serial)])
750 filtered = filtered.where(self.versions.c.mtime < before)
751 s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
752 s = s.where(v.c.cluster != except_cluster)
753 s = s.where(v.c.node.in_(select([self.nodes.c.node],
754 self.nodes.c.parent == parent)))
755 s = s.where(a.c.serial == v.c.serial)
756 s = s.where(a.c.domain == domain)
757 s = s.where(n.c.node == v.c.node)
760 conj.append(n.c.path.like(self.escape_like(x) + '%', escape='\\'))
762 s = s.where(or_(*conj))
763 rp = self.conn.execute(s)
766 return [r[0] for r in rows]
768 def latest_version_list(self, parent, prefix='', delimiter=None,
769 start='', limit=10000, before=inf,
770 except_cluster=0, pathq=[], domain=None, filterq=[]):
771 """Return a (list of (path, serial) tuples, list of common prefixes)
772 for the current versions of the paths with the given parent,
773 matching the following criteria.
775 The property tuple for a version is returned if all
776 of these conditions are true:
782 c. path starts with prefix (and paths in pathq)
784 d. version is the max up to before
786 e. version is not in cluster
788 f. the path does not have the delimiter occuring
789 after the prefix, or ends with the delimiter
791 g. serial matches the attribute filter query.
793 A filter query is a comma-separated list of
794 terms in one of these three forms:
797 an attribute with this key must exist
800 an attribute with this key must not exist
803 the attribute with this key satisfies the value
804 where ?op is one of ==, != <=, >=, <, >.
806 The list of common prefixes includes the prefixes
807 matching up to the first delimiter after prefix,
808 and are reported only once, as "virtual directories".
809 The delimiter is included in the prefixes.
811 If arguments are None, then the corresponding matching rule
814 Limit applies to the first list of tuples returned.
817 if not start or start < prefix:
818 start = strprevling(prefix)
819 nextling = strnextling(prefix)
821 a = self.attributes.alias('a')
822 v = self.versions.alias('v')
823 n = self.nodes.alias('n')
824 s = select([n.c.path, v.c.serial]).distinct()
825 filtered = select([func.max(self.versions.c.serial)])
827 filtered = filtered.where(self.versions.c.mtime < before)
828 s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
829 s = s.where(v.c.cluster != except_cluster)
830 s = s.where(v.c.node.in_(select([self.nodes.c.node],
831 self.nodes.c.parent == parent)))
833 s = s.where(n.c.node == v.c.node)
834 s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
837 conj.append(n.c.path.like(self.escape_like(x) + '%', escape='\\'))
840 s = s.where(or_(*conj))
842 s = s.order_by(n.c.path)
848 included, excluded, opers = parse_filters(filterq)
851 s = select([a.c.key, a.c.value])
852 s = s.where(a.c.domain == domain)
853 s = s.where(a.c.serial == serial)
854 rp = self.conn.execute(s)
855 meta = dict(rp.fetchall())
856 keyset= set([k.encode('utf8') for k in meta.keys()])
859 if not set(included) & keyset:
862 if set(excluded) & keyset:
864 for k, op, v in opers:
869 operation = OPERATORS[op]
870 if not operation(meta[k], v):
876 rp = self.conn.execute(s, start=start)
877 filter_ = lambda r : [t for t in r if not filterout(t)]
878 r = filter_(rp.fetchall())
886 pappend = prefixes.append
888 mappend = matches.append
890 rp = self.conn.execute(s, start=start)
892 props = rp.fetchone()
898 idx = path.find(delimiter, pfz)
907 if idx + dz == len(path):
910 continue # Get one more, in case there is a path.
916 rp = self.conn.execute(s, start=strnextling(pf)) # New start.
919 return matches, prefixes
921 def latest_uuid(self, uuid):
922 """Return a (path, serial) tuple, for the latest version of the given uuid."""
924 v = self.versions.alias('v')
925 n = self.nodes.alias('n')
926 s = select([n.c.path, v.c.serial])
927 filtered = select([func.max(self.versions.c.serial)])
928 s = s.where(v.c.serial == filtered.where(self.versions.c.uuid == uuid))
929 s = s.where(n.c.node == v.c.node)
931 r = self.conn.execute(s)