6e7c887aa985b8982d63d2c26a3de0c1896ee743
[pithos] / pithos / backends / lib_alchemy / node.py
1 # Copyright 2011 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from time import time
35 from sqlalchemy import Table, Integer, Float, Column, String, MetaData, ForeignKey
36 from sqlalchemy.schema import Index, Sequence
37 from sqlalchemy.sql import func, and_, or_, null, select, bindparam
38 from sqlalchemy.ext.compiler import compiles
39 try:
40     from sqlalchemy.sql.expression import _UpdateBase
41 except:
42     from sqlalchemy.sql.expression import UpdateBase as _UpdateBase
43
44 from dbworker import DBWorker
45
46 ROOTNODE  = 0
47
48 ( SERIAL, NODE, SIZE, SOURCE, MTIME, MUSER, CLUSTER ) = range(7)
49
50 inf = float('inf')
51
52
53 def strnextling(prefix):
54     """Return the first unicode string
55        greater than but not starting with given prefix.
56        strnextling('hello') -> 'hellp'
57     """
58     if not prefix:
59         ## all strings start with the null string,
60         ## therefore we have to approximate strnextling('')
61         ## with the last unicode character supported by python
62         ## 0x10ffff for wide (32-bit unicode) python builds
63         ## 0x00ffff for narrow (16-bit unicode) python builds
64         ## We will not autodetect. 0xffff is safe enough.
65         return unichr(0xffff)
66     s = prefix[:-1]
67     c = ord(prefix[-1])
68     if c >= 0xffff:
69         raise RuntimeError
70     s += unichr(c+1)
71     return s
72
73 def strprevling(prefix):
74     """Return an approximation of the last unicode string
75        less than but not starting with given prefix.
76        strprevling(u'hello') -> u'helln\\xffff'
77     """
78     if not prefix:
79         ## There is no prevling for the null string
80         return prefix
81     s = prefix[:-1]
82     c = ord(prefix[-1])
83     if c > 0:
84         #s += unichr(c-1) + unichr(0xffff)
85         s += unichr(c-1)
86     return s
87
88
89 _propnames = {
90     'serial'    : 0,
91     'node'      : 1,
92     'size'      : 2,
93     'source'    : 3,
94     'mtime'     : 4,
95     'muser'     : 5,
96     'cluster'   : 6,
97 }
98
99
100 class Node(DBWorker):
101     """Nodes store path organization and have multiple versions.
102        Versions store object history and have multiple attributes.
103        Attributes store metadata.
104     """
105     
106     # TODO: Provide an interface for included and excluded clusters.
107     
108     def __init__(self, **params):
109         DBWorker.__init__(self, **params)
110         metadata = MetaData()
111         
112         #create nodes table
113         columns=[]
114         columns.append(Column('node', Integer, primary_key=True))
115         columns.append(Column('parent', Integer,
116                               ForeignKey('nodes.node',
117                                          ondelete='CASCADE',
118                                          onupdate='CASCADE'),
119                               autoincrement=False))
120         columns.append(Column('path', String(2048), default='', nullable=False))
121         self.nodes = Table('nodes', metadata, *columns)
122         # place an index on path
123         Index('idx_nodes_path', self.nodes.c.path, unique=True)
124         
125         #create statistics table
126         columns=[]
127         columns.append(Column('node', Integer,
128                               ForeignKey('nodes.node',
129                                          ondelete='CASCADE',
130                                          onupdate='CASCADE'),
131                               primary_key=True))
132         columns.append(Column('population', Integer, nullable=False, default=0))
133         columns.append(Column('size', Integer, nullable=False, default=0))
134         columns.append(Column('mtime', Float))
135         columns.append(Column('cluster', Integer, nullable=False, default=0,
136                               primary_key=True))
137         self.statistics = Table('statistics', metadata, *columns)
138         
139         #create versions table
140         columns=[]
141         columns.append(Column('serial', Integer, primary_key=True))
142         columns.append(Column('node', Integer,
143                               ForeignKey('nodes.node',
144                                          ondelete='CASCADE',
145                                          onupdate='CASCADE')))
146         columns.append(Column('size', Integer, nullable=False, default=0))
147         columns.append(Column('source', Integer))
148         columns.append(Column('mtime', Float))
149         columns.append(Column('muser', String(255), nullable=False, default=''))
150         columns.append(Column('cluster', Integer, nullable=False, default=0))
151         self.versions = Table('versions', metadata, *columns)
152         # place an index on node
153         Index('idx_versions_node', self.versions.c.node)
154         # TODO: Sort out if more indexes are needed.
155         #Index('idx_versions_node', self.versions.c.mtime)
156         
157         #create attributes table
158         columns = []
159         columns.append(Column('serial', Integer,
160                               ForeignKey('versions.serial',
161                                          ondelete='CASCADE',
162                                          onupdate='CASCADE'),
163                               primary_key=True))
164         columns.append(Column('key', String(255), primary_key=True))
165         columns.append(Column('value', String(255)))
166         self.attributes = Table('attributes', metadata, *columns)
167         
168         metadata.create_all(self.engine)
169         
170         s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
171                                            self.nodes.c.parent == ROOTNODE))
172         r = self.conn.execute(s).fetchone()
173         if not r:
174             s = self.nodes.insert().values(node=ROOTNODE, parent=ROOTNODE)
175             self.conn.execute(s)
176     
177     def node_create(self, parent, path):
178         """Create a new node from the given properties.
179            Return the node identifier of the new node.
180         """
181         #TODO catch IntegrityError?
182         s = self.nodes.insert().values(parent=parent, path=path)
183         r = self.conn.execute(s)
184         inserted_primary_key = r.inserted_primary_key[0]
185         r.close()
186         return inserted_primary_key
187     
188     def node_lookup(self, path):
189         """Lookup the current node of the given path.
190            Return None if the path is not found.
191         """
192         
193         s = select([self.nodes.c.node], self.nodes.c.path == path)
194         r = self.conn.execute(s)
195         row = r.fetchone()
196         r.close()
197         if row:
198             return row[0]
199         return None
200     
201     def node_get_properties(self, node):
202         """Return the node's (parent, path).
203            Return None if the node is not found.
204         """
205         
206         s = select([self.nodes.c.parent, self.nodes.c.path])
207         s = s.where(self.nodes.c.node == node)
208         r = self.conn.execute(s)
209         l = r.fetchone()
210         r.close()
211         return l
212     
213     def node_get_versions(self, node, keys=(), propnames=_propnames):
214         """Return the properties of all versions at node.
215            If keys is empty, return all properties in the order
216            (serial, node, size, source, mtime, muser, cluster).
217         """
218         
219         s = select(['*'], self.versions.c.node == node)
220         s = s.order_by(self.versions.c.serial)
221         r = self.conn.execute(s)
222         rows = r.fetchall()
223         if not rows:
224             return rows
225         
226         if not keys:
227             return rows
228         
229         return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]
230     
231     def node_count_children(self, node):
232         """Return node's child count."""
233         
234         s = select([func.count(self.nodes.c.node)])
235         s = s.where(and_(self.nodes.c.parent == node,
236                          self.nodes.c.node != ROOTNODE))
237         r = self.conn.execute(s)
238         row = r.fetchone()
239         r.close()
240         return row[0]
241     
242     def node_purge_children(self, parent, before=inf, cluster=0):
243         """Delete all versions with the specified
244            parent and cluster, and return
245            the serials of versions deleted.
246            Clears out nodes with no remaining versions.
247         """
248         #update statistics
249         c1 = select([self.nodes.c.node],
250             self.nodes.c.parent == parent)
251         where_clause = and_(self.versions.c.node.in_(c1),
252                             self.versions.c.cluster == cluster,
253                             self.versions.c.mtime <= before)
254         s = select([func.count(self.versions.c.serial),
255                     func.sum(self.versions.c.size)])
256         s = s.where(where_clause)
257         r = self.conn.execute(s)
258         row = r.fetchone()
259         r.close()
260         if not row:
261             return ()
262         nr, size = row[0], -row[1] if row[1] else 0
263         mtime = time()
264         self.statistics_update(parent, -nr, size, mtime, cluster)
265         self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)
266         
267         s = select([self.versions.c.serial])
268         s = s.where(where_clause)
269         r = self.conn.execute(s)
270         serials = [row[SERIAL] for row in r.fetchall()]
271         r.close()
272         
273         #delete versions
274         s = self.versions.delete().where(where_clause)
275         r = self.conn.execute(s)
276         r.close()
277         
278         #delete nodes
279         s = select([self.nodes.c.node],
280             and_(self.nodes.c.parent == parent,
281                  select([func.count(self.versions.c.serial)],
282                     self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
283         rp = self.conn.execute(s)
284         nodes = [r[0] for r in rp.fetchall()]
285         rp.close()
286         s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
287         self.conn.execute(s).close()
288         
289         return serials
290     
291     def node_purge(self, node, before=inf, cluster=0):
292         """Delete all versions with the specified
293            node and cluster, and return
294            the serials of versions deleted.
295            Clears out the node if it has no remaining versions.
296         """
297         
298         #update statistics
299         s = select([func.count(self.versions.c.serial),
300                     func.sum(self.versions.c.size)])
301         where_clause = and_(self.versions.c.node == node,
302                          self.versions.c.cluster == cluster,
303                          self.versions.c.mtime <= before)
304         s = s.where(where_clause)
305         r = self.conn.execute(s)
306         row = r.fetchone()
307         nr, size = row[0], row[1]
308         r.close()
309         if not nr:
310             return ()
311         mtime = time()
312         self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
313         
314         s = select([self.versions.c.serial])
315         s = s.where(where_clause)
316         r = self.conn.execute(s)
317         serials = [r[SERIAL] for r in r.fetchall()]
318         
319         #delete versions
320         s = self.versions.delete().where(where_clause)
321         r = self.conn.execute(s)
322         r.close()
323         
324         #delete nodes
325         s = select([self.nodes.c.node],
326             and_(self.nodes.c.node == node,
327                  select([func.count(self.versions.c.serial)],
328                     self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
329         r = self.conn.execute(s)
330         nodes = r.fetchall()
331         r.close()
332         s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
333         self.conn.execute(s).close()
334         
335         return serials
336     
337     def node_remove(self, node):
338         """Remove the node specified.
339            Return false if the node has children or is not found.
340         """
341         
342         if self.node_count_children(node):
343             return False
344         
345         mtime = time()
346         s = select([func.count(self.versions.c.serial),
347                     func.sum(self.versions.c.size),
348                     self.versions.c.cluster])
349         s = s.where(self.versions.c.node == node)
350         s = s.group_by(self.versions.c.cluster)
351         r = self.conn.execute(s)
352         for population, size, cluster in r.fetchall():
353             self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
354         r.close()
355         
356         s = self.nodes.delete().where(self.nodes.c.node == node)
357         self.conn.execute(s).close()
358         return True
359     
360     def statistics_get(self, node, cluster=0):
361         """Return population, total size and last mtime
362            for all versions under node that belong to the cluster.
363         """
364         
365         s = select([self.statistics.c.population,
366                     self.statistics.c.size,
367                     self.statistics.c.mtime])
368         s = s.where(and_(self.statistics.c.node == node,
369                          self.statistics.c.cluster == cluster))
370         r = self.conn.execute(s)
371         row = r.fetchone()
372         r.close()
373         return row
374     
375     def statistics_update(self, node, population, size, mtime, cluster=0):
376         """Update the statistics of the given node.
377            Statistics keep track the population, total
378            size of objects and mtime in the node's namespace.
379            May be zero or positive or negative numbers.
380         """
381         
382         s = select([self.statistics.c.population, self.statistics.c.size],
383             and_(self.statistics.c.node == node,
384                  self.statistics.c.cluster == cluster))
385         rp = self.conn.execute(s)
386         r = rp.fetchone()
387         rp.close()
388         if not r:
389             prepopulation, presize = (0, 0)
390         else:
391             prepopulation, presize = r
392         population += prepopulation
393         size += presize
394         
395         #insert or replace
396         #TODO better upsert
397         u = self.statistics.update().where(and_(self.statistics.c.node==node,
398                                            self.statistics.c.cluster==cluster))
399         u = u.values(population=population, size=size, mtime=mtime)
400         rp = self.conn.execute(u)
401         rp.close()
402         if rp.rowcount == 0:
403             ins = self.statistics.insert()
404             ins = ins.values(node=node, population=population, size=size,
405                              mtime=mtime, cluster=cluster)
406             self.conn.execute(ins).close()
407     
408     def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
409         """Update the statistics of the given node's parent.
410            Then recursively update all parents up to the root.
411            Population is not recursive.
412         """
413         
414         while True:
415             if node == ROOTNODE:
416                 break
417             props = self.node_get_properties(node)
418             if props is None:
419                 break
420             parent, path = props
421             self.statistics_update(parent, population, size, mtime, cluster)
422             node = parent
423             population = 0 # Population isn't recursive
424     
425     def statistics_latest(self, node, before=inf, except_cluster=0):
426         """Return population, total size and last mtime
427            for all latest versions under node that
428            do not belong to the cluster.
429         """
430         
431         # The node.
432         props = self.node_get_properties(node)
433         if props is None:
434             return None
435         parent, path = props
436         
437         # The latest version.
438         s = select([self.versions.c.serial,
439                     self.versions.c.node,
440                     self.versions.c.size,
441                     self.versions.c.source,
442                     self.versions.c.mtime,
443                     self.versions.c.muser,
444                     self.versions.c.cluster])
445         s = s.where(and_(self.versions.c.cluster != except_cluster,
446                          self.versions.c.serial == select(
447                             [func.max(self.versions.c.serial)],
448                             and_(self.versions.c.node == node,
449                             self.versions.c.mtime < before))))
450         r = self.conn.execute(s)
451         props = r.fetchone()
452         r.close()
453         if not props:
454             return None
455         mtime = props[MTIME]
456         
457         # First level, just under node (get population).
458         v = self.versions.alias('v')
459         s = select([func.count(v.c.serial),
460                     func.sum(v.c.size),
461                     func.max(v.c.mtime)])
462         c1 = select([func.max(self.versions.c.serial)],
463             and_(self.versions.c.node == v.c.node,
464                  self.versions.c.mtime < before))
465         c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
466         s = s.where(and_(v.c.serial == c1,
467                          v.c.cluster != except_cluster,
468                          v.c.node.in_(c2)))
469         rp = self.conn.execute(s)
470         r = rp.fetchone()
471         rp.close()
472         if not r:
473             return None
474         count = r[0]
475         mtime = max(mtime, r[2])
476         if count == 0:
477             return (0, 0, mtime)
478         
479         # All children (get size and mtime).
480         # XXX: This is why the full path is stored.
481         s = select([func.count(v.c.serial),
482                     func.sum(v.c.size),
483                     func.max(v.c.mtime)])
484         c1 = select([func.max(self.versions.c.serial)],
485             and_(self.versions.c.node == v.c.node,
486                  self.versions.c.mtime < before))
487         c2 = select([self.nodes.c.node], self.nodes.c.path.like(path + '%'))
488         s = s.where(and_(v.c.serial == c1,
489                          v.c.cluster != except_cluster,
490                          v.c.node.in_(c2)))
491         rp = self.conn.execute(s)
492         r = rp.fetchone()
493         rp.close()
494         if not r:
495             return None
496         size = r[1] - props[SIZE]
497         mtime = max(mtime, r[2])
498         return (count, size, mtime)
499     
500     def version_create(self, node, size, source, muser, cluster=0):
501         """Create a new version from the given properties.
502            Return the (serial, mtime) of the new version.
503         """
504         
505         mtime = time()
506         props = (node, size, source, mtime, muser, cluster)
507         props = locals()
508         props.pop('self')
509         s = self.versions.insert().values(**props)
510         serial = self.conn.execute(s).inserted_primary_key[0]
511         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
512         return serial, mtime
513     
514     def version_lookup(self, node, before=inf, cluster=0):
515         """Lookup the current version of the given node.
516            Return a list with its properties:
517            (serial, node, size, source, mtime, muser, cluster)
518            or None if the current version is not found in the given cluster.
519         """
520         
521         v = self.versions.alias('v')
522         s = select([v.c.serial, v.c.node, v.c.size, v.c.source, v.c.mtime,
523                     v.c.muser, v.c.cluster])
524         c = select([func.max(self.versions.c.serial)],
525             and_(self.versions.c.node == node,
526                  self.versions.c.mtime < before))
527         s = s.where(and_(v.c.serial == c,
528                          v.c.cluster == cluster))
529         r = self.conn.execute(s)
530         props = r.fetchone()
531         r.close()
532         if props:
533             return props
534         return None
535     
536     def version_get_properties(self, serial, keys=(), propnames=_propnames):
537         """Return a sequence of values for the properties of
538            the version specified by serial and the keys, in the order given.
539            If keys is empty, return all properties in the order
540            (serial, node, size, source, mtime, muser, cluster).
541         """
542         
543         v = self.versions.alias()
544         s = select([v.c.serial, v.c.node, v.c.size, v.c.source, v.c.mtime,
545                    v.c.muser, v.c.cluster], v.c.serial == serial)
546         rp = self.conn.execute(s)
547         r = rp.fetchone()
548         rp.close()
549         if r is None:
550             return r
551         
552         if not keys:
553             return r
554         return [r[propnames[k]] for k in keys if k in propnames]
555     
556     def version_recluster(self, serial, cluster):
557         """Move the version into another cluster."""
558         
559         props = self.version_get_properties(serial)
560         if not props:
561             return
562         node = props[NODE]
563         size = props[SIZE]
564         oldcluster = props[CLUSTER]
565         if cluster == oldcluster:
566             return
567         
568         mtime = time()
569         self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
570         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
571         
572         s = self.versions.update()
573         s = s.where(self.versions.c.serial == serial)
574         s = s.values(cluster = cluster)
575         self.conn.execute(s).close()
576     
577     def version_remove(self, serial):
578         """Remove the serial specified."""
579         
580         props = self.node_get_properties(serial)
581         if not props:
582             return
583         node = props[NODE]
584         size = props[SIZE]
585         cluster = props[CLUSTER]
586         
587         mtime = time()
588         self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
589         
590         s = self.versions.delete().where(self.versions.c.serial == serial)
591         self.conn.execute(s).close()
592         return True
593     
594     def attribute_get(self, serial, keys=()):
595         """Return a list of (key, value) pairs of the version specified by serial.
596            If keys is empty, return all attributes.
597            Othwerise, return only those specified.
598         """
599         
600         if keys:
601             attrs = self.attributes.alias()
602             s = select([attrs.c.key, attrs.c.value])
603             s = s.where(and_(attrs.c.key.in_(keys),
604                              attrs.c.serial == serial))
605         else:
606             attrs = self.attributes.alias()
607             s = select([attrs.c.key, attrs.c.value])
608             s = s.where(attrs.c.serial == serial)
609         r = self.conn.execute(s)
610         l = r.fetchall()
611         r.close()
612         return l
613     
614     def attribute_set(self, serial, items):
615         """Set the attributes of the version specified by serial.
616            Receive attributes as an iterable of (key, value) pairs.
617         """
618         #insert or replace
619         #TODO better upsert
620         for k, v in items:
621             s = self.attributes.update()
622             s = s.where(and_(self.attributes.c.serial == serial,
623                              self.attributes.c.key == k))
624             s = s.values(value = v)
625             rp = self.conn.execute(s)
626             rp.close()
627             if rp.rowcount == 0:
628                 s = self.attributes.insert()
629                 s = s.values(serial=serial, key=k, value=v)
630                 self.conn.execute(s).close()
631     
632     def attribute_del(self, serial, keys=()):
633         """Delete attributes of the version specified by serial.
634            If keys is empty, delete all attributes.
635            Otherwise delete those specified.
636         """
637         
638         if keys:
639             #TODO more efficient way to do this?
640             for key in keys:
641                 s = self.attributes.delete()
642                 s = s.where(and_(self.attributes.c.serial == serial,
643                                  self.attributes.c.key == key))
644                 self.conn.execute(s).close()
645         else:
646             s = self.attributes.delete()
647             s = s.where(self.attributes.c.serial == serial)
648             self.conn.execute(s).close()
649     
650     def attribute_copy(self, source, dest):
651         s = select([dest, self.attributes.c.key, self.attributes.c.value],
652             self.attributes.c.serial == source)
653         rp = self.conn.execute(s)
654         attributes = rp.fetchall()
655         rp.close()
656         for dest, k, v in attributes:
657             s = self.attributes.update().where(and_(
658                 self.attributes.c.serial == dest,
659                 self.attributes.c.key == k))
660             rp = self.conn.execute(s, value=v)
661             rp.close()
662             if rp.rowcount == 0:
663                 s = self.attributes.insert()
664                 values = {'serial':dest, 'key':k, 'value':v}
665                 self.conn.execute(s, values).close()
666     
667     def latest_attribute_keys(self, parent, before=inf, except_cluster=0, pathq=[]):
668         """Return a list with all keys pairs defined
669            for all latest versions under parent that
670            do not belong to the cluster.
671         """
672         
673         # TODO: Use another table to store before=inf results.
674         a = self.attributes.alias('a')
675         v = self.versions.alias('v')
676         n = self.nodes.alias('n')
677         s = select([a.c.key]).distinct()
678         s = s.where(v.c.serial == select([func.max(self.versions.c.serial)],
679                                           and_(self.versions.c.node == v.c.node,
680                                                self.versions.c.mtime < before)))
681         s = s.where(v.c.cluster != except_cluster)
682         s = s.where(v.c.node.in_(select([self.nodes.c.node],
683             self.nodes.c.parent == parent)))
684         s = s.where(a.c.serial == v.c.serial)
685         s = s.where(n.c.node == v.c.node)
686         conj = []
687         for x in pathq:
688             conj.append(n.c.path.like(x + '%'))
689         if conj:
690             s = s.where(or_(*conj))
691         rp = self.conn.execute(s)
692         rows = rp.fetchall()
693         rp.close()
694         return [r[0] for r in rows]
695     
696     def latest_version_list(self, parent, prefix='', delimiter=None,
697                             start='', limit=10000, before=inf,
698                             except_cluster=0, pathq=[], filterq=None):
699         """Return a (list of (path, serial) tuples, list of common prefixes)
700            for the current versions of the paths with the given parent,
701            matching the following criteria.
702            
703            The property tuple for a version is returned if all
704            of these conditions are true:
705                 
706                 a. parent matches
707                 
708                 b. path > start
709                 
710                 c. path starts with prefix (and paths in pathq)
711                 
712                 d. version is the max up to before
713                 
714                 e. version is not in cluster
715                 
716                 f. the path does not have the delimiter occuring
717                    after the prefix, or ends with the delimiter
718                 
719                 g. serial matches the attribute filter query.
720                    
721                    A filter query is a comma-separated list of
722                    terms in one of these three forms:
723                    
724                    key
725                        an attribute with this key must exist
726                    
727                    !key
728                        an attribute with this key must not exist
729                    
730                    key ?op value
731                        the attribute with this key satisfies the value
732                        where ?op is one of ==, != <=, >=, <, >.
733            
734            The list of common prefixes includes the prefixes
735            matching up to the first delimiter after prefix,
736            and are reported only once, as "virtual directories".
737            The delimiter is included in the prefixes.
738            
739            If arguments are None, then the corresponding matching rule
740            will always match.
741            
742            Limit applies to the first list of tuples returned.
743         """
744         
745         if not start or start < prefix:
746             start = strprevling(prefix)
747         nextling = strnextling(prefix)
748         
749         a = self.attributes.alias('a')
750         v = self.versions.alias('v')
751         n = self.nodes.alias('n')
752         s = select([n.c.path, v.c.serial]).distinct()
753         s = s.where(v.c.serial == select([func.max(self.versions.c.serial)],
754             and_(self.versions.c.node == v.c.node,
755                  self.versions.c.mtime < before)))
756         s = s.where(v.c.cluster != except_cluster)
757         s = s.where(v.c.node.in_(select([self.nodes.c.node],
758             self.nodes.c.parent == parent)))
759         if filterq:
760             s = s.where(a.c.serial == v.c.serial)
761         
762         s = s.where(n.c.node == v.c.node)
763         s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
764         conj = []
765         for x in pathq:
766             conj.append(n.c.path.like(x + '%'))
767         
768         if conj:
769             s = s.where(or_(*conj))
770         
771         if filterq:
772             s = s.where(a.c.key.in_(filterq.split(',')))
773         
774         s = s.order_by(n.c.path)
775         
776         if not delimiter:
777             s = s.limit(limit)
778             rp = self.conn.execute(s, start=start)
779             r = rp.fetchall()
780             rp.close()
781             return r, ()
782         
783         pfz = len(prefix)
784         dz = len(delimiter)
785         count = 0
786         prefixes = []
787         pappend = prefixes.append
788         matches = []
789         mappend = matches.append
790         
791         rp = self.conn.execute(s, start=start)
792         while True:
793             props = rp.fetchone()
794             if props is None:
795                 break
796             path, serial = props
797             idx = path.find(delimiter, pfz)
798             
799             if idx < 0:
800                 mappend(props)
801                 count += 1
802                 if count >= limit:
803                     break
804                 continue
805             
806             if idx + dz == len(path):
807                 mappend(props)
808                 count += 1
809                 continue # Get one more, in case there is a path.
810             pf = path[:idx + dz]
811             pappend(pf)
812             if count >= limit: 
813                 break
814             
815             rp = self.conn.execute(s, start=strnextling(pf)) # New start.
816         
817         return matches, prefixes