backend components in SQLAlchemy: Progress IV
[pithos] / pithos / backends / lib_alchemy / node.py
1 # Copyright 2011 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from time import time
35 from sqlalchemy import Table, Integer, Float, Column, String, MetaData, ForeignKey
36 from sqlalchemy.schema import Index, Sequence
37 from sqlalchemy.sql import func, and_, or_, null, select, bindparam
38 from duplicate import insertOnDuplicate
39 from dbworker import DBWorker
40
41 ROOTNODE  = 0
42
43 ( SERIAL, NODE, SIZE, SOURCE, MTIME, MUSER, CLUSTER ) = range(7)
44
45 inf = float('inf')
46
47
48 def strnextling(prefix):
49     """Return the first unicode string
50        greater than but not starting with given prefix.
51        strnextling('hello') -> 'hellp'
52     """
53     if not prefix:
54         ## all strings start with the null string,
55         ## therefore we have to approximate strnextling('')
56         ## with the last unicode character supported by python
57         ## 0x10ffff for wide (32-bit unicode) python builds
58         ## 0x00ffff for narrow (16-bit unicode) python builds
59         ## We will not autodetect. 0xffff is safe enough.
60         return unichr(0xffff)
61     s = prefix[:-1]
62     c = ord(prefix[-1])
63     if c >= 0xffff:
64         raise RuntimeError
65     s += unichr(c+1)
66     return s
67
68 def strprevling(prefix):
69     """Return an approximation of the last unicode string
70        less than but not starting with given prefix.
71        strprevling(u'hello') -> u'helln\\xffff'
72     """
73     if not prefix:
74         ## There is no prevling for the null string
75         return prefix
76     s = prefix[:-1]
77     c = ord(prefix[-1])
78     if c > 0:
79         #s += unichr(c-1) + unichr(0xffff)
80         s += unichr(c-1)
81     return s
82
83
84 _propnames = {
85     'serial'    : 0,
86     'node'      : 1,
87     'size'      : 2,
88     'source'    : 3,
89     'mtime'     : 4,
90     'muser'     : 5,
91     'cluster'   : 6,
92 }
93
94
95 class Node(DBWorker):
96     """Nodes store path organization and have multiple versions.
97        Versions store object history and have multiple attributes.
98        Attributes store metadata.
99     """
100     
101     # TODO: Provide an interface for included and excluded clusters.
102     
103     def __init__(self, **params):
104         DBWorker.__init__(self, **params)
105         metadata = MetaData()
106         
107         #create nodes table
108         columns=[]
109         columns.append(Column('node', Integer, primary_key=True))
110         columns.append(Column('parent', Integer,
111                               ForeignKey('nodes.node',
112                                          ondelete='CASCADE',
113                                          onupdate='CASCADE'),
114                               autoincrement=False))
115         columns.append(Column('path', String(2048), default='', nullable=False))
116         self.nodes = Table('nodes', metadata, *columns)
117         # place an index on path
118         Index('idx_nodes_path', self.nodes.c.path, unique=True)
119         
120         #create statistics table
121         columns=[]
122         columns.append(Column('node', Integer,
123                               ForeignKey('nodes.node',
124                                          ondelete='CASCADE',
125                                          onupdate='CASCADE'),
126                               primary_key=True))
127         columns.append(Column('population', Integer, nullable=False, default=0))
128         columns.append(Column('size', Integer, nullable=False, default=0))
129         columns.append(Column('mtime', Float))
130         columns.append(Column('cluster', Integer, nullable=False, default=0,
131                               primary_key=True))
132         self.statistics = Table('statistics', metadata, *columns)
133         
134         #create versions table
135         columns=[]
136         columns.append(Column('serial', Integer, primary_key=True))
137         columns.append(Column('node', Integer,
138                               ForeignKey('nodes.node',
139                                          ondelete='CASCADE',
140                                          onupdate='CASCADE')))
141         columns.append(Column('size', Integer, nullable=False, default=0))
142         columns.append(Column('source', Integer))
143         columns.append(Column('mtime', Float))
144         columns.append(Column('muser', String(255), nullable=False, default=''))
145         columns.append(Column('cluster', Integer, nullable=False, default=0))
146         self.versions = Table('versions', metadata, *columns)
147         # place an index on node
148         Index('idx_versions_node', self.versions.c.node)
149         # TODO: Sort out if more indexes are needed.
150         #Index('idx_versions_node', self.versions.c.mtime)
151         
152         #create attributes table
153         columns = []
154         columns.append(Column('serial', Integer,
155                               ForeignKey('versions.serial',
156                                          ondelete='CASCADE',
157                                          onupdate='CASCADE'),
158                               primary_key=True))
159         columns.append(Column('key', String(255), primary_key=True))
160         columns.append(Column('value', String(255)))
161         self.attributes = Table('attributes', metadata, *columns)
162         
163         metadata.create_all(self.engine)
164         
165         s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
166                                            self.nodes.c.parent == ROOTNODE))
167         r = self.conn.execute(s).fetchone()
168         if not r:
169             s = self.nodes.insert().values(node=ROOTNODE, parent=ROOTNODE)
170             self.conn.execute(s)
171     
172     def node_create(self, parent, path):
173         """Create a new node from the given properties.
174            Return the node identifier of the new node.
175         """
176         #TODO catch IntegrityError?
177         s = self.nodes.insert().values(parent=parent, path=path)
178         r = self.conn.execute(s)
179         inserted_primary_key = r.inserted_primary_key[0]
180         r.close()
181         return inserted_primary_key
182     
183     def node_lookup(self, path):
184         """Lookup the current node of the given path.
185            Return None if the path is not found.
186         """
187         
188         s = select([self.nodes.c.node], self.nodes.c.path == path)
189         r = self.conn.execute(s)
190         row = r.fetchone()
191         r.close()
192         if row:
193             return row[0]
194         return None
195     
196     def node_get_properties(self, node):
197         """Return the node's (parent, path).
198            Return None if the node is not found.
199         """
200         
201         s = select([self.nodes.c.parent, self.nodes.c.path])
202         s = s.where(self.nodes.c.node == node)
203         r = self.conn.execute(s)
204         l = r.fetchone()
205         r.close()
206         return l
207     
208     def node_get_versions(self, node, keys=(), propnames=_propnames):
209         """Return the properties of all versions at node.
210            If keys is empty, return all properties in the order
211            (serial, node, size, source, mtime, muser, cluster).
212         """
213         
214         s = select(['*'], self.versions.c.node == node)
215         r = self.conn.execute(s)
216         rows = r.fetchall()
217         if not rows:
218             return rows
219         
220         if not keys:
221             return rows
222         
223         return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]
224     
225     def node_count_children(self, node):
226         """Return node's child count."""
227         
228         s = select([func.count(self.nodes.c.node)])
229         s = s.where(and_(self.nodes.c.parent == node,
230                          self.nodes.c.node != ROOTNODE))
231         r = self.conn.execute(s)
232         row = r.fetchone()
233         r.close()
234         return row[0]
235     
236     def node_purge_children(self, parent, before=inf, cluster=0):
237         """Delete all versions with the specified
238            parent and cluster, and return
239            the serials of versions deleted.
240            Clears out nodes with no remaining versions.
241         """
242         #update statistics
243         #TODO handle before=inf
244         c1 = select([self.nodes.c.node],
245             self.nodes.c.parent == parent)
246         where_clause = and_(self.versions.c.node.in_(c1),
247                             self.versions.c.cluster == cluster,
248                             self.versions.c.mtime <= before)
249         s = select([func.count(self.versions.c.serial),
250                     func.sum(self.versions.c.size)])
251         s = s.where(where_clause)
252         r = self.conn.execute(s)
253         row = r.fetchone()
254         r.close()
255         if not row:
256             return ()
257         nr, size = row[0], -row[1] if row[1] else 0
258         mtime = time()
259         self.statistics_update(parent, -nr, size, mtime, cluster)
260         self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)
261         
262         s = select([self.versions.c.serial])
263         s = s.where(where_clause)
264         r = self.conn.execute(s)
265         serials = [row[SERIAL] for row in r.fetchall()]
266         r.close()
267         
268         #delete versions
269         s = self.versions.delete().where(where_clause)
270         r = self.conn.execute(s)
271         r.close()
272         
273         #delete nodes
274         s = select([self.nodes.c.node],
275             and_(self.nodes.c.parent == parent,
276                  select([func.count(self.versions.c.serial)],
277                     self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
278         r = self.conn.execute(s)
279         nodes = r.fetchall()
280         r.close()
281         s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
282         self.conn.execute(s).close()
283         
284         return serials
285     
286     def node_purge(self, node, before=inf, cluster=0):
287         """Delete all versions with the specified
288            node and cluster, and return
289            the serials of versions deleted.
290            Clears out the node if it has no remaining versions.
291         """
292         
293         #update statistics
294         s = select([func.count(self.versions.c.serial),
295                     func.sum(self.versions.c.size)])
296         where_clause = and_(self.versions.c.node == node,
297                          self.versions.c.cluster == cluster,
298                          self.versions.c.mtime <= before)
299         s = s.where(where_clause)
300         r = self.conn.execute(s)
301         row = r.fetchone()
302         nr, size = row[0], row[1]
303         r.close()
304         if not nr:
305             return ()
306         mtime = time()
307         self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
308         
309         s = select([self.versions.c.serial])
310         s = s.where(where_clause)
311         r = self.conn.execute(s)
312         serials = [r[SERIAL] for r in r.fetchall()]
313         
314         #delete versions
315         s = self.versions.delete().where(where_clause)
316         r = self.conn.execute(s)
317         r.close()
318         
319         #delete nodes
320         s = select([self.nodes.c.node],
321             and_(self.nodes.c.node == node,
322                  select([func.count(self.versions.c.serial)],
323                     self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
324         r = self.conn.execute(s)
325         nodes = r.fetchall()
326         r.close()
327         s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
328         self.conn.execute(s).close()
329         
330         return serials
331     
332     def node_remove(self, node):
333         """Remove the node specified.
334            Return false if the node has children or is not found.
335         """
336         
337         if self.node_count_children(node):
338             return False
339         
340         mtime = time()
341         s = select([func.count(self.versions.c.serial),
342                     func.sum(self.versions.c.size),
343                     self.versions.c.cluster])
344         s = s.where(self.versions.c.node == node)
345         s = s.group_by(self.versions.c.cluster)
346         r = self.conn.execute(s)
347         for population, size, cluster in r.fetchall():
348             self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
349         r.close()
350         
351         s = self.nodes.delete().where(self.nodes.c.node == node)
352         self.conn.execute(s).close()
353         return True
354     
355     def statistics_get(self, node, cluster=0):
356         """Return population, total size and last mtime
357            for all versions under node that belong to the cluster.
358         """
359         
360         s = select([self.statistics.c.population,
361                     self.statistics.c.size,
362                     self.statistics.c.mtime])
363         s = s.where(and_(self.statistics.c.node == node,
364                          self.statistics.c.cluster == cluster))
365         r = self.conn.execute(s)
366         row = r.fetchone()
367         r.close()
368         return row
369     
370     def statistics_update(self, node, population, size, mtime, cluster=0):
371         """Update the statistics of the given node.
372            Statistics keep track the population, total
373            size of objects and mtime in the node's namespace.
374            May be zero or positive or negative numbers.
375         """
376         
377         s = select([self.statistics.c.population, self.statistics.c.size],
378             and_(self.statistics.c.node == node,
379                  self.statistics.c.cluster == cluster))
380         rp = self.conn.execute(s)
381         r = rp.fetchone()
382         rp.close()
383         if not r:
384             prepopulation, presize = (0, 0)
385         else:
386             prepopulation, presize = r
387         population += prepopulation
388         size += presize
389         
390         #insert or replace
391         u = self.statistics.update().where(and_(self.statistics.c.node==node,
392                                            self.statistics.c.cluster==cluster))
393         u = u.values(population=population, size=size, mtime=mtime)
394         rp = self.conn.execute(u)
395         rp.close()
396         if rp.rowcount == 0:
397             ins = self.statistics.insert()
398             ins = ins.values(node=node, population=population, size=size,
399                              mtime=mtime, cluster=cluster)
400             self.conn.execute(ins).close()
401     
402     def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
403         """Update the statistics of the given node's parent.
404            Then recursively update all parents up to the root.
405            Population is not recursive.
406         """
407         
408         while True:
409             if node == ROOTNODE:
410                 break
411             props = self.node_get_properties(node)
412             if props is None:
413                 break
414             parent, path = props
415             self.statistics_update(parent, population, size, mtime, cluster)
416             node = parent
417             population = 0 # Population isn't recursive
418     
419     def statistics_latest(self, node, before=inf, except_cluster=0):
420         """Return population, total size and last mtime
421            for all latest versions under node that
422            do not belong to the cluster.
423         """
424         
425         # The node.
426         props = self.node_get_properties(node)
427         if props is None:
428             return None
429         parent, path = props
430         
431         # The latest version.
432         s = select([self.versions.c.serial,
433                     self.versions.c.node,
434                     self.versions.c.size,
435                     self.versions.c.mtime,
436                     self.versions.c.muser,
437                     self.versions.c.cluster])
438         s = s.where(and_(self.versions.c.cluster != except_cluster,
439                          self.versions.c.serial == select(
440                             [func.max(self.versions.c.serial)],
441                             and_(self.versions.c.node == node,
442                             self.versions.c.mtime < before))))
443         r = self.conn.execute(s)
444         props = r.fetchone()
445         r.close()
446         if not props:
447             return None
448         mtime = props[MTIME]
449         
450         # First level, just under node (get population).
451         v = self.versions.alias('v')
452         s = select([func.count(v.c.serial),
453                     func.sum(v.c.size),
454                     func.max(v.c.mtime)])
455         c1 = select([func.max(self.versions.c.serial)],
456             and_(self.versions.c.node == v.c.node,
457                  self.versions.c.mtime < before))
458         c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
459         s = s.where(and_(v.c.serial == c1,
460                          v.c.cluster != except_cluster,
461                          v.c.node.in_(c2)))
462         rp = self.conn.execute(s)
463         r = rp.fetchone()
464         rp.close()
465         if not r:
466             return None
467         count = r[0]
468         mtime = max(mtime, r[2])
469         if count == 0:
470             return (0, 0, mtime)
471         
472         # All children (get size and mtime).
473         # XXX: This is why the full path is stored.
474         s = select([func.count(v.c.serial),
475                     func.sum(v.c.size),
476                     func.max(v.c.mtime)])
477         c1 = select([func.max(self.versions.c.serial)],
478             and_(self.versions.c.node == v.c.node,
479                  self.versions.c.mtime < before))
480         c2 = select([self.nodes.c.node], self.nodes.c.path.like(path + '%'))
481         s = s.where(and_(v.c.serial == c1,
482                          v.c.cluster != except_cluster,
483                          v.c.node.in_(c2)))
484         rp = self.conn.execute(s)
485         r = rp.fetchone()
486         rp.close()
487         if not r:
488             return None
489         size = r[1] - props[SIZE]
490         mtime = max(mtime, r[2])
491         return (count, size, mtime)
492     
493     def version_create(self, node, size, source, muser, cluster=0):
494         """Create a new version from the given properties.
495            Return the (serial, mtime) of the new version.
496         """
497         
498         mtime = time()
499         props = (node, size, source, mtime, muser, cluster)
500         props = locals()
501         props.pop('self')
502         s = self.versions.insert().values(**props)
503         serial = self.conn.execute(s).inserted_primary_key[0]
504         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
505         return serial, mtime
506     
507     def version_lookup(self, node, before=inf, cluster=0):
508         """Lookup the current version of the given node.
509            Return a list with its properties:
510            (serial, node, size, source, mtime, muser, cluster)
511            or None if the current version is not found in the given cluster.
512         """
513         
514         v = self.versions.alias('v')
515         s = select([v.c.serial, v.c.node, v.c.size, v.c.source, v.c.mtime,
516                     v.c.muser, v.c.cluster])
517         c = select([func.max(self.versions.c.serial)],
518             and_(self.versions.c.node == node,
519                  self.versions.c.mtime < before))
520         s = s.where(and_(v.c.serial == c,
521                          v.c.cluster == cluster))
522         r = self.conn.execute(s)
523         props = r.fetchone()
524         r.close()
525         if not props:
526             return props
527         return None
528     
529     def version_get_properties(self, serial, keys=(), propnames=_propnames):
530         """Return a sequence of values for the properties of
531            the version specified by serial and the keys, in the order given.
532            If keys is empty, return all properties in the order
533            (serial, node, size, source, mtime, muser, cluster).
534         """
535         
536         v = self.versions.alias()
537         s = select([v.c.serial, v.c.node, v.c.size, v.c.source, v.c.mtime,
538                    v.c.muser, v.c.cluster], v.c.serial == serial)
539         rp = self.conn.execute(s)
540         r = rp.fetchone()
541         rp.close()
542         if r is None:
543             return r
544         
545         if not keys:
546             return r
547         return [r[propnames[k]] for k in keys if k in propnames]
548     
549     def version_recluster(self, serial, cluster):
550         """Move the version into another cluster."""
551         
552         props = self.version_get_properties(serial)
553         if not props:
554             return
555         node = props[NODE]
556         size = props[SIZE]
557         oldcluster = props[CLUSTER]
558         if cluster == oldcluster:
559             return
560         
561         mtime = time()
562         self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
563         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
564         
565         s = self.versions.update()
566         s = s.where(self.versions.c.serial == serial)
567         s = s.values(cluster = cluster)
568         self.conn.execute(s).close()
569     
570     def version_remove(self, serial):
571         """Remove the serial specified."""
572         
573         props = self.node_get_properties(serial)
574         if not props:
575             return
576         node = props[NODE]
577         size = props[SIZE]
578         cluster = props[CLUSTER]
579         
580         mtime = time()
581         self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
582         
583         s = self.versions.delete().where(self.versions.c.serial == serial)
584         self.conn.execute(s).close()
585         return True
586     
587     def attribute_get(self, serial, keys=()):
588         """Return a list of (key, value) pairs of the version specified by serial.
589            If keys is empty, return all attributes.
590            Othwerise, return only those specified.
591         """
592         
593         execute = self.execute
594         if keys:
595             attrs = self.attributes.alias()
596             s = select([attrs.c.key, attrs.c.value])
597             s = s.where(and_(attrs.c.key.in_(keys),
598                              attrs.c.serial == serial))
599         else:
600             attrs = self.attributes.alias()
601             s = select([attrs.c.key, attrs.c.value])
602             s = s.where(attrs.c.serial == serial)
603         r = self.conn.execute(s)
604         l = r.fetchall()
605         r.close()
606         return l
607     
608     def attribute_set(self, serial, items):
609         """Set the attributes of the version specified by serial.
610            Receive attributes as an iterable of (key, value) pairs.
611         """
612         values = [{'serial':serial, 'key':k, 'value':v} for k, v in items]
613         self.conn.execute(self.attributes.insert(), values).close()
614     
615     def attribute_del(self, serial, keys=()):
616         """Delete attributes of the version specified by serial.
617            If keys is empty, delete all attributes.
618            Otherwise delete those specified.
619         """
620         
621         if keys:
622             #TODO more efficient way to do this?
623             for key in keys:
624                 s = self.attributes.delete()
625                 s = s.where(and_(self.attributes.c.serial == serial,
626                                  self.attributes.c.key == key))
627                 self.conn.execute(s).close()
628         else:
629             s = self.attributes.delete()
630             s = s.where(self.attributes.c.serial == serial)
631             self.conn.execute(s).close()
632     
633     def attribute_copy(self, source, dest):
634         from sqlalchemy.ext.compiler import compiles
635         from sqlalchemy.sql.expression import UpdateBase
636                 
637         class InsertFromSelect(UpdateBase):
638             def __init__(self, table, select):
639                 self.table = table
640                 self.select = select
641         
642         @compiles(InsertFromSelect)
643         def visit_insert_from_select(element, compiler, **kw):
644             return "INSERT INTO %s (%s)" % (
645                 compiler.process(element.table, asfrom=True),
646                 compiler.process(element.select)
647             )
648         
649         s = select([dest, self.attributes.c.key, self.attributes.c.value],
650             self.attributes.c.serial == source)
651         ins = InsertFromSelect(self.attributes, s)
652         self.conn.execute(ins).close()
653     
654     def latest_attribute_keys(self, parent, before=inf, except_cluster=0, pathq=[]):
655         """Return a list with all keys pairs defined
656            for all latest versions under parent that
657            do not belong to the cluster.
658         """
659         
660         # TODO: Use another table to store before=inf results.
661         a = self.attributes.alias('a')
662         v = self.versions.alias('v')
663         n = self.nodes.alias('n')
664         s = select([a.c.key]).distinct()
665         s = s.where(v.c.serial == select([func.max(self.versions.c.serial)],
666                                           and_(self.versions.c.node == v.c.node,
667                                                self.versions.c.mtime < before)))
668         s = s.where(v.c.cluster != except_cluster)
669         s = s.where(v.c.node.in_(select([self.nodes.c.node],
670             self.nodes.c.parent == parent)))
671         s = s.where(a.c.serial == v.c.serial)
672         s = s.where(n.c.node == v.c.node)
673         conj = []
674         for x in pathq:
675             conj.append(n.c.path.like(x + '%'))
676         if conj:
677             s = s.where(or_(*conj))
678         rp = self.conn.execute(s)
679         r = rp.fetchall()
680         rp.close()
681         return [r[0] for r in self.fetchall()]
682     
683     def latest_version_list(self, parent, prefix='', delimiter=None,
684                             start='', limit=10000, before=inf,
685                             except_cluster=0, pathq=[], filterq=None):
686         """Return a (list of (path, serial) tuples, list of common prefixes)
687            for the current versions of the paths with the given parent,
688            matching the following criteria.
689            
690            The property tuple for a version is returned if all
691            of these conditions are true:
692                 
693                 a. parent matches
694                 
695                 b. path > start
696                 
697                 c. path starts with prefix (and paths in pathq)
698                 
699                 d. version is the max up to before
700                 
701                 e. version is not in cluster
702                 
703                 f. the path does not have the delimiter occuring
704                    after the prefix, or ends with the delimiter
705                 
706                 g. serial matches the attribute filter query.
707                    
708                    A filter query is a comma-separated list of
709                    terms in one of these three forms:
710                    
711                    key
712                        an attribute with this key must exist
713                    
714                    !key
715                        an attribute with this key must not exist
716                    
717                    key ?op value
718                        the attribute with this key satisfies the value
719                        where ?op is one of ==, != <=, >=, <, >.
720            
721            The list of common prefixes includes the prefixes
722            matching up to the first delimiter after prefix,
723            and are reported only once, as "virtual directories".
724            The delimiter is included in the prefixes.
725            
726            If arguments are None, then the corresponding matching rule
727            will always match.
728            
729            Limit applies to the first list of tuples returned.
730         """
731         
732         print '#', locals()
733         if not start or start < prefix:
734             start = strprevling(prefix)
735         nextling = strnextling(prefix)
736         
737         a = self.attributes.alias('a')
738         v = self.versions.alias('v')
739         n = self.nodes.alias('n')
740         s = select([n.c.path, v.c.serial]).distinct()
741         s = s.where(v.c.serial == select([func.max(self.versions.c.serial)],
742             and_(self.versions.c.node == v.c.node,
743                  self.versions.c.mtime < before)))
744         s = s.where(v.c.cluster != except_cluster)
745         s = s.where(v.c.node.in_(select([self.nodes.c.node],
746             self.nodes.c.parent == parent)))
747         if filterq:
748             s = s.where(a.c.serial == v.c.serial)
749         
750         s = s.where(n.c.node == v.c.node)
751         s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
752         conj = []
753         for x in pathq:
754             conj.append(n.c.path.like(x + '%'))
755         
756         if conj:
757             s = s.where(or_(*conj))
758         
759         if filterq:
760             s = s.where(a.c.key.in_(filterq.split(',')))
761         
762         s = s.order_by(n.c.path)
763         
764         if not delimiter:
765             s = s.limit(limit)
766             rp = self.conn.execute(s, start=start)
767             r = rp.fetchall()
768             rp.close()
769             return r, ()
770         
771         pfz = len(prefix)
772         dz = len(delimiter)
773         count = 0
774         prefixes = []
775         pappend = prefixes.append
776         matches = []
777         mappend = matches.append
778         
779         rp = self.conn.execute(s, start=start)
780         while True:
781             props = rp.fetchone()
782             if props is None:
783                 break
784             path, serial = props
785             idx = path.find(delimiter, pfz)
786             
787             if idx < 0:
788                 mappend(props)
789                 count += 1
790                 if count >= limit:
791                     break
792                 continue
793             
794             if idx + dz == len(path):
795                 mappend(props)
796                 count += 1
797                 continue # Get one more, in case there is a path.
798             pf = path[:idx + dz]
799             pappend(pf)
800             if count >= limit: 
801                 break
802             
803             rp = self.conn.execute(s, start=strnextling(pf)) # New start.
804         
805         return matches, prefixes