change size db column to bigint
[pithos] / pithos / backends / lib / sqlalchemy / node.py
1 # Copyright 2011 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from time import time
35 from sqlalchemy import Table, Integer, BigInteger, Float, Column, String, MetaData, ForeignKey
36 from sqlalchemy.schema import Index, Sequence
37 from sqlalchemy.sql import func, and_, or_, null, select, bindparam
38 from sqlalchemy.ext.compiler import compiles
39
40 from dbworker import DBWorker
41
42 ROOTNODE  = 0
43
44 ( SERIAL, NODE, SIZE, SOURCE, MTIME, MUSER, CLUSTER ) = range(7)
45
46 inf = float('inf')
47
48
49 def strnextling(prefix):
50     """Return the first unicode string
51        greater than but not starting with given prefix.
52        strnextling('hello') -> 'hellp'
53     """
54     if not prefix:
55         ## all strings start with the null string,
56         ## therefore we have to approximate strnextling('')
57         ## with the last unicode character supported by python
58         ## 0x10ffff for wide (32-bit unicode) python builds
59         ## 0x00ffff for narrow (16-bit unicode) python builds
60         ## We will not autodetect. 0xffff is safe enough.
61         return unichr(0xffff)
62     s = prefix[:-1]
63     c = ord(prefix[-1])
64     if c >= 0xffff:
65         raise RuntimeError
66     s += unichr(c+1)
67     return s
68
69 def strprevling(prefix):
70     """Return an approximation of the last unicode string
71        less than but not starting with given prefix.
72        strprevling(u'hello') -> u'helln\\xffff'
73     """
74     if not prefix:
75         ## There is no prevling for the null string
76         return prefix
77     s = prefix[:-1]
78     c = ord(prefix[-1])
79     if c > 0:
80         #s += unichr(c-1) + unichr(0xffff)
81         s += unichr(c-1)
82     return s
83
84
85 _propnames = {
86     'serial'    : 0,
87     'node'      : 1,
88     'size'      : 2,
89     'source'    : 3,
90     'mtime'     : 4,
91     'muser'     : 5,
92     'cluster'   : 6,
93 }
94
95
96 class Node(DBWorker):
97     """Nodes store path organization and have multiple versions.
98        Versions store object history and have multiple attributes.
99        Attributes store metadata.
100     """
101     
102     # TODO: Provide an interface for included and excluded clusters.
103     
104     def __init__(self, **params):
105         DBWorker.__init__(self, **params)
106         metadata = MetaData()
107         
108         #create nodes table
109         columns=[]
110         columns.append(Column('node', Integer, primary_key=True))
111         columns.append(Column('parent', Integer,
112                               ForeignKey('nodes.node',
113                                          ondelete='CASCADE',
114                                          onupdate='CASCADE'),
115                               autoincrement=False))
116         columns.append(Column('path', String(2048), default='', nullable=False))
117         self.nodes = Table('nodes', metadata, *columns)
118         # place an index on path
119         Index('idx_nodes_path', self.nodes.c.path, unique=True)
120         
121         #create statistics table
122         columns=[]
123         columns.append(Column('node', Integer,
124                               ForeignKey('nodes.node',
125                                          ondelete='CASCADE',
126                                          onupdate='CASCADE'),
127                               primary_key=True))
128         columns.append(Column('population', Integer, nullable=False, default=0))
129         columns.append(Column('size', BigInteger, nullable=False, default=0))
130         columns.append(Column('mtime', Float))
131         columns.append(Column('cluster', Integer, nullable=False, default=0,
132                               primary_key=True))
133         self.statistics = Table('statistics', metadata, *columns)
134         
135         #create versions table
136         columns=[]
137         columns.append(Column('serial', Integer, primary_key=True))
138         columns.append(Column('node', Integer,
139                               ForeignKey('nodes.node',
140                                          ondelete='CASCADE',
141                                          onupdate='CASCADE')))
142         columns.append(Column('size', BigInteger, nullable=False, default=0))
143         columns.append(Column('source', Integer))
144         columns.append(Column('mtime', Float))
145         columns.append(Column('muser', String(255), nullable=False, default=''))
146         columns.append(Column('cluster', Integer, nullable=False, default=0))
147         self.versions = Table('versions', metadata, *columns)
148         Index('idx_versions_node_mtime', self.versions.c.node,
149               self.versions.c.mtime)
150         
151         #create attributes table
152         columns = []
153         columns.append(Column('serial', Integer,
154                               ForeignKey('versions.serial',
155                                          ondelete='CASCADE',
156                                          onupdate='CASCADE'),
157                               primary_key=True))
158         columns.append(Column('key', String(255), primary_key=True))
159         columns.append(Column('value', String(255)))
160         self.attributes = Table('attributes', metadata, *columns)
161         
162         metadata.create_all(self.engine)
163         
164         s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
165                                            self.nodes.c.parent == ROOTNODE))
166         rp = self.conn.execute(s)
167         r = rp.fetchone()
168         rp.close()
169         if not r:
170             s = self.nodes.insert().values(node=ROOTNODE, parent=ROOTNODE)
171             self.conn.execute(s)
172     
173     def node_create(self, parent, path):
174         """Create a new node from the given properties.
175            Return the node identifier of the new node.
176         """
177         #TODO catch IntegrityError?
178         s = self.nodes.insert().values(parent=parent, path=path)
179         r = self.conn.execute(s)
180         inserted_primary_key = r.inserted_primary_key[0]
181         r.close()
182         return inserted_primary_key
183     
184     def node_lookup(self, path):
185         """Lookup the current node of the given path.
186            Return None if the path is not found.
187         """
188         
189         s = select([self.nodes.c.node], self.nodes.c.path == path)
190         r = self.conn.execute(s)
191         row = r.fetchone()
192         r.close()
193         if row:
194             return row[0]
195         return None
196     
197     def node_get_properties(self, node):
198         """Return the node's (parent, path).
199            Return None if the node is not found.
200         """
201         
202         s = select([self.nodes.c.parent, self.nodes.c.path])
203         s = s.where(self.nodes.c.node == node)
204         r = self.conn.execute(s)
205         l = r.fetchone()
206         r.close()
207         return l
208     
209     def node_get_versions(self, node, keys=(), propnames=_propnames):
210         """Return the properties of all versions at node.
211            If keys is empty, return all properties in the order
212            (serial, node, size, source, mtime, muser, cluster).
213         """
214         
215         s = select(['*'], self.versions.c.node == node)
216         s = s.order_by(self.versions.c.serial)
217         r = self.conn.execute(s)
218         rows = r.fetchall()
219         r.close()
220         if not rows:
221             return rows
222         
223         if not keys:
224             return rows
225         
226         return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]
227     
228     def node_count_children(self, node):
229         """Return node's child count."""
230         
231         s = select([func.count(self.nodes.c.node)])
232         s = s.where(and_(self.nodes.c.parent == node,
233                          self.nodes.c.node != ROOTNODE))
234         r = self.conn.execute(s)
235         row = r.fetchone()
236         r.close()
237         return row[0]
238     
239     def node_purge_children(self, parent, before=inf, cluster=0):
240         """Delete all versions with the specified
241            parent and cluster, and return
242            the serials of versions deleted.
243            Clears out nodes with no remaining versions.
244         """
245         #update statistics
246         c1 = select([self.nodes.c.node],
247             self.nodes.c.parent == parent)
248         where_clause = and_(self.versions.c.node.in_(c1),
249                             self.versions.c.cluster == cluster,
250                             self.versions.c.mtime <= before)
251         s = select([func.count(self.versions.c.serial),
252                     func.sum(self.versions.c.size)])
253         s = s.where(where_clause)
254         r = self.conn.execute(s)
255         row = r.fetchone()
256         r.close()
257         if not row:
258             return ()
259         nr, size = row[0], -row[1] if row[1] else 0
260         mtime = time()
261         self.statistics_update(parent, -nr, size, mtime, cluster)
262         self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)
263         
264         s = select([self.versions.c.serial])
265         s = s.where(where_clause)
266         r = self.conn.execute(s)
267         serials = [row[SERIAL] for row in r.fetchall()]
268         r.close()
269         
270         #delete versions
271         s = self.versions.delete().where(where_clause)
272         r = self.conn.execute(s)
273         r.close()
274         
275         #delete nodes
276         s = select([self.nodes.c.node],
277             and_(self.nodes.c.parent == parent,
278                  select([func.count(self.versions.c.serial)],
279                     self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
280         rp = self.conn.execute(s)
281         nodes = [r[0] for r in rp.fetchall()]
282         rp.close()
283         s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
284         self.conn.execute(s).close()
285         
286         return serials
287     
288     def node_purge(self, node, before=inf, cluster=0):
289         """Delete all versions with the specified
290            node and cluster, and return
291            the serials of versions deleted.
292            Clears out the node if it has no remaining versions.
293         """
294         
295         #update statistics
296         s = select([func.count(self.versions.c.serial),
297                     func.sum(self.versions.c.size)])
298         where_clause = and_(self.versions.c.node == node,
299                          self.versions.c.cluster == cluster,
300                          self.versions.c.mtime <= before)
301         s = s.where(where_clause)
302         r = self.conn.execute(s)
303         row = r.fetchone()
304         nr, size = row[0], row[1]
305         r.close()
306         if not nr:
307             return ()
308         mtime = time()
309         self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
310         
311         s = select([self.versions.c.serial])
312         s = s.where(where_clause)
313         r = self.conn.execute(s)
314         serials = [r[SERIAL] for r in r.fetchall()]
315         r.close()
316         
317         #delete versions
318         s = self.versions.delete().where(where_clause)
319         r = self.conn.execute(s)
320         r.close()
321         
322         #delete nodes
323         s = select([self.nodes.c.node],
324             and_(self.nodes.c.node == node,
325                  select([func.count(self.versions.c.serial)],
326                     self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
327         r = self.conn.execute(s)
328         nodes = r.fetchall()
329         r.close()
330         s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
331         self.conn.execute(s).close()
332         
333         return serials
334     
335     def node_remove(self, node):
336         """Remove the node specified.
337            Return false if the node has children or is not found.
338         """
339         
340         if self.node_count_children(node):
341             return False
342         
343         mtime = time()
344         s = select([func.count(self.versions.c.serial),
345                     func.sum(self.versions.c.size),
346                     self.versions.c.cluster])
347         s = s.where(self.versions.c.node == node)
348         s = s.group_by(self.versions.c.cluster)
349         r = self.conn.execute(s)
350         for population, size, cluster in r.fetchall():
351             self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
352         r.close()
353         
354         s = self.nodes.delete().where(self.nodes.c.node == node)
355         self.conn.execute(s).close()
356         return True
357     
358     def statistics_get(self, node, cluster=0):
359         """Return population, total size and last mtime
360            for all versions under node that belong to the cluster.
361         """
362         
363         s = select([self.statistics.c.population,
364                     self.statistics.c.size,
365                     self.statistics.c.mtime])
366         s = s.where(and_(self.statistics.c.node == node,
367                          self.statistics.c.cluster == cluster))
368         r = self.conn.execute(s)
369         row = r.fetchone()
370         r.close()
371         return row
372     
373     def statistics_update(self, node, population, size, mtime, cluster=0):
374         """Update the statistics of the given node.
375            Statistics keep track the population, total
376            size of objects and mtime in the node's namespace.
377            May be zero or positive or negative numbers.
378         """
379         
380         s = select([self.statistics.c.population, self.statistics.c.size],
381             and_(self.statistics.c.node == node,
382                  self.statistics.c.cluster == cluster))
383         rp = self.conn.execute(s)
384         r = rp.fetchone()
385         rp.close()
386         if not r:
387             prepopulation, presize = (0, 0)
388         else:
389             prepopulation, presize = r
390         population += prepopulation
391         size += presize
392         
393         #insert or replace
394         #TODO better upsert
395         u = self.statistics.update().where(and_(self.statistics.c.node==node,
396                                            self.statistics.c.cluster==cluster))
397         u = u.values(population=population, size=size, mtime=mtime)
398         rp = self.conn.execute(u)
399         rp.close()
400         if rp.rowcount == 0:
401             ins = self.statistics.insert()
402             ins = ins.values(node=node, population=population, size=size,
403                              mtime=mtime, cluster=cluster)
404             self.conn.execute(ins).close()
405     
406     def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
407         """Update the statistics of the given node's parent.
408            Then recursively update all parents up to the root.
409            Population is not recursive.
410         """
411         
412         while True:
413             if node == ROOTNODE:
414                 break
415             props = self.node_get_properties(node)
416             if props is None:
417                 break
418             parent, path = props
419             self.statistics_update(parent, population, size, mtime, cluster)
420             node = parent
421             population = 0 # Population isn't recursive
422     
423     def statistics_latest(self, node, before=inf, except_cluster=0):
424         """Return population, total size and last mtime
425            for all latest versions under node that
426            do not belong to the cluster.
427         """
428         
429         # The node.
430         props = self.node_get_properties(node)
431         if props is None:
432             return None
433         parent, path = props
434         
435         # The latest version.
436         s = select([self.versions.c.serial,
437                     self.versions.c.node,
438                     self.versions.c.size,
439                     self.versions.c.source,
440                     self.versions.c.mtime,
441                     self.versions.c.muser,
442                     self.versions.c.cluster])
443         s = s.where(and_(self.versions.c.cluster != except_cluster,
444                          self.versions.c.serial == select(
445                             [func.max(self.versions.c.serial)],
446                             and_(self.versions.c.node == node,
447                             self.versions.c.mtime < before))))
448         r = self.conn.execute(s)
449         props = r.fetchone()
450         r.close()
451         if not props:
452             return None
453         mtime = props[MTIME]
454         
455         # First level, just under node (get population).
456         v = self.versions.alias('v')
457         s = select([func.count(v.c.serial),
458                     func.sum(v.c.size),
459                     func.max(v.c.mtime)])
460         c1 = select([func.max(self.versions.c.serial)],
461             and_(self.versions.c.node == v.c.node,
462                  self.versions.c.mtime < before))
463         c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
464         s = s.where(and_(v.c.serial == c1,
465                          v.c.cluster != except_cluster,
466                          v.c.node.in_(c2)))
467         rp = self.conn.execute(s)
468         r = rp.fetchone()
469         rp.close()
470         if not r:
471             return None
472         count = r[0]
473         mtime = max(mtime, r[2])
474         if count == 0:
475             return (0, 0, mtime)
476         
477         # All children (get size and mtime).
478         # XXX: This is why the full path is stored.
479         s = select([func.count(v.c.serial),
480                     func.sum(v.c.size),
481                     func.max(v.c.mtime)])
482         c1 = select([func.max(self.versions.c.serial)],
483             and_(self.versions.c.node == v.c.node,
484                  self.versions.c.mtime < before))
485         c2 = select([self.nodes.c.node], self.nodes.c.path.like(path + '%'))
486         s = s.where(and_(v.c.serial == c1,
487                          v.c.cluster != except_cluster,
488                          v.c.node.in_(c2)))
489         rp = self.conn.execute(s)
490         r = rp.fetchone()
491         rp.close()
492         if not r:
493             return None
494         size = r[1] - props[SIZE]
495         mtime = max(mtime, r[2])
496         return (count, size, mtime)
497     
498     def version_create(self, node, size, source, muser, cluster=0):
499         """Create a new version from the given properties.
500            Return the (serial, mtime) of the new version.
501         """
502         
503         mtime = time()
504         props = (node, size, source, mtime, muser, cluster)
505         props = locals()
506         props.pop('self')
507         s = self.versions.insert().values(**props)
508         serial = self.conn.execute(s).inserted_primary_key[0]
509         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
510         return serial, mtime
511     
512     def version_lookup(self, node, before=inf, cluster=0):
513         """Lookup the current version of the given node.
514            Return a list with its properties:
515            (serial, node, size, source, mtime, muser, cluster)
516            or None if the current version is not found in the given cluster.
517         """
518         
519         v = self.versions.alias('v')
520         s = select([v.c.serial, v.c.node, v.c.size, v.c.source, v.c.mtime,
521                     v.c.muser, v.c.cluster])
522         c = select([func.max(self.versions.c.serial)],
523             and_(self.versions.c.node == node,
524                  self.versions.c.mtime < before))
525         s = s.where(and_(v.c.serial == c,
526                          v.c.cluster == cluster))
527         r = self.conn.execute(s)
528         props = r.fetchone()
529         r.close()
530         if props:
531             return props
532         return None
533     
534     def version_get_properties(self, serial, keys=(), propnames=_propnames):
535         """Return a sequence of values for the properties of
536            the version specified by serial and the keys, in the order given.
537            If keys is empty, return all properties in the order
538            (serial, node, size, source, mtime, muser, cluster).
539         """
540         
541         v = self.versions.alias()
542         s = select([v.c.serial, v.c.node, v.c.size, v.c.source, v.c.mtime,
543                    v.c.muser, v.c.cluster], v.c.serial == serial)
544         rp = self.conn.execute(s)
545         r = rp.fetchone()
546         rp.close()
547         if r is None:
548             return r
549         
550         if not keys:
551             return r
552         return [r[propnames[k]] for k in keys if k in propnames]
553     
554     def version_recluster(self, serial, cluster):
555         """Move the version into another cluster."""
556         
557         props = self.version_get_properties(serial)
558         if not props:
559             return
560         node = props[NODE]
561         size = props[SIZE]
562         oldcluster = props[CLUSTER]
563         if cluster == oldcluster:
564             return
565         
566         mtime = time()
567         self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
568         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
569         
570         s = self.versions.update()
571         s = s.where(self.versions.c.serial == serial)
572         s = s.values(cluster = cluster)
573         self.conn.execute(s).close()
574     
575     def version_remove(self, serial):
576         """Remove the serial specified."""
577         
578         props = self.node_get_properties(serial)
579         if not props:
580             return
581         node = props[NODE]
582         size = props[SIZE]
583         cluster = props[CLUSTER]
584         
585         mtime = time()
586         self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
587         
588         s = self.versions.delete().where(self.versions.c.serial == serial)
589         self.conn.execute(s).close()
590         return True
591     
592     def attribute_get(self, serial, keys=()):
593         """Return a list of (key, value) pairs of the version specified by serial.
594            If keys is empty, return all attributes.
595            Othwerise, return only those specified.
596         """
597         
598         if keys:
599             attrs = self.attributes.alias()
600             s = select([attrs.c.key, attrs.c.value])
601             s = s.where(and_(attrs.c.key.in_(keys),
602                              attrs.c.serial == serial))
603         else:
604             attrs = self.attributes.alias()
605             s = select([attrs.c.key, attrs.c.value])
606             s = s.where(attrs.c.serial == serial)
607         r = self.conn.execute(s)
608         l = r.fetchall()
609         r.close()
610         return l
611     
612     def attribute_set(self, serial, items):
613         """Set the attributes of the version specified by serial.
614            Receive attributes as an iterable of (key, value) pairs.
615         """
616         #insert or replace
617         #TODO better upsert
618         for k, v in items:
619             s = self.attributes.update()
620             s = s.where(and_(self.attributes.c.serial == serial,
621                              self.attributes.c.key == k))
622             s = s.values(value = v)
623             rp = self.conn.execute(s)
624             rp.close()
625             if rp.rowcount == 0:
626                 s = self.attributes.insert()
627                 s = s.values(serial=serial, key=k, value=v)
628                 self.conn.execute(s).close()
629     
630     def attribute_del(self, serial, keys=()):
631         """Delete attributes of the version specified by serial.
632            If keys is empty, delete all attributes.
633            Otherwise delete those specified.
634         """
635         
636         if keys:
637             #TODO more efficient way to do this?
638             for key in keys:
639                 s = self.attributes.delete()
640                 s = s.where(and_(self.attributes.c.serial == serial,
641                                  self.attributes.c.key == key))
642                 self.conn.execute(s).close()
643         else:
644             s = self.attributes.delete()
645             s = s.where(self.attributes.c.serial == serial)
646             self.conn.execute(s).close()
647     
648     def attribute_copy(self, source, dest):
649         s = select([dest, self.attributes.c.key, self.attributes.c.value],
650             self.attributes.c.serial == source)
651         rp = self.conn.execute(s)
652         attributes = rp.fetchall()
653         rp.close()
654         for dest, k, v in attributes:
655             #insert or replace
656             s = self.attributes.update().where(and_(
657                 self.attributes.c.serial == dest,
658                 self.attributes.c.key == k))
659             rp = self.conn.execute(s, value=v)
660             rp.close()
661             if rp.rowcount == 0:
662                 s = self.attributes.insert()
663                 values = {'serial':dest, 'key':k, 'value':v}
664                 self.conn.execute(s, values).close()
665     
666     def latest_attribute_keys(self, parent, before=inf, except_cluster=0, pathq=[]):
667         """Return a list with all keys pairs defined
668            for all latest versions under parent that
669            do not belong to the cluster.
670         """
671         
672         # TODO: Use another table to store before=inf results.
673         a = self.attributes.alias('a')
674         v = self.versions.alias('v')
675         n = self.nodes.alias('n')
676         s = select([a.c.key]).distinct()
677         s = s.where(v.c.serial == select([func.max(self.versions.c.serial)],
678                                           and_(self.versions.c.node == v.c.node,
679                                                self.versions.c.mtime < before)))
680         s = s.where(v.c.cluster != except_cluster)
681         s = s.where(v.c.node.in_(select([self.nodes.c.node],
682             self.nodes.c.parent == parent)))
683         s = s.where(a.c.serial == v.c.serial)
684         s = s.where(n.c.node == v.c.node)
685         conj = []
686         for x in pathq:
687             conj.append(n.c.path.like(x + '%'))
688         if conj:
689             s = s.where(or_(*conj))
690         rp = self.conn.execute(s)
691         rows = rp.fetchall()
692         rp.close()
693         return [r[0] for r in rows]
694     
695     def latest_version_list(self, parent, prefix='', delimiter=None,
696                             start='', limit=10000, before=inf,
697                             except_cluster=0, pathq=[], filterq=None):
698         """Return a (list of (path, serial) tuples, list of common prefixes)
699            for the current versions of the paths with the given parent,
700            matching the following criteria.
701            
702            The property tuple for a version is returned if all
703            of these conditions are true:
704                 
705                 a. parent matches
706                 
707                 b. path > start
708                 
709                 c. path starts with prefix (and paths in pathq)
710                 
711                 d. version is the max up to before
712                 
713                 e. version is not in cluster
714                 
715                 f. the path does not have the delimiter occuring
716                    after the prefix, or ends with the delimiter
717                 
718                 g. serial matches the attribute filter query.
719                    
720                    A filter query is a comma-separated list of
721                    terms in one of these three forms:
722                    
723                    key
724                        an attribute with this key must exist
725                    
726                    !key
727                        an attribute with this key must not exist
728                    
729                    key ?op value
730                        the attribute with this key satisfies the value
731                        where ?op is one of ==, != <=, >=, <, >.
732            
733            The list of common prefixes includes the prefixes
734            matching up to the first delimiter after prefix,
735            and are reported only once, as "virtual directories".
736            The delimiter is included in the prefixes.
737            
738            If arguments are None, then the corresponding matching rule
739            will always match.
740            
741            Limit applies to the first list of tuples returned.
742         """
743         
744         if not start or start < prefix:
745             start = strprevling(prefix)
746         nextling = strnextling(prefix)
747         
748         a = self.attributes.alias('a')
749         v = self.versions.alias('v')
750         n = self.nodes.alias('n')
751         s = select([n.c.path, v.c.serial]).distinct()
752         s = s.where(v.c.serial == select([func.max(self.versions.c.serial)],
753             and_(self.versions.c.node == v.c.node,
754                  self.versions.c.mtime < before)))
755         s = s.where(v.c.cluster != except_cluster)
756         s = s.where(v.c.node.in_(select([self.nodes.c.node],
757             self.nodes.c.parent == parent)))
758         if filterq:
759             s = s.where(a.c.serial == v.c.serial)
760         
761         s = s.where(n.c.node == v.c.node)
762         s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
763         conj = []
764         for x in pathq:
765             conj.append(n.c.path.like(x + '%'))
766         
767         if conj:
768             s = s.where(or_(*conj))
769         
770         if filterq:
771             s = s.where(a.c.key.in_(filterq.split(',')))
772         
773         s = s.order_by(n.c.path)
774         
775         if not delimiter:
776             s = s.limit(limit)
777             rp = self.conn.execute(s, start=start)
778             r = rp.fetchall()
779             rp.close()
780             return r, ()
781         
782         pfz = len(prefix)
783         dz = len(delimiter)
784         count = 0
785         prefixes = []
786         pappend = prefixes.append
787         matches = []
788         mappend = matches.append
789         
790         rp = self.conn.execute(s, start=start)
791         while True:
792             props = rp.fetchone()
793             if props is None:
794                 break
795             path, serial = props
796             idx = path.find(delimiter, pfz)
797             
798             if idx < 0:
799                 mappend(props)
800                 count += 1
801                 if count >= limit:
802                     break
803                 continue
804             
805             if idx + dz == len(path):
806                 mappend(props)
807                 count += 1
808                 continue # Get one more, in case there is a path.
809             pf = path[:idx + dz]
810             pappend(pf)
811             if count >= limit: 
812                 break
813             
814             rp = self.conn.execute(s, start=strnextling(pf)) # New start.
815         rp.close()
816         
817         return matches, prefixes