Save hash maps like blocks - based on their hash.
[pithos] / pithos / backends / lib / sqlalchemy / node.py
1 # Copyright 2011 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from time import time
35 from sqlalchemy import Table, Integer, BigInteger, Float, Column, String, MetaData, ForeignKey
36 from sqlalchemy.schema import Index, Sequence
37 from sqlalchemy.sql import func, and_, or_, null, select, bindparam
38 from sqlalchemy.ext.compiler import compiles
39
40 from dbworker import DBWorker
41
42 ROOTNODE  = 0
43
44 ( SERIAL, NODE, HASH, SIZE, SOURCE, MTIME, MUSER, CLUSTER ) = range(8)
45
46 inf = float('inf')
47
48
49 def strnextling(prefix):
50     """Return the first unicode string
51        greater than but not starting with given prefix.
52        strnextling('hello') -> 'hellp'
53     """
54     if not prefix:
55         ## all strings start with the null string,
56         ## therefore we have to approximate strnextling('')
57         ## with the last unicode character supported by python
58         ## 0x10ffff for wide (32-bit unicode) python builds
59         ## 0x00ffff for narrow (16-bit unicode) python builds
60         ## We will not autodetect. 0xffff is safe enough.
61         return unichr(0xffff)
62     s = prefix[:-1]
63     c = ord(prefix[-1])
64     if c >= 0xffff:
65         raise RuntimeError
66     s += unichr(c+1)
67     return s
68
69 def strprevling(prefix):
70     """Return an approximation of the last unicode string
71        less than but not starting with given prefix.
72        strprevling(u'hello') -> u'helln\\xffff'
73     """
74     if not prefix:
75         ## There is no prevling for the null string
76         return prefix
77     s = prefix[:-1]
78     c = ord(prefix[-1])
79     if c > 0:
80         #s += unichr(c-1) + unichr(0xffff)
81         s += unichr(c-1)
82     return s
83
84
85 _propnames = {
86     'serial'    : 0,
87     'node'      : 1,
88     'hash'      : 2,
89     'size'      : 3,
90     'source'    : 4,
91     'mtime'     : 5,
92     'muser'     : 6,
93     'cluster'   : 7,
94 }
95
96
97 class Node(DBWorker):
98     """Nodes store path organization and have multiple versions.
99        Versions store object history and have multiple attributes.
100        Attributes store metadata.
101     """
102     
103     # TODO: Provide an interface for included and excluded clusters.
104     
105     def __init__(self, **params):
106         DBWorker.__init__(self, **params)
107         metadata = MetaData()
108         
109         #create nodes table
110         columns=[]
111         columns.append(Column('node', Integer, primary_key=True))
112         columns.append(Column('parent', Integer,
113                               ForeignKey('nodes.node',
114                                          ondelete='CASCADE',
115                                          onupdate='CASCADE'),
116                               autoincrement=False))
117         columns.append(Column('path', String(2048), default='', nullable=False))
118         self.nodes = Table('nodes', metadata, *columns)
119         # place an index on path
120         Index('idx_nodes_path', self.nodes.c.path, unique=True)
121         
122         #create statistics table
123         columns=[]
124         columns.append(Column('node', Integer,
125                               ForeignKey('nodes.node',
126                                          ondelete='CASCADE',
127                                          onupdate='CASCADE'),
128                               primary_key=True))
129         columns.append(Column('population', Integer, nullable=False, default=0))
130         columns.append(Column('size', BigInteger, nullable=False, default=0))
131         columns.append(Column('mtime', Float))
132         columns.append(Column('cluster', Integer, nullable=False, default=0,
133                               primary_key=True))
134         self.statistics = Table('statistics', metadata, *columns)
135         
136         #create versions table
137         columns=[]
138         columns.append(Column('serial', Integer, primary_key=True))
139         columns.append(Column('node', Integer,
140                               ForeignKey('nodes.node',
141                                          ondelete='CASCADE',
142                                          onupdate='CASCADE')))
143         columns.append(Column('hash', String(255)))
144         columns.append(Column('size', BigInteger, nullable=False, default=0))
145         columns.append(Column('source', Integer))
146         columns.append(Column('mtime', Float))
147         columns.append(Column('muser', String(255), nullable=False, default=''))
148         columns.append(Column('cluster', Integer, nullable=False, default=0))
149         self.versions = Table('versions', metadata, *columns)
150         Index('idx_versions_node_mtime', self.versions.c.node,
151               self.versions.c.mtime)
152         
153         #create attributes table
154         columns = []
155         columns.append(Column('serial', Integer,
156                               ForeignKey('versions.serial',
157                                          ondelete='CASCADE',
158                                          onupdate='CASCADE'),
159                               primary_key=True))
160         columns.append(Column('key', String(255), primary_key=True))
161         columns.append(Column('value', String(255)))
162         self.attributes = Table('attributes', metadata, *columns)
163         
164         metadata.create_all(self.engine)
165         
166         s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
167                                            self.nodes.c.parent == ROOTNODE))
168         rp = self.conn.execute(s)
169         r = rp.fetchone()
170         rp.close()
171         if not r:
172             s = self.nodes.insert().values(node=ROOTNODE, parent=ROOTNODE)
173             self.conn.execute(s)
174     
175     def node_create(self, parent, path):
176         """Create a new node from the given properties.
177            Return the node identifier of the new node.
178         """
179         #TODO catch IntegrityError?
180         s = self.nodes.insert().values(parent=parent, path=path)
181         r = self.conn.execute(s)
182         inserted_primary_key = r.inserted_primary_key[0]
183         r.close()
184         return inserted_primary_key
185     
186     def node_lookup(self, path):
187         """Lookup the current node of the given path.
188            Return None if the path is not found.
189         """
190         
191         s = select([self.nodes.c.node], self.nodes.c.path == path)
192         r = self.conn.execute(s)
193         row = r.fetchone()
194         r.close()
195         if row:
196             return row[0]
197         return None
198     
199     def node_get_properties(self, node):
200         """Return the node's (parent, path).
201            Return None if the node is not found.
202         """
203         
204         s = select([self.nodes.c.parent, self.nodes.c.path])
205         s = s.where(self.nodes.c.node == node)
206         r = self.conn.execute(s)
207         l = r.fetchone()
208         r.close()
209         return l
210     
211     def node_get_versions(self, node, keys=(), propnames=_propnames):
212         """Return the properties of all versions at node.
213            If keys is empty, return all properties in the order
214            (serial, node, size, source, mtime, muser, cluster).
215         """
216         
217         s = select([self.versions.c.serial,
218                     self.versions.c.node,
219                     self.versions.c.hash,
220                     self.versions.c.size,
221                     self.versions.c.source,
222                     self.versions.c.mtime,
223                     self.versions.c.muser,
224                     self.versions.c.cluster], self.versions.c.node == node)
225         s = s.order_by(self.versions.c.serial)
226         r = self.conn.execute(s)
227         rows = r.fetchall()
228         r.close()
229         if not rows:
230             return rows
231         
232         if not keys:
233             return rows
234         
235         return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]
236     
237     def node_count_children(self, node):
238         """Return node's child count."""
239         
240         s = select([func.count(self.nodes.c.node)])
241         s = s.where(and_(self.nodes.c.parent == node,
242                          self.nodes.c.node != ROOTNODE))
243         r = self.conn.execute(s)
244         row = r.fetchone()
245         r.close()
246         return row[0]
247     
248     def node_purge_children(self, parent, before=inf, cluster=0):
249         """Delete all versions with the specified
250            parent and cluster, and return
251            the serials of versions deleted.
252            Clears out nodes with no remaining versions.
253         """
254         #update statistics
255         c1 = select([self.nodes.c.node],
256             self.nodes.c.parent == parent)
257         where_clause = and_(self.versions.c.node.in_(c1),
258                             self.versions.c.cluster == cluster,
259                             self.versions.c.mtime <= before)
260         s = select([func.count(self.versions.c.serial),
261                     func.sum(self.versions.c.size)])
262         s = s.where(where_clause)
263         r = self.conn.execute(s)
264         row = r.fetchone()
265         r.close()
266         if not row:
267             return ()
268         nr, size = row[0], -row[1] if row[1] else 0
269         mtime = time()
270         self.statistics_update(parent, -nr, size, mtime, cluster)
271         self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)
272         
273         s = select([self.versions.c.serial])
274         s = s.where(where_clause)
275         r = self.conn.execute(s)
276         serials = [row[SERIAL] for row in r.fetchall()]
277         r.close()
278         
279         #delete versions
280         s = self.versions.delete().where(where_clause)
281         r = self.conn.execute(s)
282         r.close()
283         
284         #delete nodes
285         s = select([self.nodes.c.node],
286             and_(self.nodes.c.parent == parent,
287                  select([func.count(self.versions.c.serial)],
288                     self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
289         rp = self.conn.execute(s)
290         nodes = [r[0] for r in rp.fetchall()]
291         rp.close()
292         s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
293         self.conn.execute(s).close()
294         
295         return serials
296     
297     def node_purge(self, node, before=inf, cluster=0):
298         """Delete all versions with the specified
299            node and cluster, and return
300            the serials of versions deleted.
301            Clears out the node if it has no remaining versions.
302         """
303         
304         #update statistics
305         s = select([func.count(self.versions.c.serial),
306                     func.sum(self.versions.c.size)])
307         where_clause = and_(self.versions.c.node == node,
308                          self.versions.c.cluster == cluster,
309                          self.versions.c.mtime <= before)
310         s = s.where(where_clause)
311         r = self.conn.execute(s)
312         row = r.fetchone()
313         nr, size = row[0], row[1]
314         r.close()
315         if not nr:
316             return ()
317         mtime = time()
318         self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
319         
320         s = select([self.versions.c.serial])
321         s = s.where(where_clause)
322         r = self.conn.execute(s)
323         serials = [r[SERIAL] for r in r.fetchall()]
324         r.close()
325         
326         #delete versions
327         s = self.versions.delete().where(where_clause)
328         r = self.conn.execute(s)
329         r.close()
330         
331         #delete nodes
332         s = select([self.nodes.c.node],
333             and_(self.nodes.c.node == node,
334                  select([func.count(self.versions.c.serial)],
335                     self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
336         r = self.conn.execute(s)
337         nodes = r.fetchall()
338         r.close()
339         s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
340         self.conn.execute(s).close()
341         
342         return serials
343     
344     def node_remove(self, node):
345         """Remove the node specified.
346            Return false if the node has children or is not found.
347         """
348         
349         if self.node_count_children(node):
350             return False
351         
352         mtime = time()
353         s = select([func.count(self.versions.c.serial),
354                     func.sum(self.versions.c.size),
355                     self.versions.c.cluster])
356         s = s.where(self.versions.c.node == node)
357         s = s.group_by(self.versions.c.cluster)
358         r = self.conn.execute(s)
359         for population, size, cluster in r.fetchall():
360             self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
361         r.close()
362         
363         s = self.nodes.delete().where(self.nodes.c.node == node)
364         self.conn.execute(s).close()
365         return True
366     
367     def statistics_get(self, node, cluster=0):
368         """Return population, total size and last mtime
369            for all versions under node that belong to the cluster.
370         """
371         
372         s = select([self.statistics.c.population,
373                     self.statistics.c.size,
374                     self.statistics.c.mtime])
375         s = s.where(and_(self.statistics.c.node == node,
376                          self.statistics.c.cluster == cluster))
377         r = self.conn.execute(s)
378         row = r.fetchone()
379         r.close()
380         return row
381     
382     def statistics_update(self, node, population, size, mtime, cluster=0):
383         """Update the statistics of the given node.
384            Statistics keep track the population, total
385            size of objects and mtime in the node's namespace.
386            May be zero or positive or negative numbers.
387         """
388         
389         s = select([self.statistics.c.population, self.statistics.c.size],
390             and_(self.statistics.c.node == node,
391                  self.statistics.c.cluster == cluster))
392         rp = self.conn.execute(s)
393         r = rp.fetchone()
394         rp.close()
395         if not r:
396             prepopulation, presize = (0, 0)
397         else:
398             prepopulation, presize = r
399         population += prepopulation
400         size += presize
401         
402         #insert or replace
403         #TODO better upsert
404         u = self.statistics.update().where(and_(self.statistics.c.node==node,
405                                            self.statistics.c.cluster==cluster))
406         u = u.values(population=population, size=size, mtime=mtime)
407         rp = self.conn.execute(u)
408         rp.close()
409         if rp.rowcount == 0:
410             ins = self.statistics.insert()
411             ins = ins.values(node=node, population=population, size=size,
412                              mtime=mtime, cluster=cluster)
413             self.conn.execute(ins).close()
414     
415     def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
416         """Update the statistics of the given node's parent.
417            Then recursively update all parents up to the root.
418            Population is not recursive.
419         """
420         
421         while True:
422             if node == ROOTNODE:
423                 break
424             props = self.node_get_properties(node)
425             if props is None:
426                 break
427             parent, path = props
428             self.statistics_update(parent, population, size, mtime, cluster)
429             node = parent
430             population = 0 # Population isn't recursive
431     
432     def statistics_latest(self, node, before=inf, except_cluster=0):
433         """Return population, total size and last mtime
434            for all latest versions under node that
435            do not belong to the cluster.
436         """
437         
438         # The node.
439         props = self.node_get_properties(node)
440         if props is None:
441             return None
442         parent, path = props
443         
444         # The latest version.
445         s = select([self.versions.c.serial,
446                     self.versions.c.node,
447                     self.versions.c.hash,
448                     self.versions.c.size,
449                     self.versions.c.source,
450                     self.versions.c.mtime,
451                     self.versions.c.muser,
452                     self.versions.c.cluster])
453         s = s.where(and_(self.versions.c.cluster != except_cluster,
454                          self.versions.c.serial == select(
455                             [func.max(self.versions.c.serial)],
456                             and_(self.versions.c.node == node,
457                             self.versions.c.mtime < before))))
458         r = self.conn.execute(s)
459         props = r.fetchone()
460         r.close()
461         if not props:
462             return None
463         mtime = props[MTIME]
464         
465         # First level, just under node (get population).
466         v = self.versions.alias('v')
467         s = select([func.count(v.c.serial),
468                     func.sum(v.c.size),
469                     func.max(v.c.mtime)])
470         c1 = select([func.max(self.versions.c.serial)],
471             and_(self.versions.c.node == v.c.node,
472                  self.versions.c.mtime < before))
473         c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
474         s = s.where(and_(v.c.serial == c1,
475                          v.c.cluster != except_cluster,
476                          v.c.node.in_(c2)))
477         rp = self.conn.execute(s)
478         r = rp.fetchone()
479         rp.close()
480         if not r:
481             return None
482         count = r[0]
483         mtime = max(mtime, r[2])
484         if count == 0:
485             return (0, 0, mtime)
486         
487         # All children (get size and mtime).
488         # XXX: This is why the full path is stored.
489         s = select([func.count(v.c.serial),
490                     func.sum(v.c.size),
491                     func.max(v.c.mtime)])
492         c1 = select([func.max(self.versions.c.serial)],
493             and_(self.versions.c.node == v.c.node,
494                  self.versions.c.mtime < before))
495         c2 = select([self.nodes.c.node], self.nodes.c.path.like(path + '%'))
496         s = s.where(and_(v.c.serial == c1,
497                          v.c.cluster != except_cluster,
498                          v.c.node.in_(c2)))
499         rp = self.conn.execute(s)
500         r = rp.fetchone()
501         rp.close()
502         if not r:
503             return None
504         size = r[1] - props[SIZE]
505         mtime = max(mtime, r[2])
506         return (count, size, mtime)
507     
508     def version_create(self, node, hash, size, source, muser, cluster=0):
509         """Create a new version from the given properties.
510            Return the (serial, mtime) of the new version.
511         """
512         
513         mtime = time()
514         s = self.versions.insert().values(node=node, hash=hash, size=size, source=source,
515                                           mtime=mtime, muser=muser, cluster=cluster)
516         serial = self.conn.execute(s).inserted_primary_key[0]
517         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
518         return serial, mtime
519     
520     def version_lookup(self, node, before=inf, cluster=0):
521         """Lookup the current version of the given node.
522            Return a list with its properties:
523            (serial, node, hash, size, source, mtime, muser, cluster)
524            or None if the current version is not found in the given cluster.
525         """
526         
527         v = self.versions.alias('v')
528         s = select([v.c.serial, v.c.node, v.c.hash, v.c.size,
529                     v.c.source, v.c.mtime, v.c.muser, v.c.cluster])
530         c = select([func.max(self.versions.c.serial)],
531             and_(self.versions.c.node == node,
532                  self.versions.c.mtime < before))
533         s = s.where(and_(v.c.serial == c,
534                          v.c.cluster == cluster))
535         r = self.conn.execute(s)
536         props = r.fetchone()
537         r.close()
538         if props:
539             return props
540         return None
541     
542     def version_get_properties(self, serial, keys=(), propnames=_propnames):
543         """Return a sequence of values for the properties of
544            the version specified by serial and the keys, in the order given.
545            If keys is empty, return all properties in the order
546            (serial, node, hash, size, source, mtime, muser, cluster).
547         """
548         
549         v = self.versions.alias()
550         s = select([v.c.serial, v.c.node, v.c.hash, v.c.size,
551                     v.c.source, v.c.mtime, v.c.muser, v.c.cluster], v.c.serial == serial)
552         rp = self.conn.execute(s)
553         r = rp.fetchone()
554         rp.close()
555         if r is None:
556             return r
557         
558         if not keys:
559             return r
560         return [r[propnames[k]] for k in keys if k in propnames]
561     
562     def version_recluster(self, serial, cluster):
563         """Move the version into another cluster."""
564         
565         props = self.version_get_properties(serial)
566         if not props:
567             return
568         node = props[NODE]
569         size = props[SIZE]
570         oldcluster = props[CLUSTER]
571         if cluster == oldcluster:
572             return
573         
574         mtime = time()
575         self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
576         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
577         
578         s = self.versions.update()
579         s = s.where(self.versions.c.serial == serial)
580         s = s.values(cluster = cluster)
581         self.conn.execute(s).close()
582     
583     def version_remove(self, serial):
584         """Remove the serial specified."""
585         
586         props = self.node_get_properties(serial)
587         if not props:
588             return
589         node = props[NODE]
590         size = props[SIZE]
591         cluster = props[CLUSTER]
592         
593         mtime = time()
594         self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
595         
596         s = self.versions.delete().where(self.versions.c.serial == serial)
597         self.conn.execute(s).close()
598         return True
599     
600     def attribute_get(self, serial, keys=()):
601         """Return a list of (key, value) pairs of the version specified by serial.
602            If keys is empty, return all attributes.
603            Othwerise, return only those specified.
604         """
605         
606         if keys:
607             attrs = self.attributes.alias()
608             s = select([attrs.c.key, attrs.c.value])
609             s = s.where(and_(attrs.c.key.in_(keys),
610                              attrs.c.serial == serial))
611         else:
612             attrs = self.attributes.alias()
613             s = select([attrs.c.key, attrs.c.value])
614             s = s.where(attrs.c.serial == serial)
615         r = self.conn.execute(s)
616         l = r.fetchall()
617         r.close()
618         return l
619     
620     def attribute_set(self, serial, items):
621         """Set the attributes of the version specified by serial.
622            Receive attributes as an iterable of (key, value) pairs.
623         """
624         #insert or replace
625         #TODO better upsert
626         for k, v in items:
627             s = self.attributes.update()
628             s = s.where(and_(self.attributes.c.serial == serial,
629                              self.attributes.c.key == k))
630             s = s.values(value = v)
631             rp = self.conn.execute(s)
632             rp.close()
633             if rp.rowcount == 0:
634                 s = self.attributes.insert()
635                 s = s.values(serial=serial, key=k, value=v)
636                 self.conn.execute(s).close()
637     
638     def attribute_del(self, serial, keys=()):
639         """Delete attributes of the version specified by serial.
640            If keys is empty, delete all attributes.
641            Otherwise delete those specified.
642         """
643         
644         if keys:
645             #TODO more efficient way to do this?
646             for key in keys:
647                 s = self.attributes.delete()
648                 s = s.where(and_(self.attributes.c.serial == serial,
649                                  self.attributes.c.key == key))
650                 self.conn.execute(s).close()
651         else:
652             s = self.attributes.delete()
653             s = s.where(self.attributes.c.serial == serial)
654             self.conn.execute(s).close()
655     
656     def attribute_copy(self, source, dest):
657         s = select([dest, self.attributes.c.key, self.attributes.c.value],
658             self.attributes.c.serial == source)
659         rp = self.conn.execute(s)
660         attributes = rp.fetchall()
661         rp.close()
662         for dest, k, v in attributes:
663             #insert or replace
664             s = self.attributes.update().where(and_(
665                 self.attributes.c.serial == dest,
666                 self.attributes.c.key == k))
667             rp = self.conn.execute(s, value=v)
668             rp.close()
669             if rp.rowcount == 0:
670                 s = self.attributes.insert()
671                 values = {'serial':dest, 'key':k, 'value':v}
672                 self.conn.execute(s, values).close()
673     
674     def latest_attribute_keys(self, parent, before=inf, except_cluster=0, pathq=[]):
675         """Return a list with all keys pairs defined
676            for all latest versions under parent that
677            do not belong to the cluster.
678         """
679         
680         # TODO: Use another table to store before=inf results.
681         a = self.attributes.alias('a')
682         v = self.versions.alias('v')
683         n = self.nodes.alias('n')
684         s = select([a.c.key]).distinct()
685         s = s.where(v.c.serial == select([func.max(self.versions.c.serial)],
686                                           and_(self.versions.c.node == v.c.node,
687                                                self.versions.c.mtime < before)))
688         s = s.where(v.c.cluster != except_cluster)
689         s = s.where(v.c.node.in_(select([self.nodes.c.node],
690             self.nodes.c.parent == parent)))
691         s = s.where(a.c.serial == v.c.serial)
692         s = s.where(n.c.node == v.c.node)
693         conj = []
694         for x in pathq:
695             conj.append(n.c.path.like(x + '%'))
696         if conj:
697             s = s.where(or_(*conj))
698         rp = self.conn.execute(s)
699         rows = rp.fetchall()
700         rp.close()
701         return [r[0] for r in rows]
702     
703     def latest_version_list(self, parent, prefix='', delimiter=None,
704                             start='', limit=10000, before=inf,
705                             except_cluster=0, pathq=[], filterq=None):
706         """Return a (list of (path, serial) tuples, list of common prefixes)
707            for the current versions of the paths with the given parent,
708            matching the following criteria.
709            
710            The property tuple for a version is returned if all
711            of these conditions are true:
712                 
713                 a. parent matches
714                 
715                 b. path > start
716                 
717                 c. path starts with prefix (and paths in pathq)
718                 
719                 d. version is the max up to before
720                 
721                 e. version is not in cluster
722                 
723                 f. the path does not have the delimiter occuring
724                    after the prefix, or ends with the delimiter
725                 
726                 g. serial matches the attribute filter query.
727                    
728                    A filter query is a comma-separated list of
729                    terms in one of these three forms:
730                    
731                    key
732                        an attribute with this key must exist
733                    
734                    !key
735                        an attribute with this key must not exist
736                    
737                    key ?op value
738                        the attribute with this key satisfies the value
739                        where ?op is one of ==, != <=, >=, <, >.
740            
741            The list of common prefixes includes the prefixes
742            matching up to the first delimiter after prefix,
743            and are reported only once, as "virtual directories".
744            The delimiter is included in the prefixes.
745            
746            If arguments are None, then the corresponding matching rule
747            will always match.
748            
749            Limit applies to the first list of tuples returned.
750         """
751         
752         if not start or start < prefix:
753             start = strprevling(prefix)
754         nextling = strnextling(prefix)
755         
756         a = self.attributes.alias('a')
757         v = self.versions.alias('v')
758         n = self.nodes.alias('n')
759         s = select([n.c.path, v.c.serial]).distinct()
760         s = s.where(v.c.serial == select([func.max(self.versions.c.serial)],
761             and_(self.versions.c.node == v.c.node,
762                  self.versions.c.mtime < before)))
763         s = s.where(v.c.cluster != except_cluster)
764         s = s.where(v.c.node.in_(select([self.nodes.c.node],
765             self.nodes.c.parent == parent)))
766         if filterq:
767             s = s.where(a.c.serial == v.c.serial)
768         
769         s = s.where(n.c.node == v.c.node)
770         s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
771         conj = []
772         for x in pathq:
773             conj.append(n.c.path.like(x + '%'))
774         
775         if conj:
776             s = s.where(or_(*conj))
777         
778         if filterq:
779             s = s.where(a.c.key.in_(filterq.split(',')))
780         
781         s = s.order_by(n.c.path)
782         
783         if not delimiter:
784             s = s.limit(limit)
785             rp = self.conn.execute(s, start=start)
786             r = rp.fetchall()
787             rp.close()
788             return r, ()
789         
790         pfz = len(prefix)
791         dz = len(delimiter)
792         count = 0
793         prefixes = []
794         pappend = prefixes.append
795         matches = []
796         mappend = matches.append
797         
798         rp = self.conn.execute(s, start=start)
799         while True:
800             props = rp.fetchone()
801             if props is None:
802                 break
803             path, serial = props
804             idx = path.find(delimiter, pfz)
805             
806             if idx < 0:
807                 mappend(props)
808                 count += 1
809                 if count >= limit:
810                     break
811                 continue
812             
813             if idx + dz == len(path):
814                 mappend(props)
815                 count += 1
816                 continue # Get one more, in case there is a path.
817             pf = path[:idx + dz]
818             pappend(pf)
819             if count >= limit: 
820                 break
821             
822             rp = self.conn.execute(s, start=strnextling(pf)) # New start.
823         rp.close()
824         
825         return matches, prefixes