1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
35 from sqlalchemy import Table, Integer, BigInteger, DECIMAL, Column, String, MetaData, ForeignKey
36 from sqlalchemy.types import Text
37 from sqlalchemy.schema import Index, Sequence
38 from sqlalchemy.sql import func, and_, or_, not_, null, select, bindparam, text, exists
39 from sqlalchemy.ext.compiler import compiles
40 from sqlalchemy.engine.reflection import Inspector
42 from dbworker import DBWorker
44 from pithos.backends.filter import parse_filters
49 ( SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM, CLUSTER ) = range(11)
51 ( MATCH_PREFIX, MATCH_EXACT ) = range(2)
def strnextling(prefix):
    """Return the first unicode string
       greater than but not starting with given prefix.
       strnextling('hello') -> 'hellp'
    """
    # NOTE(review): the executable body was elided from the listing; it is
    # restored here from the surrounding comments — confirm against upstream.
    if not prefix:
        ## all strings start with the null string,
        ## therefore we have to approximate strnextling('')
        ## with the last unicode character supported by python
        ## 0x10ffff for wide (32-bit unicode) python builds
        ## 0x00ffff for narrow (16-bit unicode) python builds
        ## We will not autodetect. 0xffff is safe enough.
        return unichr(0xffff)
    s = prefix[:-1]
    c = ord(prefix[-1])
    if c >= 0xffff:
        # Last character is already the maximum: append the max character
        # instead of incrementing past it.
        s += unichr(0xffff)
    else:
        # Bump the last character by one code point.
        s += unichr(c + 1)
    return s
def strprevling(prefix):
    """Return an approximation of the last unicode string
       less than but not starting with given prefix.
       strprevling(u'hello') -> u'helln\\xffff'
    """
    # NOTE(review): part of the body was elided from the listing; it is
    # restored here around the surviving line — confirm against upstream.
    if not prefix:
        ## There is no prevling for the null string
        return prefix
    s = prefix[:-1]
    c = ord(prefix[-1])
    if c > 0:
        # Decrement the last character and pad with the max character so
        # the result sorts just below any string starting with prefix.
        s += unichr(c - 1) + unichr(0xffff)
    return s
