Make AlchemyBackend a ModularBackend module.
[pithos] / pithos / backends / lib / sqlalchemy / node.py
1 # Copyright 2011 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from time import time
35 from sqlalchemy import Table, Integer, Float, Column, String, MetaData, ForeignKey
36 from sqlalchemy.schema import Index, Sequence
37 from sqlalchemy.sql import func, and_, or_, null, select, bindparam
38 from sqlalchemy.ext.compiler import compiles
39
40 from dbworker import DBWorker
41
42 ROOTNODE  = 0
43
44 ( SERIAL, NODE, SIZE, SOURCE, MTIME, MUSER, CLUSTER ) = range(7)
45
46 inf = float('inf')
47
48
49 def strnextling(prefix):
50     """Return the first unicode string
51        greater than but not starting with given prefix.
52        strnextling('hello') -> 'hellp'
53     """
54     if not prefix:
55         ## all strings start with the null string,
56         ## therefore we have to approximate strnextling('')
57         ## with the last unicode character supported by python
58         ## 0x10ffff for wide (32-bit unicode) python builds
59         ## 0x00ffff for narrow (16-bit unicode) python builds
60         ## We will not autodetect. 0xffff is safe enough.
61         return unichr(0xffff)
62     s = prefix[:-1]
63     c = ord(prefix[-1])
64     if c >= 0xffff:
65         raise RuntimeError
66     s += unichr(c+1)
67     return s
68
69 def strprevling(prefix):
70     """Return an approximation of the last unicode string
71        less than but not starting with given prefix.
72        strprevling(u'hello') -> u'helln\\xffff'
73     """
74     if not prefix:
75         ## There is no prevling for the null string
76         return prefix
77     s = prefix[:-1]
78     c = ord(prefix[-1])
79     if c > 0:
80         #s += unichr(c-1) + unichr(0xffff)
81         s += unichr(c-1)
82     return s
83
84
85 _propnames = {
86     'serial'    : 0,
87     'node'      : 1,
88     'size'      : 2,
89     'source'    : 3,
90     'mtime'     : 4,
91     'muser'     : 5,
92     'cluster'   : 6,
93 }
94
95
96 class Node(DBWorker):
97     """Nodes store path organization and have multiple versions.
98        Versions store object history and have multiple attributes.
99        Attributes store metadata.
100     """
101     
102     # TODO: Provide an interface for included and excluded clusters.
103     
104     def __init__(self, **params):
105         DBWorker.__init__(self, **params)
106         metadata = MetaData()
107         
108         #create nodes table
109         columns=[]
110         columns.append(Column('node', Integer, primary_key=True))
111         columns.append(Column('parent', Integer,
112                               ForeignKey('nodes.node',
113                                          ondelete='CASCADE',
114                                          onupdate='CASCADE'),
115                               autoincrement=False))
116         columns.append(Column('path', String(2048), default='', nullable=False))
117         self.nodes = Table('nodes', metadata, *columns)
118         # place an index on path
119         Index('idx_nodes_path', self.nodes.c.path, unique=True)
120         
121         #create statistics table
122         columns=[]
123         columns.append(Column('node', Integer,
124                               ForeignKey('nodes.node',
125                                          ondelete='CASCADE',
126                                          onupdate='CASCADE'),
127                               primary_key=True))
128         columns.append(Column('population', Integer, nullable=False, default=0))
129         columns.append(Column('size', Integer, nullable=False, default=0))
130         columns.append(Column('mtime', Float))
131         columns.append(Column('cluster', Integer, nullable=False, default=0,
132                               primary_key=True))
133         self.statistics = Table('statistics', metadata, *columns)
134         
135         #create versions table
136         columns=[]
137         columns.append(Column('serial', Integer, primary_key=True))
138         columns.append(Column('node', Integer,
139                               ForeignKey('nodes.node',
140                                          ondelete='CASCADE',
141                                          onupdate='CASCADE')))
142         columns.append(Column('size', Integer, nullable=False, default=0))
143         columns.append(Column('source', Integer))
144         columns.append(Column('mtime', Float))
145         columns.append(Column('muser', String(255), nullable=False, default=''))
146         columns.append(Column('cluster', Integer, nullable=False, default=0))
147         self.versions = Table('versions', metadata, *columns)
148         Index('idx_versions_node_mtime', self.versions.c.node,
149               self.versions.c.mtime)
150         
151         #create attributes table
152         columns = []
153         columns.append(Column('serial', Integer,
154                               ForeignKey('versions.serial',
155                                          ondelete='CASCADE',
156                                          onupdate='CASCADE'),
157                               primary_key=True))
158         columns.append(Column('key', String(255), primary_key=True))
159         columns.append(Column('value', String(255)))
160         self.attributes = Table('attributes', metadata, *columns)
161         
162         metadata.create_all(self.engine)
163         
164         s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
165                                            self.nodes.c.parent == ROOTNODE))
166         r = self.conn.execute(s).fetchone()
167         if not r:
168             s = self.nodes.insert().values(node=ROOTNODE, parent=ROOTNODE)
169             self.conn.execute(s)
170     
171     def node_create(self, parent, path):
172         """Create a new node from the given properties.
173            Return the node identifier of the new node.
174         """
175         #TODO catch IntegrityError?
176         s = self.nodes.insert().values(parent=parent, path=path)
177         r = self.conn.execute(s)
178         inserted_primary_key = r.inserted_primary_key[0]
179         r.close()
180         return inserted_primary_key
181     
182     def node_lookup(self, path):
183         """Lookup the current node of the given path.
184            Return None if the path is not found.
185         """
186         
187         s = select([self.nodes.c.node], self.nodes.c.path == path)
188         r = self.conn.execute(s)
189         row = r.fetchone()
190         r.close()
191         if row:
192             return row[0]
193         return None
194     
195     def node_get_properties(self, node):
196         """Return the node's (parent, path).
197            Return None if the node is not found.
198         """
199         
200         s = select([self.nodes.c.parent, self.nodes.c.path])
201         s = s.where(self.nodes.c.node == node)
202         r = self.conn.execute(s)
203         l = r.fetchone()
204         r.close()
205         return l
206     
207     def node_get_versions(self, node, keys=(), propnames=_propnames):
208         """Return the properties of all versions at node.
209            If keys is empty, return all properties in the order
210            (serial, node, size, source, mtime, muser, cluster).
211         """
212         
213         s = select(['*'], self.versions.c.node == node)
214         s = s.order_by(self.versions.c.serial)
215         r = self.conn.execute(s)
216         rows = r.fetchall()
217         if not rows:
218             return rows
219         
220         if not keys:
221             return rows
222         
223         return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]
224     
225     def node_count_children(self, node):
226         """Return node's child count."""
227         
228         s = select([func.count(self.nodes.c.node)])
229         s = s.where(and_(self.nodes.c.parent == node,
230                          self.nodes.c.node != ROOTNODE))
231         r = self.conn.execute(s)
232         row = r.fetchone()
233         r.close()
234         return row[0]
235     
236     def node_purge_children(self, parent, before=inf, cluster=0):
237         """Delete all versions with the specified
238            parent and cluster, and return
239            the serials of versions deleted.
240            Clears out nodes with no remaining versions.
241         """
242         #update statistics
243         c1 = select([self.nodes.c.node],
244             self.nodes.c.parent == parent)
245         where_clause = and_(self.versions.c.node.in_(c1),
246                             self.versions.c.cluster == cluster,
247                             self.versions.c.mtime <= before)
248         s = select([func.count(self.versions.c.serial),
249                     func.sum(self.versions.c.size)])
250         s = s.where(where_clause)
251         r = self.conn.execute(s)
252         row = r.fetchone()
253         r.close()
254         if not row:
255             return ()
256         nr, size = row[0], -row[1] if row[1] else 0
257         mtime = time()
258         self.statistics_update(parent, -nr, size, mtime, cluster)
259         self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)
260         
261         s = select([self.versions.c.serial])
262         s = s.where(where_clause)
263         r = self.conn.execute(s)
264         serials = [row[SERIAL] for row in r.fetchall()]
265         r.close()
266         
267         #delete versions
268         s = self.versions.delete().where(where_clause)
269         r = self.conn.execute(s)
270         r.close()
271         
272         #delete nodes
273         s = select([self.nodes.c.node],
274             and_(self.nodes.c.parent == parent,
275                  select([func.count(self.versions.c.serial)],
276                     self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
277         rp = self.conn.execute(s)
278         nodes = [r[0] for r in rp.fetchall()]
279         rp.close()
280         s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
281         self.conn.execute(s).close()
282         
283         return serials
284     
285     def node_purge(self, node, before=inf, cluster=0):
286         """Delete all versions with the specified
287            node and cluster, and return
288            the serials of versions deleted.
289            Clears out the node if it has no remaining versions.
290         """
291         
292         #update statistics
293         s = select([func.count(self.versions.c.serial),
294                     func.sum(self.versions.c.size)])
295         where_clause = and_(self.versions.c.node == node,
296                          self.versions.c.cluster == cluster,
297                          self.versions.c.mtime <= before)
298         s = s.where(where_clause)
299         r = self.conn.execute(s)
300         row = r.fetchone()
301         nr, size = row[0], row[1]
302         r.close()
303         if not nr:
304             return ()
305         mtime = time()
306         self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
307         
308         s = select([self.versions.c.serial])
309         s = s.where(where_clause)
310         r = self.conn.execute(s)
311         serials = [r[SERIAL] for r in r.fetchall()]
312         
313         #delete versions
314         s = self.versions.delete().where(where_clause)
315         r = self.conn.execute(s)
316         r.close()
317         
318         #delete nodes
319         s = select([self.nodes.c.node],
320             and_(self.nodes.c.node == node,
321                  select([func.count(self.versions.c.serial)],
322                     self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
323         r = self.conn.execute(s)
324         nodes = r.fetchall()
325         r.close()
326         s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
327         self.conn.execute(s).close()
328         
329         return serials
330     
331     def node_remove(self, node):
332         """Remove the node specified.
333            Return false if the node has children or is not found.
334         """
335         
336         if self.node_count_children(node):
337             return False
338         
339         mtime = time()
340         s = select([func.count(self.versions.c.serial),
341                     func.sum(self.versions.c.size),
342                     self.versions.c.cluster])
343         s = s.where(self.versions.c.node == node)
344         s = s.group_by(self.versions.c.cluster)
345         r = self.conn.execute(s)
346         for population, size, cluster in r.fetchall():
347             self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
348         r.close()
349         
350         s = self.nodes.delete().where(self.nodes.c.node == node)
351         self.conn.execute(s).close()
352         return True
353     
354     def statistics_get(self, node, cluster=0):
355         """Return population, total size and last mtime
356            for all versions under node that belong to the cluster.
357         """
358         
359         s = select([self.statistics.c.population,
360                     self.statistics.c.size,
361                     self.statistics.c.mtime])
362         s = s.where(and_(self.statistics.c.node == node,
363                          self.statistics.c.cluster == cluster))
364         r = self.conn.execute(s)
365         row = r.fetchone()
366         r.close()
367         return row
368     
369     def statistics_update(self, node, population, size, mtime, cluster=0):
370         """Update the statistics of the given node.
371            Statistics keep track the population, total
372            size of objects and mtime in the node's namespace.
373            May be zero or positive or negative numbers.
374         """
375         
376         s = select([self.statistics.c.population, self.statistics.c.size],
377             and_(self.statistics.c.node == node,
378                  self.statistics.c.cluster == cluster))
379         rp = self.conn.execute(s)
380         r = rp.fetchone()
381         rp.close()
382         if not r:
383             prepopulation, presize = (0, 0)
384         else:
385             prepopulation, presize = r
386         population += prepopulation
387         size += presize
388         
389         #insert or replace
390         #TODO better upsert
391         u = self.statistics.update().where(and_(self.statistics.c.node==node,
392                                            self.statistics.c.cluster==cluster))
393         u = u.values(population=population, size=size, mtime=mtime)
394         rp = self.conn.execute(u)
395         rp.close()
396         if rp.rowcount == 0:
397             ins = self.statistics.insert()
398             ins = ins.values(node=node, population=population, size=size,
399                              mtime=mtime, cluster=cluster)
400             self.conn.execute(ins).close()
401     
402     def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
403         """Update the statistics of the given node's parent.
404            Then recursively update all parents up to the root.
405            Population is not recursive.
406         """
407         
408         while True:
409             if node == ROOTNODE:
410                 break
411             props = self.node_get_properties(node)
412             if props is None:
413                 break
414             parent, path = props
415             self.statistics_update(parent, population, size, mtime, cluster)
416             node = parent
417             population = 0 # Population isn't recursive
418     
419     def statistics_latest(self, node, before=inf, except_cluster=0):
420         """Return population, total size and last mtime
421            for all latest versions under node that
422            do not belong to the cluster.
423         """
424         
425         # The node.
426         props = self.node_get_properties(node)
427         if props is None:
428             return None
429         parent, path = props
430         
431         # The latest version.
432         s = select([self.versions.c.serial,
433                     self.versions.c.node,
434                     self.versions.c.size,
435                     self.versions.c.source,
436                     self.versions.c.mtime,
437                     self.versions.c.muser,
438                     self.versions.c.cluster])
439         s = s.where(and_(self.versions.c.cluster != except_cluster,
440                          self.versions.c.serial == select(
441                             [func.max(self.versions.c.serial)],
442                             and_(self.versions.c.node == node,
443                             self.versions.c.mtime < before))))
444         r = self.conn.execute(s)
445         props = r.fetchone()
446         r.close()
447         if not props:
448             return None
449         mtime = props[MTIME]
450         
451         # First level, just under node (get population).
452         v = self.versions.alias('v')
453         s = select([func.count(v.c.serial),
454                     func.sum(v.c.size),
455                     func.max(v.c.mtime)])
456         c1 = select([func.max(self.versions.c.serial)],
457             and_(self.versions.c.node == v.c.node,
458                  self.versions.c.mtime < before))
459         c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
460         s = s.where(and_(v.c.serial == c1,
461                          v.c.cluster != except_cluster,
462                          v.c.node.in_(c2)))
463         rp = self.conn.execute(s)
464         r = rp.fetchone()
465         rp.close()
466         if not r:
467             return None
468         count = r[0]
469         mtime = max(mtime, r[2])
470         if count == 0:
471             return (0, 0, mtime)
472         
473         # All children (get size and mtime).
474         # XXX: This is why the full path is stored.
475         s = select([func.count(v.c.serial),
476                     func.sum(v.c.size),
477                     func.max(v.c.mtime)])
478         c1 = select([func.max(self.versions.c.serial)],
479             and_(self.versions.c.node == v.c.node,
480                  self.versions.c.mtime < before))
481         c2 = select([self.nodes.c.node], self.nodes.c.path.like(path + '%'))
482         s = s.where(and_(v.c.serial == c1,
483                          v.c.cluster != except_cluster,
484                          v.c.node.in_(c2)))
485         rp = self.conn.execute(s)
486         r = rp.fetchone()
487         rp.close()
488         if not r:
489             return None
490         size = r[1] - props[SIZE]
491         mtime = max(mtime, r[2])
492         return (count, size, mtime)
493     
494     def version_create(self, node, size, source, muser, cluster=0):
495         """Create a new version from the given properties.
496            Return the (serial, mtime) of the new version.
497         """
498         
499         mtime = time()
500         props = (node, size, source, mtime, muser, cluster)
501         props = locals()
502         props.pop('self')
503         s = self.versions.insert().values(**props)
504         serial = self.conn.execute(s).inserted_primary_key[0]
505         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
506         return serial, mtime
507     
508     def version_lookup(self, node, before=inf, cluster=0):
509         """Lookup the current version of the given node.
510            Return a list with its properties:
511            (serial, node, size, source, mtime, muser, cluster)
512            or None if the current version is not found in the given cluster.
513         """
514         
515         v = self.versions.alias('v')
516         s = select([v.c.serial, v.c.node, v.c.size, v.c.source, v.c.mtime,
517                     v.c.muser, v.c.cluster])
518         c = select([func.max(self.versions.c.serial)],
519             and_(self.versions.c.node == node,
520                  self.versions.c.mtime < before))
521         s = s.where(and_(v.c.serial == c,
522                          v.c.cluster == cluster))
523         r = self.conn.execute(s)
524         props = r.fetchone()
525         r.close()
526         if props:
527             return props
528         return None
529     
530     def version_get_properties(self, serial, keys=(), propnames=_propnames):
531         """Return a sequence of values for the properties of
532            the version specified by serial and the keys, in the order given.
533            If keys is empty, return all properties in the order
534            (serial, node, size, source, mtime, muser, cluster).
535         """
536         
537         v = self.versions.alias()
538         s = select([v.c.serial, v.c.node, v.c.size, v.c.source, v.c.mtime,
539                    v.c.muser, v.c.cluster], v.c.serial == serial)
540         rp = self.conn.execute(s)
541         r = rp.fetchone()
542         rp.close()
543         if r is None:
544             return r
545         
546         if not keys:
547             return r
548         return [r[propnames[k]] for k in keys if k in propnames]
549     
550     def version_recluster(self, serial, cluster):
551         """Move the version into another cluster."""
552         
553         props = self.version_get_properties(serial)
554         if not props:
555             return
556         node = props[NODE]
557         size = props[SIZE]
558         oldcluster = props[CLUSTER]
559         if cluster == oldcluster:
560             return
561         
562         mtime = time()
563         self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
564         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
565         
566         s = self.versions.update()
567         s = s.where(self.versions.c.serial == serial)
568         s = s.values(cluster = cluster)
569         self.conn.execute(s).close()
570     
571     def version_remove(self, serial):
572         """Remove the serial specified."""
573         
574         props = self.node_get_properties(serial)
575         if not props:
576             return
577         node = props[NODE]
578         size = props[SIZE]
579         cluster = props[CLUSTER]
580         
581         mtime = time()
582         self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
583         
584         s = self.versions.delete().where(self.versions.c.serial == serial)
585         self.conn.execute(s).close()
586         return True
587     
588     def attribute_get(self, serial, keys=()):
589         """Return a list of (key, value) pairs of the version specified by serial.
590            If keys is empty, return all attributes.
591            Othwerise, return only those specified.
592         """
593         
594         if keys:
595             attrs = self.attributes.alias()
596             s = select([attrs.c.key, attrs.c.value])
597             s = s.where(and_(attrs.c.key.in_(keys),
598                              attrs.c.serial == serial))
599         else:
600             attrs = self.attributes.alias()
601             s = select([attrs.c.key, attrs.c.value])
602             s = s.where(attrs.c.serial == serial)
603         r = self.conn.execute(s)
604         l = r.fetchall()
605         r.close()
606         return l
607     
608     def attribute_set(self, serial, items):
609         """Set the attributes of the version specified by serial.
610            Receive attributes as an iterable of (key, value) pairs.
611         """
612         #insert or replace
613         #TODO better upsert
614         for k, v in items:
615             s = self.attributes.update()
616             s = s.where(and_(self.attributes.c.serial == serial,
617                              self.attributes.c.key == k))
618             s = s.values(value = v)
619             rp = self.conn.execute(s)
620             rp.close()
621             if rp.rowcount == 0:
622                 s = self.attributes.insert()
623                 s = s.values(serial=serial, key=k, value=v)
624                 self.conn.execute(s).close()
625     
626     def attribute_del(self, serial, keys=()):
627         """Delete attributes of the version specified by serial.
628            If keys is empty, delete all attributes.
629            Otherwise delete those specified.
630         """
631         
632         if keys:
633             #TODO more efficient way to do this?
634             for key in keys:
635                 s = self.attributes.delete()
636                 s = s.where(and_(self.attributes.c.serial == serial,
637                                  self.attributes.c.key == key))
638                 self.conn.execute(s).close()
639         else:
640             s = self.attributes.delete()
641             s = s.where(self.attributes.c.serial == serial)
642             self.conn.execute(s).close()
643     
644     def attribute_copy(self, source, dest):
645         s = select([dest, self.attributes.c.key, self.attributes.c.value],
646             self.attributes.c.serial == source)
647         rp = self.conn.execute(s)
648         attributes = rp.fetchall()
649         rp.close()
650         for dest, k, v in attributes:
651             #insert or replace
652             s = self.attributes.update().where(and_(
653                 self.attributes.c.serial == dest,
654                 self.attributes.c.key == k))
655             rp = self.conn.execute(s, value=v)
656             rp.close()
657             if rp.rowcount == 0:
658                 s = self.attributes.insert()
659                 values = {'serial':dest, 'key':k, 'value':v}
660                 self.conn.execute(s, values).close()
661     
662     def latest_attribute_keys(self, parent, before=inf, except_cluster=0, pathq=[]):
663         """Return a list with all keys pairs defined
664            for all latest versions under parent that
665            do not belong to the cluster.
666         """
667         
668         # TODO: Use another table to store before=inf results.
669         a = self.attributes.alias('a')
670         v = self.versions.alias('v')
671         n = self.nodes.alias('n')
672         s = select([a.c.key]).distinct()
673         s = s.where(v.c.serial == select([func.max(self.versions.c.serial)],
674                                           and_(self.versions.c.node == v.c.node,
675                                                self.versions.c.mtime < before)))
676         s = s.where(v.c.cluster != except_cluster)
677         s = s.where(v.c.node.in_(select([self.nodes.c.node],
678             self.nodes.c.parent == parent)))
679         s = s.where(a.c.serial == v.c.serial)
680         s = s.where(n.c.node == v.c.node)
681         conj = []
682         for x in pathq:
683             conj.append(n.c.path.like(x + '%'))
684         if conj:
685             s = s.where(or_(*conj))
686         rp = self.conn.execute(s)
687         rows = rp.fetchall()
688         rp.close()
689         return [r[0] for r in rows]
690     
691     def latest_version_list(self, parent, prefix='', delimiter=None,
692                             start='', limit=10000, before=inf,
693                             except_cluster=0, pathq=[], filterq=None):
694         """Return a (list of (path, serial) tuples, list of common prefixes)
695            for the current versions of the paths with the given parent,
696            matching the following criteria.
697            
698            The property tuple for a version is returned if all
699            of these conditions are true:
700                 
701                 a. parent matches
702                 
703                 b. path > start
704                 
705                 c. path starts with prefix (and paths in pathq)
706                 
707                 d. version is the max up to before
708                 
709                 e. version is not in cluster
710                 
711                 f. the path does not have the delimiter occuring
712                    after the prefix, or ends with the delimiter
713                 
714                 g. serial matches the attribute filter query.
715                    
716                    A filter query is a comma-separated list of
717                    terms in one of these three forms:
718                    
719                    key
720                        an attribute with this key must exist
721                    
722                    !key
723                        an attribute with this key must not exist
724                    
725                    key ?op value
726                        the attribute with this key satisfies the value
727                        where ?op is one of ==, != <=, >=, <, >.
728            
729            The list of common prefixes includes the prefixes
730            matching up to the first delimiter after prefix,
731            and are reported only once, as "virtual directories".
732            The delimiter is included in the prefixes.
733            
734            If arguments are None, then the corresponding matching rule
735            will always match.
736            
737            Limit applies to the first list of tuples returned.
738         """
739         
740         if not start or start < prefix:
741             start = strprevling(prefix)
742         nextling = strnextling(prefix)
743         
744         a = self.attributes.alias('a')
745         v = self.versions.alias('v')
746         n = self.nodes.alias('n')
747         s = select([n.c.path, v.c.serial]).distinct()
748         s = s.where(v.c.serial == select([func.max(self.versions.c.serial)],
749             and_(self.versions.c.node == v.c.node,
750                  self.versions.c.mtime < before)))
751         s = s.where(v.c.cluster != except_cluster)
752         s = s.where(v.c.node.in_(select([self.nodes.c.node],
753             self.nodes.c.parent == parent)))
754         if filterq:
755             s = s.where(a.c.serial == v.c.serial)
756         
757         s = s.where(n.c.node == v.c.node)
758         s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
759         conj = []
760         for x in pathq:
761             conj.append(n.c.path.like(x + '%'))
762         
763         if conj:
764             s = s.where(or_(*conj))
765         
766         if filterq:
767             s = s.where(a.c.key.in_(filterq.split(',')))
768         
769         s = s.order_by(n.c.path)
770         
771         if not delimiter:
772             s = s.limit(limit)
773             rp = self.conn.execute(s, start=start)
774             r = rp.fetchall()
775             rp.close()
776             return r, ()
777         
778         pfz = len(prefix)
779         dz = len(delimiter)
780         count = 0
781         prefixes = []
782         pappend = prefixes.append
783         matches = []
784         mappend = matches.append
785         
786         rp = self.conn.execute(s, start=start)
787         while True:
788             props = rp.fetchone()
789             if props is None:
790                 break
791             path, serial = props
792             idx = path.find(delimiter, pfz)
793             
794             if idx < 0:
795                 mappend(props)
796                 count += 1
797                 if count >= limit:
798                     break
799                 continue
800             
801             if idx + dz == len(path):
802                 mappend(props)
803                 count += 1
804                 continue # Get one more, in case there is a path.
805             pf = path[:idx + dz]
806             pappend(pf)
807             if count >= limit: 
808                 break
809             
810             rp = self.conn.execute(s, start=strnextling(pf)) # New start.
811         
812         return matches, prefixes