d24a0b4be94f5cc9341c7b46a4fe319423de2dd5
[pithos] / pithos / backends / lib / sqlalchemy / node.py
1 # Copyright 2011 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from time import time
35 from sqlalchemy import Table, Integer, BigInteger, DECIMAL, Column, String, MetaData, ForeignKey
36 from sqlalchemy.schema import Index, Sequence
37 from sqlalchemy.sql import func, and_, or_, null, select, bindparam
38 from sqlalchemy.ext.compiler import compiles
39
40 from dbworker import DBWorker
41
42 ROOTNODE  = 0
43
44 ( SERIAL, NODE, HASH, SIZE, SOURCE, MTIME, MUSER, CLUSTER ) = range(8)
45
46 inf = float('inf')
47
48
49 def strnextling(prefix):
50     """Return the first unicode string
51        greater than but not starting with given prefix.
52        strnextling('hello') -> 'hellp'
53     """
54     if not prefix:
55         ## all strings start with the null string,
56         ## therefore we have to approximate strnextling('')
57         ## with the last unicode character supported by python
58         ## 0x10ffff for wide (32-bit unicode) python builds
59         ## 0x00ffff for narrow (16-bit unicode) python builds
60         ## We will not autodetect. 0xffff is safe enough.
61         return unichr(0xffff)
62     s = prefix[:-1]
63     c = ord(prefix[-1])
64     if c >= 0xffff:
65         raise RuntimeError
66     s += unichr(c+1)
67     return s
68
69 def strprevling(prefix):
70     """Return an approximation of the last unicode string
71        less than but not starting with given prefix.
72        strprevling(u'hello') -> u'helln\\xffff'
73     """
74     if not prefix:
75         ## There is no prevling for the null string
76         return prefix
77     s = prefix[:-1]
78     c = ord(prefix[-1])
79     if c > 0:
80         #s += unichr(c-1) + unichr(0xffff)
81         s += unichr(c-1)
82     return s
83
84
85 _propnames = {
86     'serial'    : 0,
87     'node'      : 1,
88     'hash'      : 2,
89     'size'      : 3,
90     'source'    : 4,
91     'mtime'     : 5,
92     'muser'     : 6,
93     'cluster'   : 7,
94 }
95
96
97 class Node(DBWorker):
98     """Nodes store path organization and have multiple versions.
99        Versions store object history and have multiple attributes.
100        Attributes store metadata.
101     """
102     
103     # TODO: Provide an interface for included and excluded clusters.
104     
105     def __init__(self, **params):
106         DBWorker.__init__(self, **params)
107         metadata = MetaData()
108         
109         #create nodes table
110         columns=[]
111         columns.append(Column('node', Integer, primary_key=True))
112         columns.append(Column('parent', Integer,
113                               ForeignKey('nodes.node',
114                                          ondelete='CASCADE',
115                                          onupdate='CASCADE'),
116                               autoincrement=False))
117         columns.append(Column('path', String(2048), default='', nullable=False))
118         self.nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
119         # place an index on path
120         Index('idx_nodes_path', self.nodes.c.path)
121         
122         #create statistics table
123         columns=[]
124         columns.append(Column('node', Integer,
125                               ForeignKey('nodes.node',
126                                          ondelete='CASCADE',
127                                          onupdate='CASCADE'),
128                               primary_key=True))
129         columns.append(Column('population', Integer, nullable=False, default=0))
130         columns.append(Column('size', BigInteger, nullable=False, default=0))
131         columns.append(Column('mtime', DECIMAL))
132         columns.append(Column('cluster', Integer, nullable=False, default=0,
133                               primary_key=True, autoincrement=False))
134         self.statistics = Table('statistics', metadata, *columns, mysql_engine='InnoDB')
135         
136         #create versions table
137         columns=[]
138         columns.append(Column('serial', Integer, primary_key=True))
139         columns.append(Column('node', Integer,
140                               ForeignKey('nodes.node',
141                                          ondelete='CASCADE',
142                                          onupdate='CASCADE')))
143         columns.append(Column('hash', String(255)))
144         columns.append(Column('size', BigInteger, nullable=False, default=0))
145         columns.append(Column('source', Integer))
146         columns.append(Column('mtime', DECIMAL))
147         columns.append(Column('muser', String(255), nullable=False, default=''))
148         columns.append(Column('cluster', Integer, nullable=False, default=0))
149         self.versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
150         Index('idx_versions_node_mtime', self.versions.c.node,
151               self.versions.c.mtime)
152         
153         #create attributes table
154         columns = []
155         columns.append(Column('serial', Integer,
156                               ForeignKey('versions.serial',
157                                          ondelete='CASCADE',
158                                          onupdate='CASCADE'),
159                               primary_key=True))
160         columns.append(Column('key', String(255), primary_key=True))
161         columns.append(Column('value', String(255)))
162         self.attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')
163         
164         metadata.create_all(self.engine)
165         
166         s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
167                                            self.nodes.c.parent == ROOTNODE))
168         rp = self.conn.execute(s)
169         r = rp.fetchone()
170         rp.close()
171         if not r:
172             s = self.nodes.insert().values(node=ROOTNODE, parent=ROOTNODE)
173             self.conn.execute(s)
174     
175     def node_create(self, parent, path):
176         """Create a new node from the given properties.
177            Return the node identifier of the new node.
178         """
179         #TODO catch IntegrityError?
180         s = self.nodes.insert().values(parent=parent, path=path)
181         r = self.conn.execute(s)
182         inserted_primary_key = r.inserted_primary_key[0]
183         r.close()
184         return inserted_primary_key
185     
186     def node_lookup(self, path):
187         """Lookup the current node of the given path.
188            Return None if the path is not found.
189         """
190         
191         s = select([self.nodes.c.node], self.nodes.c.path == path)
192         r = self.conn.execute(s)
193         row = r.fetchone()
194         r.close()
195         if row:
196             return row[0]
197         return None
198     
199     def node_get_properties(self, node):
200         """Return the node's (parent, path).
201            Return None if the node is not found.
202         """
203         
204         s = select([self.nodes.c.parent, self.nodes.c.path])
205         s = s.where(self.nodes.c.node == node)
206         r = self.conn.execute(s)
207         l = r.fetchone()
208         r.close()
209         return l
210     
211     def node_get_versions(self, node, keys=(), propnames=_propnames):
212         """Return the properties of all versions at node.
213            If keys is empty, return all properties in the order
214            (serial, node, size, source, mtime, muser, cluster).
215         """
216         
217         s = select([self.versions.c.serial,
218                     self.versions.c.node,
219                     self.versions.c.hash,
220                     self.versions.c.size,
221                     self.versions.c.source,
222                     self.versions.c.mtime,
223                     self.versions.c.muser,
224                     self.versions.c.cluster], self.versions.c.node == node)
225         s = s.order_by(self.versions.c.serial)
226         r = self.conn.execute(s)
227         rows = r.fetchall()
228         r.close()
229         if not rows:
230             return rows
231         
232         if not keys:
233             return rows
234         
235         return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]
236     
237     def node_count_children(self, node):
238         """Return node's child count."""
239         
240         s = select([func.count(self.nodes.c.node)])
241         s = s.where(and_(self.nodes.c.parent == node,
242                          self.nodes.c.node != ROOTNODE))
243         r = self.conn.execute(s)
244         row = r.fetchone()
245         r.close()
246         return row[0]
247     
248     def node_purge_children(self, parent, before=inf, cluster=0):
249         """Delete all versions with the specified
250            parent and cluster, and return
251            the serials of versions deleted.
252            Clears out nodes with no remaining versions.
253         """
254         #update statistics
255         c1 = select([self.nodes.c.node],
256             self.nodes.c.parent == parent)
257         where_clause = and_(self.versions.c.node.in_(c1),
258                             self.versions.c.cluster == cluster)
259         s = select([func.count(self.versions.c.serial),
260                     func.sum(self.versions.c.size)])
261         s = s.where(where_clause)
262         if before != inf:
263             s = s.where(self.versions.c.mtime <= before)
264         r = self.conn.execute(s)
265         row = r.fetchone()
266         r.close()
267         if not row:
268             return ()
269         nr, size = row[0], -row[1] if row[1] else 0
270         mtime = time()
271         self.statistics_update(parent, -nr, size, mtime, cluster)
272         self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)
273         
274         s = select([self.versions.c.serial])
275         s = s.where(where_clause)
276         r = self.conn.execute(s)
277         serials = [row[SERIAL] for row in r.fetchall()]
278         r.close()
279         
280         #delete versions
281         s = self.versions.delete().where(where_clause)
282         r = self.conn.execute(s)
283         r.close()
284         
285         #delete nodes
286         s = select([self.nodes.c.node],
287             and_(self.nodes.c.parent == parent,
288                  select([func.count(self.versions.c.serial)],
289                     self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
290         rp = self.conn.execute(s)
291         nodes = [r[0] for r in rp.fetchall()]
292         rp.close()
293         s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
294         self.conn.execute(s).close()
295         
296         return serials
297     
298     def node_purge(self, node, before=inf, cluster=0):
299         """Delete all versions with the specified
300            node and cluster, and return
301            the serials of versions deleted.
302            Clears out the node if it has no remaining versions.
303         """
304         
305         #update statistics
306         s = select([func.count(self.versions.c.serial),
307                     func.sum(self.versions.c.size)])
308         where_clause = and_(self.versions.c.node == node,
309                          self.versions.c.cluster == cluster)
310         s = s.where(where_clause)
311         if before != inf:
312             s = s.where(self.versions.c.mtime <= before)
313         r = self.conn.execute(s)
314         row = r.fetchone()
315         nr, size = row[0], row[1]
316         r.close()
317         if not nr:
318             return ()
319         mtime = time()
320         self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
321         
322         s = select([self.versions.c.serial])
323         s = s.where(where_clause)
324         r = self.conn.execute(s)
325         serials = [r[SERIAL] for r in r.fetchall()]
326         r.close()
327         
328         #delete versions
329         s = self.versions.delete().where(where_clause)
330         r = self.conn.execute(s)
331         r.close()
332         
333         #delete nodes
334         s = select([self.nodes.c.node],
335             and_(self.nodes.c.node == node,
336                  select([func.count(self.versions.c.serial)],
337                     self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
338         r = self.conn.execute(s)
339         nodes = r.fetchall()
340         r.close()
341         s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
342         self.conn.execute(s).close()
343         
344         return serials
345     
346     def node_remove(self, node):
347         """Remove the node specified.
348            Return false if the node has children or is not found.
349         """
350         
351         if self.node_count_children(node):
352             return False
353         
354         mtime = time()
355         s = select([func.count(self.versions.c.serial),
356                     func.sum(self.versions.c.size),
357                     self.versions.c.cluster])
358         s = s.where(self.versions.c.node == node)
359         s = s.group_by(self.versions.c.cluster)
360         r = self.conn.execute(s)
361         for population, size, cluster in r.fetchall():
362             self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
363         r.close()
364         
365         s = self.nodes.delete().where(self.nodes.c.node == node)
366         self.conn.execute(s).close()
367         return True
368     
369     def statistics_get(self, node, cluster=0):
370         """Return population, total size and last mtime
371            for all versions under node that belong to the cluster.
372         """
373         
374         s = select([self.statistics.c.population,
375                     self.statistics.c.size,
376                     self.statistics.c.mtime])
377         s = s.where(and_(self.statistics.c.node == node,
378                          self.statistics.c.cluster == cluster))
379         r = self.conn.execute(s)
380         row = r.fetchone()
381         r.close()
382         return row
383     
384     def statistics_update(self, node, population, size, mtime, cluster=0):
385         """Update the statistics of the given node.
386            Statistics keep track the population, total
387            size of objects and mtime in the node's namespace.
388            May be zero or positive or negative numbers.
389         """
390         
391         s = select([self.statistics.c.population, self.statistics.c.size],
392             and_(self.statistics.c.node == node,
393                  self.statistics.c.cluster == cluster))
394         rp = self.conn.execute(s)
395         r = rp.fetchone()
396         rp.close()
397         if not r:
398             prepopulation, presize = (0, 0)
399         else:
400             prepopulation, presize = r
401         population += prepopulation
402         size += presize
403         
404         #insert or replace
405         #TODO better upsert
406         u = self.statistics.update().where(and_(self.statistics.c.node==node,
407                                            self.statistics.c.cluster==cluster))
408         u = u.values(population=population, size=size, mtime=mtime)
409         rp = self.conn.execute(u)
410         rp.close()
411         if rp.rowcount == 0:
412             ins = self.statistics.insert()
413             ins = ins.values(node=node, population=population, size=size,
414                              mtime=mtime, cluster=cluster)
415             self.conn.execute(ins).close()
416     
417     def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
418         """Update the statistics of the given node's parent.
419            Then recursively update all parents up to the root.
420            Population is not recursive.
421         """
422         
423         while True:
424             if node == ROOTNODE:
425                 break
426             props = self.node_get_properties(node)
427             if props is None:
428                 break
429             parent, path = props
430             self.statistics_update(parent, population, size, mtime, cluster)
431             node = parent
432             population = 0 # Population isn't recursive
433     
434     def statistics_latest(self, node, before=inf, except_cluster=0):
435         """Return population, total size and last mtime
436            for all latest versions under node that
437            do not belong to the cluster.
438         """
439         
440         # The node.
441         props = self.node_get_properties(node)
442         if props is None:
443             return None
444         parent, path = props
445         
446         # The latest version.
447         s = select([self.versions.c.serial,
448                     self.versions.c.node,
449                     self.versions.c.hash,
450                     self.versions.c.size,
451                     self.versions.c.source,
452                     self.versions.c.mtime,
453                     self.versions.c.muser,
454                     self.versions.c.cluster])
455         filtered = select([func.max(self.versions.c.serial)],
456                             self.versions.c.node == node)
457         if before != inf:
458             filtered = filtered.where(self.versions.c.mtime < before)
459         s = s.where(and_(self.versions.c.cluster != except_cluster,
460                          self.versions.c.serial == filtered))
461         r = self.conn.execute(s)
462         props = r.fetchone()
463         r.close()
464         if not props:
465             return None
466         mtime = props[MTIME]
467         
468         # First level, just under node (get population).
469         v = self.versions.alias('v')
470         s = select([func.count(v.c.serial),
471                     func.sum(v.c.size),
472                     func.max(v.c.mtime)])
473         c1 = select([func.max(self.versions.c.serial)])
474         if before != inf:
475             c1 = c1.where(self.versions.c.mtime < before)
476         c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
477         s = s.where(and_(v.c.serial == c1.where(self.versions.c.node == v.c.node),
478                          v.c.cluster != except_cluster,
479                          v.c.node.in_(c2)))
480         rp = self.conn.execute(s)
481         r = rp.fetchone()
482         rp.close()
483         if not r:
484             return None
485         count = r[0]
486         mtime = max(mtime, r[2])
487         if count == 0:
488             return (0, 0, mtime)
489         
490         # All children (get size and mtime).
491         # XXX: This is why the full path is stored.
492         s = select([func.count(v.c.serial),
493                     func.sum(v.c.size),
494                     func.max(v.c.mtime)])
495         c1 = select([func.max(self.versions.c.serial)],
496             self.versions.c.node == v.c.node)
497         if before != inf:
498             c1 = c1.where(self.versions.c.mtime < before)
499         c2 = select([self.nodes.c.node], self.nodes.c.path.like(path + '%'))
500         s = s.where(and_(v.c.serial == c1,
501                          v.c.cluster != except_cluster,
502                          v.c.node.in_(c2)))
503         rp = self.conn.execute(s)
504         r = rp.fetchone()
505         rp.close()
506         if not r:
507             return None
508         size = r[1] - props[SIZE]
509         mtime = max(mtime, r[2])
510         return (count, size, mtime)
511     
512     def version_create(self, node, hash, size, source, muser, cluster=0):
513         """Create a new version from the given properties.
514            Return the (serial, mtime) of the new version.
515         """
516         
517         mtime = time()
518         s = self.versions.insert().values(node=node, hash=hash, size=size, source=source,
519                                           mtime=mtime, muser=muser, cluster=cluster)
520         serial = self.conn.execute(s).inserted_primary_key[0]
521         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
522         return serial, mtime
523     
524     def version_lookup(self, node, before=inf, cluster=0):
525         """Lookup the current version of the given node.
526            Return a list with its properties:
527            (serial, node, hash, size, source, mtime, muser, cluster)
528            or None if the current version is not found in the given cluster.
529         """
530         
531         v = self.versions.alias('v')
532         s = select([v.c.serial, v.c.node, v.c.hash, v.c.size,
533                     v.c.source, v.c.mtime, v.c.muser, v.c.cluster])
534         c = select([func.max(self.versions.c.serial)],
535             self.versions.c.node == node)
536         if before != inf:
537             c = c.where(self.versions.c.mtime < before)
538         s = s.where(and_(v.c.serial == c,
539                          v.c.cluster == cluster))
540         r = self.conn.execute(s)
541         props = r.fetchone()
542         r.close()
543         if props:
544             return props
545         return None
546     
547     def version_get_properties(self, serial, keys=(), propnames=_propnames):
548         """Return a sequence of values for the properties of
549            the version specified by serial and the keys, in the order given.
550            If keys is empty, return all properties in the order
551            (serial, node, hash, size, source, mtime, muser, cluster).
552         """
553         
554         v = self.versions.alias()
555         s = select([v.c.serial, v.c.node, v.c.hash, v.c.size,
556                     v.c.source, v.c.mtime, v.c.muser, v.c.cluster], v.c.serial == serial)
557         rp = self.conn.execute(s)
558         r = rp.fetchone()
559         rp.close()
560         if r is None:
561             return r
562         
563         if not keys:
564             return r
565         return [r[propnames[k]] for k in keys if k in propnames]
566     
567     def version_recluster(self, serial, cluster):
568         """Move the version into another cluster."""
569         
570         props = self.version_get_properties(serial)
571         if not props:
572             return
573         node = props[NODE]
574         size = props[SIZE]
575         oldcluster = props[CLUSTER]
576         if cluster == oldcluster:
577             return
578         
579         mtime = time()
580         self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
581         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
582         
583         s = self.versions.update()
584         s = s.where(self.versions.c.serial == serial)
585         s = s.values(cluster = cluster)
586         self.conn.execute(s).close()
587     
588     def version_remove(self, serial):
589         """Remove the serial specified."""
590         
591         props = self.node_get_properties(serial)
592         if not props:
593             return
594         node = props[NODE]
595         size = props[SIZE]
596         cluster = props[CLUSTER]
597         
598         mtime = time()
599         self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
600         
601         s = self.versions.delete().where(self.versions.c.serial == serial)
602         self.conn.execute(s).close()
603         return True
604     
605     def attribute_get(self, serial, keys=()):
606         """Return a list of (key, value) pairs of the version specified by serial.
607            If keys is empty, return all attributes.
608            Othwerise, return only those specified.
609         """
610         
611         if keys:
612             attrs = self.attributes.alias()
613             s = select([attrs.c.key, attrs.c.value])
614             s = s.where(and_(attrs.c.key.in_(keys),
615                              attrs.c.serial == serial))
616         else:
617             attrs = self.attributes.alias()
618             s = select([attrs.c.key, attrs.c.value])
619             s = s.where(attrs.c.serial == serial)
620         r = self.conn.execute(s)
621         l = r.fetchall()
622         r.close()
623         return l
624     
625     def attribute_set(self, serial, items):
626         """Set the attributes of the version specified by serial.
627            Receive attributes as an iterable of (key, value) pairs.
628         """
629         #insert or replace
630         #TODO better upsert
631         for k, v in items:
632             s = self.attributes.update()
633             s = s.where(and_(self.attributes.c.serial == serial,
634                              self.attributes.c.key == k))
635             s = s.values(value = v)
636             rp = self.conn.execute(s)
637             rp.close()
638             if rp.rowcount == 0:
639                 s = self.attributes.insert()
640                 s = s.values(serial=serial, key=k, value=v)
641                 self.conn.execute(s).close()
642     
643     def attribute_del(self, serial, keys=()):
644         """Delete attributes of the version specified by serial.
645            If keys is empty, delete all attributes.
646            Otherwise delete those specified.
647         """
648         
649         if keys:
650             #TODO more efficient way to do this?
651             for key in keys:
652                 s = self.attributes.delete()
653                 s = s.where(and_(self.attributes.c.serial == serial,
654                                  self.attributes.c.key == key))
655                 self.conn.execute(s).close()
656         else:
657             s = self.attributes.delete()
658             s = s.where(self.attributes.c.serial == serial)
659             self.conn.execute(s).close()
660     
661     def attribute_copy(self, source, dest):
662         s = select([dest, self.attributes.c.key, self.attributes.c.value],
663             self.attributes.c.serial == source)
664         rp = self.conn.execute(s)
665         attributes = rp.fetchall()
666         rp.close()
667         for dest, k, v in attributes:
668             #insert or replace
669             s = self.attributes.update().where(and_(
670                 self.attributes.c.serial == dest,
671                 self.attributes.c.key == k))
672             rp = self.conn.execute(s, value=v)
673             rp.close()
674             if rp.rowcount == 0:
675                 s = self.attributes.insert()
676                 values = {'serial':dest, 'key':k, 'value':v}
677                 self.conn.execute(s, values).close()
678     
679     def latest_attribute_keys(self, parent, before=inf, except_cluster=0, pathq=[]):
680         """Return a list with all keys pairs defined
681            for all latest versions under parent that
682            do not belong to the cluster.
683         """
684         
685         # TODO: Use another table to store before=inf results.
686         a = self.attributes.alias('a')
687         v = self.versions.alias('v')
688         n = self.nodes.alias('n')
689         s = select([a.c.key]).distinct()
690         filtered = select([func.max(self.versions.c.serial)])
691         if before != inf:
692             filtered = filtered.where(self.versions.c.mtime < before)
693         s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
694         s = s.where(v.c.cluster != except_cluster)
695         s = s.where(v.c.node.in_(select([self.nodes.c.node],
696             self.nodes.c.parent == parent)))
697         s = s.where(a.c.serial == v.c.serial)
698         s = s.where(n.c.node == v.c.node)
699         conj = []
700         for x in pathq:
701             conj.append(n.c.path.like(x + '%'))
702         if conj:
703             s = s.where(or_(*conj))
704         rp = self.conn.execute(s)
705         rows = rp.fetchall()
706         rp.close()
707         return [r[0] for r in rows]
708     
709     def latest_version_list(self, parent, prefix='', delimiter=None,
710                             start='', limit=10000, before=inf,
711                             except_cluster=0, pathq=[], filterq=None):
712         """Return a (list of (path, serial) tuples, list of common prefixes)
713            for the current versions of the paths with the given parent,
714            matching the following criteria.
715            
716            The property tuple for a version is returned if all
717            of these conditions are true:
718                 
719                 a. parent matches
720                 
721                 b. path > start
722                 
723                 c. path starts with prefix (and paths in pathq)
724                 
725                 d. version is the max up to before
726                 
727                 e. version is not in cluster
728                 
729                 f. the path does not have the delimiter occuring
730                    after the prefix, or ends with the delimiter
731                 
732                 g. serial matches the attribute filter query.
733                    
734                    A filter query is a comma-separated list of
735                    terms in one of these three forms:
736                    
737                    key
738                        an attribute with this key must exist
739                    
740                    !key
741                        an attribute with this key must not exist
742                    
743                    key ?op value
744                        the attribute with this key satisfies the value
745                        where ?op is one of ==, != <=, >=, <, >.
746            
747            The list of common prefixes includes the prefixes
748            matching up to the first delimiter after prefix,
749            and are reported only once, as "virtual directories".
750            The delimiter is included in the prefixes.
751            
752            If arguments are None, then the corresponding matching rule
753            will always match.
754            
755            Limit applies to the first list of tuples returned.
756         """
757         
758         if not start or start < prefix:
759             start = strprevling(prefix)
760         nextling = strnextling(prefix)
761         
762         a = self.attributes.alias('a')
763         v = self.versions.alias('v')
764         n = self.nodes.alias('n')
765         s = select([n.c.path, v.c.serial]).distinct()
766         filtered = select([func.max(self.versions.c.serial)])
767         if before != inf:
768             filtered = filtered.where(self.versions.c.mtime < before)
769         s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
770         s = s.where(v.c.cluster != except_cluster)
771         s = s.where(v.c.node.in_(select([self.nodes.c.node],
772             self.nodes.c.parent == parent)))
773         if filterq:
774             s = s.where(a.c.serial == v.c.serial)
775         
776         s = s.where(n.c.node == v.c.node)
777         s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
778         conj = []
779         for x in pathq:
780             conj.append(n.c.path.like(x + '%'))
781         
782         if conj:
783             s = s.where(or_(*conj))
784         
785         if filterq:
786             s = s.where(a.c.key.in_(filterq.split(',')))
787         
788         s = s.order_by(n.c.path)
789         
790         if not delimiter:
791             s = s.limit(limit)
792             rp = self.conn.execute(s, start=start)
793             r = rp.fetchall()
794             rp.close()
795             return r, ()
796         
797         pfz = len(prefix)
798         dz = len(delimiter)
799         count = 0
800         prefixes = []
801         pappend = prefixes.append
802         matches = []
803         mappend = matches.append
804         
805         rp = self.conn.execute(s, start=start)
806         while True:
807             props = rp.fetchone()
808             if props is None:
809                 break
810             path, serial = props
811             idx = path.find(delimiter, pfz)
812             
813             if idx < 0:
814                 mappend(props)
815                 count += 1
816                 if count >= limit:
817                     break
818                 continue
819             
820             if idx + dz == len(path):
821                 mappend(props)
822                 count += 1
823                 continue # Get one more, in case there is a path.
824             pf = path[:idx + dz]
825             pappend(pf)
826             if count >= limit: 
827                 break
828             
829             rp = self.conn.execute(s, start=strnextling(pf)) # New start.
830         rp.close()
831         
832         return matches, prefixes