105 class Node(DBWorker):
106 """Nodes store path organization and have multiple versions.
107 Versions store object history and have multiple attributes.
108 Attributes store metadata.
111 # TODO: Provide an interface for included and excluded clusters.
# NOTE(review): this listing is elided — several original lines are missing
# from view (e.g. the closing of the class docstring, the `columns = []`
# resets before each table definition, and the ondelete/onupdate arguments
# of most ForeignKey calls). The code below is kept byte-identical.
113 def __init__(self, **params):
114 DBWorker.__init__(self, **params)
115 metadata = MetaData()
# Build the 'nodes' table: node id, parent id (self-referencing FK) and
# the full path, which is unique.
119 columns.append(Column('node', Integer, primary_key=True))
120 columns.append(Column('parent', Integer,
121 ForeignKey('nodes.node',
124 autoincrement=False))
125 columns.append(Column('path', String(2048), default='', nullable=False))
126 self.nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
127 Index('idx_nodes_path', self.nodes.c.path, unique=True)
# Build the 'policy' table: per-node key/value policy entries.
131 columns.append(Column('node', Integer,
132 ForeignKey('nodes.node',
136 columns.append(Column('key', String(128), primary_key=True))
137 columns.append(Column('value', String(256)))
138 self.policies = Table('policy', metadata, *columns, mysql_engine='InnoDB')
140 #create statistics table
# Per-(node, cluster) aggregates: population, total size and last mtime.
142 columns.append(Column('node', Integer,
143 ForeignKey('nodes.node',
147 columns.append(Column('population', Integer, nullable=False, default=0))
148 columns.append(Column('size', BigInteger, nullable=False, default=0))
149 columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
150 columns.append(Column('cluster', Integer, nullable=False, default=0,
151 primary_key=True, autoincrement=False))
152 self.statistics = Table('statistics', metadata, *columns, mysql_engine='InnoDB')
154 #create versions table
# Object history: one row per version, ordered by the serial primary key.
156 columns.append(Column('serial', Integer, primary_key=True))
157 columns.append(Column('node', Integer,
158 ForeignKey('nodes.node',
160 onupdate='CASCADE')))
161 columns.append(Column('hash', String(256)))
162 columns.append(Column('size', BigInteger, nullable=False, default=0))
163 columns.append(Column('type', String(256), nullable=False, default=''))
164 columns.append(Column('source', Integer))
165 columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
166 columns.append(Column('muser', String(256), nullable=False, default=''))
167 columns.append(Column('uuid', String(64), nullable=False, default=''))
168 columns.append(Column('checksum', String(256), nullable=False, default=''))
169 columns.append(Column('cluster', Integer, nullable=False, default=0))
170 self.versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
171 Index('idx_versions_node_mtime', self.versions.c.node, self.versions.c.mtime)
172 Index('idx_versions_node_uuid', self.versions.c.uuid)
174 #create attributes table
# Per-version metadata, keyed by (serial, domain, key).
176 columns.append(Column('serial', Integer,
177 ForeignKey('versions.serial',
181 columns.append(Column('domain', String(256), primary_key=True))
182 columns.append(Column('key', String(128), primary_key=True))
183 columns.append(Column('value', String(256)))
184 self.attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')
186 metadata.create_all(self.engine)
# Ensure the root node exists: it is its own parent (node == parent == ROOTNODE).
188 s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
189 self.nodes.c.parent == ROOTNODE))
190 rp = self.conn.execute(s)
# Insert the root node if the select above found nothing
# (the guard lines are elided from this view).
194 s = self.nodes.insert().values(node=ROOTNODE, parent=ROOTNODE)
def node_create(self, parent, path):
    """Create a new node from the given properties.
       Return the node identifier of the new node.
    """
    #TODO catch IntegrityError?
    s = self.nodes.insert().values(parent=parent, path=path)
    r = self.conn.execute(s)
    inserted_primary_key = r.inserted_primary_key[0]
    # Close the result proxy to release the connection resources
    # (restored — elided from the listing).
    r.close()
    return inserted_primary_key
def node_lookup(self, path):
    """Lookup the current node of the given path.
       Return None if the path is not found.
    """
    # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
    s = select([self.nodes.c.node],
               self.nodes.c.path.like(self.escape_like(path), escape='\\'))
    r = self.conn.execute(s)
    # NOTE(review): fetch/close/return lines restored — elided from the listing.
    row = r.fetchone()
    r.close()
    if row:
        return row[0]
    return None
def node_lookup_bulk(self, paths):
    """Lookup the current nodes for the given paths.
       Return () if the path is not found.
    """
    # Empty input short-circuits to avoid an empty IN () clause
    # (restored — elided from the listing).
    if not paths:
        return ()
    # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
    s = select([self.nodes.c.node], self.nodes.c.path.in_(paths))
    r = self.conn.execute(s)
    rows = r.fetchall()
    r.close()
    return [row[0] for row in rows]
def node_get_properties(self, node):
    """Return the node's (parent, path).
       Return None if the node is not found.
    """
    s = select([self.nodes.c.parent, self.nodes.c.path])
    s = s.where(self.nodes.c.node == node)
    r = self.conn.execute(s)
    # fetchone() returns None when no row matches, satisfying the contract
    # above (restored — elided from the listing).
    l = r.fetchone()
    r.close()
    return l
def node_get_versions(self, node, keys=(), propnames=_propnames):
    """Return the properties of all versions at node.
       If keys is empty, return all properties in the order
       (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
    """
    s = select([self.versions.c.serial,
                self.versions.c.node,
                self.versions.c.hash,
                self.versions.c.size,
                self.versions.c.type,
                self.versions.c.source,
                self.versions.c.mtime,
                self.versions.c.muser,
                self.versions.c.uuid,
                self.versions.c.checksum,
                self.versions.c.cluster], self.versions.c.node == node)
    s = s.order_by(self.versions.c.serial)
    r = self.conn.execute(s)
    # NOTE(review): fetch/close and the keys-empty fast path restored —
    # elided from the listing.
    rows = r.fetchall()
    r.close()
    if not rows:
        return rows
    if not keys:
        return rows
    # Project only the requested properties, silently skipping unknown keys.
    return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]
def node_count_children(self, node):
    """Return node's child count."""
    s = select([func.count(self.nodes.c.node)])
    # The root node is its own parent; exclude it so it does not count
    # as a child of itself.
    s = s.where(and_(self.nodes.c.parent == node,
                     self.nodes.c.node != ROOTNODE))
    r = self.conn.execute(s)
    # NOTE(review): fetch/close/return restored — elided from the listing.
    row = r.fetchone()
    r.close()
    return row[0]
286 def node_purge_children(self, parent, before=inf, cluster=0):
287 """Delete all versions with the specified
288 parent and cluster, and return
289 the hashes and size of versions deleted.
290 Clears out nodes with no remaining versions.
# NOTE(review): several original lines are elided from this view (the
# before != inf guard, the row fetch and its empty-result guard, the
# mtime assignment, result-proxy close() calls and the final return).
# The surviving lines are kept byte-identical.
# Aggregate count and total size of the versions about to be deleted,
# so statistics can be decremented accordingly.
293 c1 = select([self.nodes.c.node],
294 self.nodes.c.parent == parent)
295 where_clause = and_(self.versions.c.node.in_(c1),
296 self.versions.c.cluster == cluster)
297 s = select([func.count(self.versions.c.serial),
298 func.sum(self.versions.c.size)])
299 s = s.where(where_clause)
# Applied only when a finite `before` bound is given (guard elided).
301 s = s.where(self.versions.c.mtime <= before)
302 r = self.conn.execute(s)
# Size is negated (deletion shrinks the namespace); None sum -> 0.
307 nr, size = row[0], -row[1] if row[1] else 0
309 self.statistics_update(parent, -nr, size, mtime, cluster)
310 self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)
# Collect the hashes of the versions being deleted, for the caller.
312 s = select([self.versions.c.hash])
313 s = s.where(where_clause)
314 r = self.conn.execute(s)
315 hashes = [row[0] for row in r.fetchall()]
# Delete the matching versions.
319 s = self.versions.delete().where(where_clause)
320 r = self.conn.execute(s)
# Delete child nodes that now have no versions at all.
324 s = select([self.nodes.c.node],
325 and_(self.nodes.c.parent == parent,
326 select([func.count(self.versions.c.serial)],
327 self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
328 rp = self.conn.execute(s)
329 nodes = [r[0] for r in rp.fetchall()]
331 s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
332 self.conn.execute(s).close()
336 def node_purge(self, node, before=inf, cluster=0):
337 """Delete all versions with the specified
338 node and cluster, and return
339 the hashes and size of versions deleted.
340 Clears out the node if it has no remaining versions.
# NOTE(review): elided from this view: the before != inf guard, the row
# fetch and empty-result guard, mtime assignment, result-proxy close()
# calls, the `nodes` list construction and the final return. Surviving
# lines are kept byte-identical.
# Aggregate count and size of versions to delete for statistics updates.
344 s = select([func.count(self.versions.c.serial),
345 func.sum(self.versions.c.size)])
346 where_clause = and_(self.versions.c.node == node,
347 self.versions.c.cluster == cluster)
348 s = s.where(where_clause)
# Applied only when a finite `before` bound is given (guard elided).
350 s = s.where(self.versions.c.mtime <= before)
351 r = self.conn.execute(s)
353 nr, size = row[0], row[1]
# Decrement ancestor statistics by the deleted population and size.
358 self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
# Collect the hashes of the versions being deleted, for the caller.
360 s = select([self.versions.c.hash])
361 s = s.where(where_clause)
362 r = self.conn.execute(s)
363 hashes = [r[0] for r in r.fetchall()]
# Delete the matching versions.
367 s = self.versions.delete().where(where_clause)
368 r = self.conn.execute(s)
# Remove the node itself if it has no remaining versions.
372 s = select([self.nodes.c.node],
373 and_(self.nodes.c.node == node,
374 select([func.count(self.versions.c.serial)],
375 self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
376 r = self.conn.execute(s)
379 s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
380 self.conn.execute(s).close()
def node_remove(self, node):
    """Remove the node specified.
       Return false if the node has children or is not found.
    """
    # Refuse to remove a node that still has children.
    if self.node_count_children(node):
        return False

    # NOTE(review): the mtime assignment, the early/final returns and the
    # result close were elided from the listing and are restored here —
    # confirm against upstream.
    mtime = time()
    # Decrement ancestor statistics per cluster for every version removed
    # along with the node.
    s = select([func.count(self.versions.c.serial),
                func.sum(self.versions.c.size),
                self.versions.c.cluster])
    s = s.where(self.versions.c.node == node)
    s = s.group_by(self.versions.c.cluster)
    r = self.conn.execute(s)
    for population, size, cluster in r.fetchall():
        self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
    r.close()

    s = self.nodes.delete().where(self.nodes.c.node == node)
    self.conn.execute(s).close()
    return True
def policy_get(self, node):
    """Return the policy of the node as a {key: value} dict."""
    s = select([self.policies.c.key, self.policies.c.value],
               self.policies.c.node == node)
    r = self.conn.execute(s)
    # (key, value) rows convert directly into a dict.
    d = dict(r.fetchall())
    # close/return restored — elided from the listing.
    r.close()
    return d
def policy_set(self, node, policy):
    """Set the given {key: value} policy entries for the node.
       Existing keys are updated; missing keys are inserted (upsert).
    """
    #insert or replace
    for k, v in policy.iteritems():
        s = self.policies.update().where(and_(self.policies.c.node == node,
                                              self.policies.c.key == k))
        s = s.values(value=v)
        rp = self.conn.execute(s)
        rp.close()
        # NOTE(review): rowcount-based insert fallback restored — the guard
        # was elided from the listing; confirm against upstream.
        if rp.rowcount == 0:
            # No existing row was updated: insert a fresh one.
            s = self.policies.insert()
            values = {'node': node, 'key': k, 'value': v}
            r = self.conn.execute(s, values)
            r.close()
def statistics_get(self, node, cluster=0):
    """Return population, total size and last mtime
       for all versions under node that belong to the cluster.
    """
    s = select([self.statistics.c.population,
                self.statistics.c.size,
                self.statistics.c.mtime])
    s = s.where(and_(self.statistics.c.node == node,
                     self.statistics.c.cluster == cluster))
    r = self.conn.execute(s)
    # fetch/close/return restored — elided from the listing.
    row = r.fetchone()
    r.close()
    return row
444 def statistics_update(self, node, population, size, mtime, cluster=0):
445 """Update the statistics of the given node.
446 Statistics keep track the population, total
447 size of objects and mtime in the node's namespace.
448 May be zero or positive or negative numbers.
# NOTE(review): elided from this view: the row fetch, the if/else around
# the (0, 0) default, `size += presize`, result close() calls and the
# rowcount guard before the insert. Surviving lines are byte-identical.
# Read the current (population, size) for this (node, cluster), if any.
450 s = select([self.statistics.c.population, self.statistics.c.size],
451 and_(self.statistics.c.node == node,
452 self.statistics.c.cluster == cluster))
453 rp = self.conn.execute(s)
# Defaults when no statistics row exists yet.
457 prepopulation, presize = (0, 0)
459 prepopulation, presize = r
# The arguments are deltas: accumulate onto the previous values.
460 population += prepopulation
# Upsert: try the update first, insert if nothing matched.
465 u = self.statistics.update().where(and_(self.statistics.c.node==node,
466 self.statistics.c.cluster==cluster))
467 u = u.values(population=population, size=size, mtime=mtime)
468 rp = self.conn.execute(u)
471 ins = self.statistics.insert()
472 ins = ins.values(node=node, population=population, size=size,
473 mtime=mtime, cluster=cluster)
474 self.conn.execute(ins).close()
def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
    """Update the statistics of the given node's parent.
       Then recursively update all parents up to the root.
       Population is not recursive.
    """
    # NOTE(review): the walk-to-root loop structure was elided from the
    # listing and is restored here — confirm against upstream.
    while True:
        if node == ROOTNODE:
            break
        props = self.node_get_properties(node)
        if props is None:
            break
        parent, path = props
        self.statistics_update(parent, population, size, mtime, cluster)
        node = parent
        population = 0 # Population isn't recursive
493 def statistics_latest(self, node, before=inf, except_cluster=0):
494 """Return population, total size and last mtime
495 for all latest versions under node that
496 do not belong to the cluster.
# NOTE(review): this method is heavily elided in the listing (guards on
# props, before != inf conditions, row fetches, count/size accumulation
# and several where-clause tails are missing). Surviving lines are kept
# byte-identical.
500 props = self.node_get_properties(node)
# Case 1: the node itself — inspect its single latest version.
505 # The latest version.
506 s = select([self.versions.c.serial,
507 self.versions.c.node,
508 self.versions.c.hash,
509 self.versions.c.size,
510 self.versions.c.type,
511 self.versions.c.source,
512 self.versions.c.mtime,
513 self.versions.c.muser,
514 self.versions.c.uuid,
515 self.versions.c.checksum,
516 self.versions.c.cluster])
517 filtered = select([func.max(self.versions.c.serial)],
518 self.versions.c.node == node)
# Applied only when a finite `before` bound is given (guard elided).
520 filtered = filtered.where(self.versions.c.mtime < before)
521 s = s.where(and_(self.versions.c.cluster != except_cluster,
522 self.versions.c.serial == filtered))
523 r = self.conn.execute(s)
# Case 2: direct children of node — counts the population.
530 # First level, just under node (get population).
531 v = self.versions.alias('v')
532 s = select([func.count(v.c.serial),
534 func.max(v.c.mtime)])
535 c1 = select([func.max(self.versions.c.serial)])
537 c1 = c1.where(self.versions.c.mtime < before)
538 c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
539 s = s.where(and_(v.c.serial == c1.where(self.versions.c.node == v.c.node),
540 v.c.cluster != except_cluster,
542 rp = self.conn.execute(s)
548 mtime = max(mtime, r[2])
# Case 3: the whole subtree, matched by path prefix — this is why nodes
# store their full path.
552 # All children (get size and mtime).
553 # This is why the full path is stored.
554 s = select([func.count(v.c.serial),
556 func.max(v.c.mtime)])
557 c1 = select([func.max(self.versions.c.serial)],
558 self.versions.c.node == v.c.node)
560 c1 = c1.where(self.versions.c.mtime < before)
561 c2 = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path) + '%', escape='\\'))
562 s = s.where(and_(v.c.serial == c1,
563 v.c.cluster != except_cluster,
565 rp = self.conn.execute(s)
# Subtract the node's own size: it was counted in the subtree scan.
570 size = r[1] - props[SIZE]
571 mtime = max(mtime, r[2])
572 return (count, size, mtime)
def version_create(self, node, hash, size, type, source, muser, uuid, checksum, cluster=0):
    """Create a new version from the given properties.
       Return the (serial, mtime) of the new version.
    """
    # NOTE(review): the mtime assignment and the return were elided from
    # the listing and are restored here — confirm against upstream.
    mtime = time()
    s = self.versions.insert().values(
        node=node, hash=hash, size=size, type=type, source=source,
        mtime=mtime, muser=muser, uuid=uuid, checksum=checksum,
        cluster=cluster)
    serial = self.conn.execute(s).inserted_primary_key[0]
    # The new version adds one object of `size` bytes to every ancestor.
    self.statistics_update_ancestors(node, 1, size, mtime, cluster)
    return serial, mtime
def version_lookup(self, node, before=inf, cluster=0, all_props=True):
    """Lookup the current version of the given node.
       Return a list with its properties:
       (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster)
       or None if the current version is not found in the given cluster.
    """
    # NOTE(review): the all_props branch structure, the before != inf
    # guard and the fetch/return tail were elided from the listing and
    # are restored here — confirm against upstream.
    v = self.versions.alias('v')
    if not all_props:
        s = select([v.c.serial])
    else:
        s = select([v.c.serial, v.c.node, v.c.hash,
                    v.c.size, v.c.type, v.c.source,
                    v.c.mtime, v.c.muser, v.c.uuid,
                    v.c.checksum, v.c.cluster])
    # Latest serial for the node, optionally bounded by mtime < before.
    c = select([func.max(self.versions.c.serial)],
               self.versions.c.node == node)
    if before != inf:
        c = c.where(self.versions.c.mtime < before)
    s = s.where(and_(v.c.serial == c,
                     v.c.cluster == cluster))
    r = self.conn.execute(s)
    props = r.fetchone()
    r.close()
    if props:
        return props
    return None
def version_lookup_bulk(self, nodes, before=inf, cluster=0, all_props=True):
    """Lookup the current versions of the given nodes.
       Return a list with their properties:
       (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
    """
    # NOTE(review): the all_props branch structure, the before != inf
    # guard and the result close were elided from the listing and are
    # restored here — confirm against upstream.
    v = self.versions.alias('v')
    if not all_props:
        s = select([v.c.serial])
    else:
        s = select([v.c.serial, v.c.node, v.c.hash,
                    v.c.size, v.c.type, v.c.source,
                    v.c.mtime, v.c.muser, v.c.uuid,
                    v.c.checksum, v.c.cluster])
    # One latest serial per node, optionally bounded by mtime < before.
    c = select([func.max(self.versions.c.serial)],
               self.versions.c.node.in_(nodes)).group_by(self.versions.c.node)
    if before != inf:
        c = c.where(self.versions.c.mtime < before)
    s = s.where(and_(v.c.serial.in_(c),
                     v.c.cluster == cluster))
    r = self.conn.execute(s)
    rproxy = r.fetchall()
    r.close()
    return (tuple(row.values()) for row in rproxy)
def version_get_properties(self, serial, keys=(), propnames=_propnames):
    """Return a sequence of values for the properties of
       the version specified by serial and the keys, in the order given.
       If keys is empty, return all properties in the order
       (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
    """
    v = self.versions.alias()
    s = select([v.c.serial, v.c.node, v.c.hash,
                v.c.size, v.c.type, v.c.source,
                v.c.mtime, v.c.muser, v.c.uuid,
                v.c.checksum, v.c.cluster], v.c.serial == serial)
    rp = self.conn.execute(s)
    # NOTE(review): fetch/close and the two guards restored — elided from
    # the listing.
    r = rp.fetchone()
    rp.close()
    if r is None:
        return r
    if not keys:
        return r
    # Project only the requested properties, silently skipping unknown keys.
    return [r[propnames[k]] for k in keys if k in propnames]
def version_put_property(self, serial, key, value):
    """Set value for the property of version specified by key."""
    # Ignore unknown property names (early return restored — elided from
    # the listing).
    if key not in _propnames:
        return
    s = self.versions.update()
    s = s.where(self.versions.c.serial == serial)
    s = s.values(**{key: value})
    self.conn.execute(s).close()
def version_recluster(self, serial, cluster):
    """Move the version into another cluster."""
    props = self.version_get_properties(serial)
    # NOTE(review): the guards, the node/size extraction and the mtime
    # assignment were elided from the listing and are restored here —
    # confirm against upstream.
    if not props:
        return
    node = props[NODE]
    size = props[SIZE]
    oldcluster = props[CLUSTER]
    if cluster == oldcluster:
        return

    mtime = time()
    # Move the version's contribution from the old cluster to the new one.
    self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
    self.statistics_update_ancestors(node, 1, size, mtime, cluster)

    s = self.versions.update()
    s = s.where(self.versions.c.serial == serial)
    s = s.values(cluster=cluster)
    self.conn.execute(s).close()
def version_remove(self, serial):
    """Remove the serial specified."""
    props = self.version_get_properties(serial)
    # NOTE(review): the guard, the property extraction, the mtime
    # assignment and the return were elided from the listing and are
    # restored here — confirm against upstream.
    if not props:
        return
    node = props[NODE]
    hash = props[HASH]
    size = props[SIZE]
    cluster = props[CLUSTER]

    mtime = time()
    # Removing the version shrinks every ancestor by one object of `size`.
    self.statistics_update_ancestors(node, -1, -size, mtime, cluster)

    s = self.versions.delete().where(self.versions.c.serial == serial)
    self.conn.execute(s).close()
    return hash
def attribute_get(self, serial, domain, keys=()):
    """Return a list of (key, value) pairs of the version specified by serial.
       If keys is empty, return all attributes.
       Otherwise, return only those specified.
    """
    # NOTE(review): the if/else branch structure and the fetch/close/return
    # tail were elided from the listing and are restored here.
    if keys:
        attrs = self.attributes.alias()
        s = select([attrs.c.key, attrs.c.value])
        s = s.where(and_(attrs.c.key.in_(keys),
                         attrs.c.serial == serial,
                         attrs.c.domain == domain))
    else:
        attrs = self.attributes.alias()
        s = select([attrs.c.key, attrs.c.value])
        s = s.where(and_(attrs.c.serial == serial,
                         attrs.c.domain == domain))
    r = self.conn.execute(s)
    l = r.fetchall()
    r.close()
    return l
def attribute_set(self, serial, domain, items):
    """Set the attributes of the version specified by serial.
       Receive attributes as an iterable of (key, value) pairs.
    """
    #insert or replace
    # NOTE(review): the loop header and the rowcount guard were elided
    # from the listing and are restored here — confirm against upstream.
    for k, v in items:
        s = self.attributes.update()
        s = s.where(and_(self.attributes.c.serial == serial,
                         self.attributes.c.domain == domain,
                         self.attributes.c.key == k))
        s = s.values(value=v)
        rp = self.conn.execute(s)
        rp.close()
        if rp.rowcount == 0:
            # No existing row was updated: insert a fresh one.
            s = self.attributes.insert()
            s = s.values(serial=serial, domain=domain, key=k, value=v)
            self.conn.execute(s).close()
def attribute_del(self, serial, domain, keys=()):
    """Delete attributes of the version specified by serial.
       If keys is empty, delete all attributes.
       Otherwise delete those specified.
    """
    # NOTE(review): the if/else structure and the loop header were elided
    # from the listing and are restored here.
    if keys:
        #TODO more efficient way to do this?
        for key in keys:
            s = self.attributes.delete()
            s = s.where(and_(self.attributes.c.serial == serial,
                             self.attributes.c.domain == domain,
                             self.attributes.c.key == key))
            self.conn.execute(s).close()
    else:
        s = self.attributes.delete()
        s = s.where(and_(self.attributes.c.serial == serial,
                         self.attributes.c.domain == domain))
        self.conn.execute(s).close()
def attribute_copy(self, source, dest):
    """Copy all attributes of version `source` onto version `dest`
       (insert-or-replace per (domain, key)).
    """
    # Select a constant `dest` alongside each source attribute row so the
    # unpacking below yields ready-to-write (serial, domain, key, value).
    s = select([dest, self.attributes.c.domain, self.attributes.c.key, self.attributes.c.value],
               self.attributes.c.serial == source)
    rp = self.conn.execute(s)
    attributes = rp.fetchall()
    # NOTE(review): the close() calls and the rowcount guard were elided
    # from the listing and are restored here.
    rp.close()
    for dest, domain, k, v in attributes:
        #insert or replace
        s = self.attributes.update().where(and_(
            self.attributes.c.serial == dest,
            self.attributes.c.domain == domain,
            self.attributes.c.key == k))
        rp = self.conn.execute(s, value=v)
        rp.close()
        if rp.rowcount == 0:
            # No existing row was updated: insert a fresh one.
            s = self.attributes.insert()
            values = {'serial': dest, 'domain': domain, 'key': k, 'value': v}
            self.conn.execute(s, values).close()
790 def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=[]):
791 """Return a list with all keys pairs defined
792 for all latest versions under parent that
793 do not belong to the cluster.
# NOTE(review): elided from this view: the `conj = []` initialization,
# the `if conj:` guard, the before != inf guard and the rows fetch/close.
# Surviving lines are kept byte-identical. Beware: the mutable default
# pathq=[] is shared across calls (it is only read here, so harmless).
796 # TODO: Use another table to store before=inf results.
797 a = self.attributes.alias('a')
798 v = self.versions.alias('v')
799 n = self.nodes.alias('n')
# Distinct attribute keys of the latest version of each node under parent.
800 s = select([a.c.key]).distinct()
801 filtered = select([func.max(self.versions.c.serial)])
# Applied only when a finite `before` bound is given (guard elided).
803 filtered = filtered.where(self.versions.c.mtime < before)
804 s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
805 s = s.where(v.c.cluster != except_cluster)
806 s = s.where(v.c.node.in_(select([self.nodes.c.node],
807 self.nodes.c.parent == parent)))
808 s = s.where(a.c.serial == v.c.serial)
809 s = s.where(a.c.domain == domain)
810 s = s.where(n.c.node == v.c.node)
# Optional path restrictions, OR-ed together.
812 for path, match in pathq:
813 if match == MATCH_PREFIX:
814 conj.append(n.c.path.like(self.escape_like(path) + '%', escape='\\'))
815 elif match == MATCH_EXACT:
816 conj.append(n.c.path == path)
818 s = s.where(or_(*conj))
819 rp = self.conn.execute(s)
822 return [r[0] for r in rows]
824 def latest_version_list(self, parent, prefix='', delimiter=None,
825 start='', limit=10000, before=inf,
826 except_cluster=0, pathq=[], domain=None,
827 filterq=[], sizeq=None, all_props=False):
828 """Return a (list of (path, serial) tuples, list of common prefixes)
829 for the current versions of the paths with the given parent,
830 matching the following criteria.
832 The property tuple for a version is returned if all
833 of these conditions are true:
839 c. path starts with prefix (and paths in pathq)
841 d. version is the max up to before
843 e. version is not in cluster
845 f. the path does not have the delimiter occuring
846 after the prefix, or ends with the delimiter
848 g. serial matches the attribute filter query.
850 A filter query is a comma-separated list of
851 terms in one of these three forms:
854 an attribute with this key must exist
857 an attribute with this key must not exist
860 the attribute with this key satisfies the value
861 where ?op is one of ==, != <=, >=, <, >.
863 h. the size is in the range set by sizeq
865 The list of common prefixes includes the prefixes
866 matching up to the first delimiter after prefix,
867 and are reported only once, as "virtual directories".
868 The delimiter is included in the prefixes.
870 If arguments are None, then the corresponding matching rule
873 Limit applies to the first list of tuples returned.
875 If all_props is True, return all properties after path, not just serial.
# NOTE(review): this method is heavily elided in the listing (the
# all_props if/else, before != inf guard, conj initialization, sizeq
# guards, `subs` select initializations, the no-delimiter fast path and
# most of the pagination loop are missing). Surviving lines are kept
# byte-identical. Beware: mutable defaults pathq=[]/filterq=[] are shared
# across calls (read-only here, so harmless).
# Clamp the pagination window to [strprevling(prefix), strnextling(prefix)).
878 if not start or start < prefix:
879 start = strprevling(prefix)
880 nextling = strnextling(prefix)
882 v = self.versions.alias('v')
883 n = self.nodes.alias('n')
# Minimal projection (path, serial) vs. full property tuple.
885 s = select([n.c.path, v.c.serial]).distinct()
887 s = select([n.c.path,
888 v.c.serial, v.c.node, v.c.hash,
889 v.c.size, v.c.type, v.c.source,
890 v.c.mtime, v.c.muser, v.c.uuid,
891 v.c.checksum, v.c.cluster]).distinct()
# Latest serial per node, optionally bounded by mtime < before.
892 filtered = select([func.max(self.versions.c.serial)])
894 filtered = filtered.where(self.versions.c.mtime < before)
895 s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
896 s = s.where(v.c.cluster != except_cluster)
897 s = s.where(v.c.node.in_(select([self.nodes.c.node],
898 self.nodes.c.parent == parent)))
900 s = s.where(n.c.node == v.c.node)
# `start` is a bind parameter so the pagination loop below can re-execute
# the same statement with a new starting path.
901 s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
# Optional path restrictions, OR-ed together.
903 for path, match in pathq:
904 if match == MATCH_PREFIX:
905 conj.append(n.c.path.like(self.escape_like(path) + '%', escape='\\'))
906 elif match == MATCH_EXACT:
907 conj.append(n.c.path == path)
909 s = s.where(or_(*conj))
# Optional size range [sizeq[0], sizeq[1]).
911 if sizeq and len(sizeq) == 2:
913 s = s.where(v.c.size >= sizeq[0])
915 s = s.where(v.c.size < sizeq[1])
# Attribute filter query: EXISTS for required keys, NOT EXISTS for
# excluded keys, and one EXISTS per (key op value) comparison.
917 if domain and filterq:
918 a = self.attributes.alias('a')
919 included, excluded, opers = parse_filters(filterq)
922 subs = subs.where(a.c.serial == v.c.serial).correlate(v)
923 subs = subs.where(a.c.domain == domain)
924 subs = subs.where(or_(*[a.c.key.op('=')(x) for x in included]))
925 s = s.where(exists(subs))
928 subs = subs.where(a.c.serial == v.c.serial).correlate(v)
929 subs = subs.where(a.c.domain == domain)
930 subs = subs.where(or_(*[a.c.key.op('=')(x) for x in excluded]))
931 s = s.where(not_(exists(subs)))
933 for k, o, val in opers:
935 subs = subs.where(a.c.serial == v.c.serial).correlate(v)
936 subs = subs.where(a.c.domain == domain)
937 subs = subs.where(and_(a.c.key.op('=')(k), a.c.value.op(o)(val)))
938 s = s.where(exists(subs))
940 s = s.order_by(n.c.path)
944 rp = self.conn.execute(s, start=start)
# Pagination loop: collect matches and fold paths containing the
# delimiter into common prefixes ("virtual directories").
953 pappend = prefixes.append
955 mappend = matches.append
957 rp = self.conn.execute(s, start=start)
959 props = rp.fetchone()
# Find the delimiter after the prefix (pfz = len(prefix), elided).
964 idx = path.find(delimiter, pfz)
973 if idx + dz == len(path):
976 continue # Get one more, in case there is a path.
# Skip past the whole virtual directory in one query.
982 rp = self.conn.execute(s, start=strnextling(pf)) # New start.
985 return matches, prefixes
987 def latest_uuid(self, uuid):
988 """Return a (path, serial) tuple, for the latest version of the given uuid."""
# NOTE(review): this definition is truncated at the end of the visible
# chunk (the fetch/close/return tail is beyond view); code kept
# byte-identical.
990 v = self.versions.alias('v')
991 n = self.nodes.alias('n')
# Join the node's path with the max serial carrying this uuid.
992 s = select([n.c.path, v.c.serial])
993 filtered = select([func.max(self.versions.c.serial)])
994 s = s.where(v.c.serial == filtered.where(self.versions.c.uuid == uuid))
995 s = s.where(n.c.node == v.c.node)
997 r = self.conn.execute(s)