Merge branch 'master' of https://code.grnet.gr/git/pithos
[pithos] / pithos / backends / lib / sqlalchemy / node.py
1 # Copyright 2011 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from time import time
35 from sqlalchemy import Table, Integer, BigInteger, DECIMAL, Column, String, MetaData, ForeignKey
36 from sqlalchemy.schema import Index, Sequence
37 from sqlalchemy.sql import func, and_, or_, null, select, bindparam
38 from sqlalchemy.ext.compiler import compiles
39
40 from dbworker import DBWorker
41
42 ROOTNODE  = 0
43
44 ( SERIAL, NODE, HASH, SIZE, SOURCE, MTIME, MUSER, CLUSTER ) = range(8)
45
46 inf = float('inf')
47
48
49 def strnextling(prefix):
50     """Return the first unicode string
51        greater than but not starting with given prefix.
52        strnextling('hello') -> 'hellp'
53     """
54     if not prefix:
55         ## all strings start with the null string,
56         ## therefore we have to approximate strnextling('')
57         ## with the last unicode character supported by python
58         ## 0x10ffff for wide (32-bit unicode) python builds
59         ## 0x00ffff for narrow (16-bit unicode) python builds
60         ## We will not autodetect. 0xffff is safe enough.
61         return unichr(0xffff)
62     s = prefix[:-1]
63     c = ord(prefix[-1])
64     if c >= 0xffff:
65         raise RuntimeError
66     s += unichr(c+1)
67     return s
68
69 def strprevling(prefix):
70     """Return an approximation of the last unicode string
71        less than but not starting with given prefix.
72        strprevling(u'hello') -> u'helln\\xffff'
73     """
74     if not prefix:
75         ## There is no prevling for the null string
76         return prefix
77     s = prefix[:-1]
78     c = ord(prefix[-1])
79     if c > 0:
80         #s += unichr(c-1) + unichr(0xffff)
81         s += unichr(c-1)
82     return s
83
84
85 _propnames = {
86     'serial'    : 0,
87     'node'      : 1,
88     'hash'      : 2,
89     'size'      : 3,
90     'source'    : 4,
91     'mtime'     : 5,
92     'muser'     : 6,
93     'cluster'   : 7,
94 }
95
96
97 class Node(DBWorker):
98     """Nodes store path organization and have multiple versions.
99        Versions store object history and have multiple attributes.
100        Attributes store metadata.
101     """
102     
103     # TODO: Provide an interface for included and excluded clusters.
104     
105     def __init__(self, **params):
106         DBWorker.__init__(self, **params)
107         metadata = MetaData()
108         
109         #create nodes table
110         columns=[]
111         columns.append(Column('node', Integer, primary_key=True))
112         columns.append(Column('parent', Integer,
113                               ForeignKey('nodes.node',
114                                          ondelete='CASCADE',
115                                          onupdate='CASCADE'),
116                               autoincrement=False))
117         columns.append(Column('path', String(2048), default='', nullable=False))
118         self.nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
119         # place an index on path
120         Index('idx_nodes_path', self.nodes.c.path)
121         
122         #create policy table
123         columns=[]
124         columns.append(Column('node', Integer,
125                               ForeignKey('nodes.node',
126                                          ondelete='CASCADE',
127                                          onupdate='CASCADE'),
128                               primary_key=True))
129         columns.append(Column('key', String(255), primary_key=True))
130         columns.append(Column('value', String(255)))
131         self.policies = Table('policy', metadata, *columns, mysql_engine='InnoDB')
132         
133         #create statistics table
134         columns=[]
135         columns.append(Column('node', Integer,
136                               ForeignKey('nodes.node',
137                                          ondelete='CASCADE',
138                                          onupdate='CASCADE'),
139                               primary_key=True))
140         columns.append(Column('population', Integer, nullable=False, default=0))
141         columns.append(Column('size', BigInteger, nullable=False, default=0))
142         columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
143         columns.append(Column('cluster', Integer, nullable=False, default=0,
144                               primary_key=True, autoincrement=False))
145         self.statistics = Table('statistics', metadata, *columns, mysql_engine='InnoDB')
146         
147         #create versions table
148         columns=[]
149         columns.append(Column('serial', Integer, primary_key=True))
150         columns.append(Column('node', Integer,
151                               ForeignKey('nodes.node',
152                                          ondelete='CASCADE',
153                                          onupdate='CASCADE')))
154         columns.append(Column('hash', String(255)))
155         columns.append(Column('size', BigInteger, nullable=False, default=0))
156         columns.append(Column('source', Integer))
157         columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
158         columns.append(Column('muser', String(255), nullable=False, default=''))
159         columns.append(Column('cluster', Integer, nullable=False, default=0))
160         self.versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
161         Index('idx_versions_node_mtime', self.versions.c.node,
162               self.versions.c.mtime)
163         
164         #create attributes table
165         columns = []
166         columns.append(Column('serial', Integer,
167                               ForeignKey('versions.serial',
168                                          ondelete='CASCADE',
169                                          onupdate='CASCADE'),
170                               primary_key=True))
171         columns.append(Column('key', String(255), primary_key=True))
172         columns.append(Column('value', String(255)))
173         self.attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')
174         
175         metadata.create_all(self.engine)
176         
177         s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
178                                            self.nodes.c.parent == ROOTNODE))
179         rp = self.conn.execute(s)
180         r = rp.fetchone()
181         rp.close()
182         if not r:
183             s = self.nodes.insert().values(node=ROOTNODE, parent=ROOTNODE)
184             self.conn.execute(s)
185     
186     def node_create(self, parent, path):
187         """Create a new node from the given properties.
188            Return the node identifier of the new node.
189         """
190         #TODO catch IntegrityError?
191         s = self.nodes.insert().values(parent=parent, path=path)
192         r = self.conn.execute(s)
193         inserted_primary_key = r.inserted_primary_key[0]
194         r.close()
195         return inserted_primary_key
196     
197     def node_lookup(self, path):
198         """Lookup the current node of the given path.
199            Return None if the path is not found.
200         """
201         
202         s = select([self.nodes.c.node], self.nodes.c.path == path)
203         r = self.conn.execute(s)
204         row = r.fetchone()
205         r.close()
206         if row:
207             return row[0]
208         return None
209     
210     def node_get_properties(self, node):
211         """Return the node's (parent, path).
212            Return None if the node is not found.
213         """
214         
215         s = select([self.nodes.c.parent, self.nodes.c.path])
216         s = s.where(self.nodes.c.node == node)
217         r = self.conn.execute(s)
218         l = r.fetchone()
219         r.close()
220         return l
221     
222     def node_get_versions(self, node, keys=(), propnames=_propnames):
223         """Return the properties of all versions at node.
224            If keys is empty, return all properties in the order
225            (serial, node, size, source, mtime, muser, cluster).
226         """
227         
228         s = select([self.versions.c.serial,
229                     self.versions.c.node,
230                     self.versions.c.hash,
231                     self.versions.c.size,
232                     self.versions.c.source,
233                     self.versions.c.mtime,
234                     self.versions.c.muser,
235                     self.versions.c.cluster], self.versions.c.node == node)
236         s = s.order_by(self.versions.c.serial)
237         r = self.conn.execute(s)
238         rows = r.fetchall()
239         r.close()
240         if not rows:
241             return rows
242         
243         if not keys:
244             return rows
245         
246         return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]
247     
248     def node_count_children(self, node):
249         """Return node's child count."""
250         
251         s = select([func.count(self.nodes.c.node)])
252         s = s.where(and_(self.nodes.c.parent == node,
253                          self.nodes.c.node != ROOTNODE))
254         r = self.conn.execute(s)
255         row = r.fetchone()
256         r.close()
257         return row[0]
258     
259     def node_purge_children(self, parent, before=inf, cluster=0):
260         """Delete all versions with the specified
261            parent and cluster, and return
262            the serials of versions deleted.
263            Clears out nodes with no remaining versions.
264         """
265         #update statistics
266         c1 = select([self.nodes.c.node],
267             self.nodes.c.parent == parent)
268         where_clause = and_(self.versions.c.node.in_(c1),
269                             self.versions.c.cluster == cluster)
270         s = select([func.count(self.versions.c.serial),
271                     func.sum(self.versions.c.size)])
272         s = s.where(where_clause)
273         if before != inf:
274             s = s.where(self.versions.c.mtime <= before)
275         r = self.conn.execute(s)
276         row = r.fetchone()
277         r.close()
278         if not row:
279             return ()
280         nr, size = row[0], -row[1] if row[1] else 0
281         mtime = time()
282         self.statistics_update(parent, -nr, size, mtime, cluster)
283         self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)
284         
285         s = select([self.versions.c.serial])
286         s = s.where(where_clause)
287         r = self.conn.execute(s)
288         serials = [row[SERIAL] for row in r.fetchall()]
289         r.close()
290         
291         #delete versions
292         s = self.versions.delete().where(where_clause)
293         r = self.conn.execute(s)
294         r.close()
295         
296         #delete nodes
297         s = select([self.nodes.c.node],
298             and_(self.nodes.c.parent == parent,
299                  select([func.count(self.versions.c.serial)],
300                     self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
301         rp = self.conn.execute(s)
302         nodes = [r[0] for r in rp.fetchall()]
303         rp.close()
304         s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
305         self.conn.execute(s).close()
306         
307         return serials
308     
309     def node_purge(self, node, before=inf, cluster=0):
310         """Delete all versions with the specified
311            node and cluster, and return
312            the serials of versions deleted.
313            Clears out the node if it has no remaining versions.
314         """
315         
316         #update statistics
317         s = select([func.count(self.versions.c.serial),
318                     func.sum(self.versions.c.size)])
319         where_clause = and_(self.versions.c.node == node,
320                          self.versions.c.cluster == cluster)
321         s = s.where(where_clause)
322         if before != inf:
323             s = s.where(self.versions.c.mtime <= before)
324         r = self.conn.execute(s)
325         row = r.fetchone()
326         nr, size = row[0], row[1]
327         r.close()
328         if not nr:
329             return ()
330         mtime = time()
331         self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
332         
333         s = select([self.versions.c.serial])
334         s = s.where(where_clause)
335         r = self.conn.execute(s)
336         serials = [r[SERIAL] for r in r.fetchall()]
337         r.close()
338         
339         #delete versions
340         s = self.versions.delete().where(where_clause)
341         r = self.conn.execute(s)
342         r.close()
343         
344         #delete nodes
345         s = select([self.nodes.c.node],
346             and_(self.nodes.c.node == node,
347                  select([func.count(self.versions.c.serial)],
348                     self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
349         r = self.conn.execute(s)
350         nodes = r.fetchall()
351         r.close()
352         s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
353         self.conn.execute(s).close()
354         
355         return serials
356     
357     def node_remove(self, node):
358         """Remove the node specified.
359            Return false if the node has children or is not found.
360         """
361         
362         if self.node_count_children(node):
363             return False
364         
365         mtime = time()
366         s = select([func.count(self.versions.c.serial),
367                     func.sum(self.versions.c.size),
368                     self.versions.c.cluster])
369         s = s.where(self.versions.c.node == node)
370         s = s.group_by(self.versions.c.cluster)
371         r = self.conn.execute(s)
372         for population, size, cluster in r.fetchall():
373             self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
374         r.close()
375         
376         s = self.nodes.delete().where(self.nodes.c.node == node)
377         self.conn.execute(s).close()
378         return True
379     
380     def policy_get(self, node):
381         s = select([self.policies.c.key, self.policies.c.value],
382             self.policies.c.node==node)
383         r = self.conn.execute(s)
384         d = dict(r.fetchall())
385         r.close()
386         return d
387     
388     def policy_set(self, node, policy):
389         #insert or replace
390         for k, v in policy.iteritems():
391             s = self.policies.update().where(and_(self.policies.c.node == node,
392                                                   self.policies.c.key == k))
393             s = s.values(value = v)
394             rp = self.conn.execute(s)
395             rp.close()
396             if rp.rowcount == 0:
397                 s = self.policies.insert()
398                 values = {'node':node, 'key':k, 'value':v}
399                 r = self.conn.execute(s, values)
400                 r.close()
401     
402     def statistics_get(self, node, cluster=0):
403         """Return population, total size and last mtime
404            for all versions under node that belong to the cluster.
405         """
406         
407         s = select([self.statistics.c.population,
408                     self.statistics.c.size,
409                     self.statistics.c.mtime])
410         s = s.where(and_(self.statistics.c.node == node,
411                          self.statistics.c.cluster == cluster))
412         r = self.conn.execute(s)
413         row = r.fetchone()
414         r.close()
415         return row
416     
417     def statistics_update(self, node, population, size, mtime, cluster=0):
418         """Update the statistics of the given node.
419            Statistics keep track the population, total
420            size of objects and mtime in the node's namespace.
421            May be zero or positive or negative numbers.
422         """
423         
424         s = select([self.statistics.c.population, self.statistics.c.size],
425             and_(self.statistics.c.node == node,
426                  self.statistics.c.cluster == cluster))
427         rp = self.conn.execute(s)
428         r = rp.fetchone()
429         rp.close()
430         if not r:
431             prepopulation, presize = (0, 0)
432         else:
433             prepopulation, presize = r
434         population += prepopulation
435         size += presize
436         
437         #insert or replace
438         #TODO better upsert
439         u = self.statistics.update().where(and_(self.statistics.c.node==node,
440                                            self.statistics.c.cluster==cluster))
441         u = u.values(population=population, size=size, mtime=mtime)
442         rp = self.conn.execute(u)
443         rp.close()
444         if rp.rowcount == 0:
445             ins = self.statistics.insert()
446             ins = ins.values(node=node, population=population, size=size,
447                              mtime=mtime, cluster=cluster)
448             self.conn.execute(ins).close()
449     
450     def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
451         """Update the statistics of the given node's parent.
452            Then recursively update all parents up to the root.
453            Population is not recursive.
454         """
455         
456         while True:
457             if node == ROOTNODE:
458                 break
459             props = self.node_get_properties(node)
460             if props is None:
461                 break
462             parent, path = props
463             self.statistics_update(parent, population, size, mtime, cluster)
464             node = parent
465             population = 0 # Population isn't recursive
466     
467     def statistics_latest(self, node, before=inf, except_cluster=0):
468         """Return population, total size and last mtime
469            for all latest versions under node that
470            do not belong to the cluster.
471         """
472         
473         # The node.
474         props = self.node_get_properties(node)
475         if props is None:
476             return None
477         parent, path = props
478         
479         # The latest version.
480         s = select([self.versions.c.serial,
481                     self.versions.c.node,
482                     self.versions.c.hash,
483                     self.versions.c.size,
484                     self.versions.c.source,
485                     self.versions.c.mtime,
486                     self.versions.c.muser,
487                     self.versions.c.cluster])
488         filtered = select([func.max(self.versions.c.serial)],
489                             self.versions.c.node == node)
490         if before != inf:
491             filtered = filtered.where(self.versions.c.mtime < before)
492         s = s.where(and_(self.versions.c.cluster != except_cluster,
493                          self.versions.c.serial == filtered))
494         r = self.conn.execute(s)
495         props = r.fetchone()
496         r.close()
497         if not props:
498             return None
499         mtime = props[MTIME]
500         
501         # First level, just under node (get population).
502         v = self.versions.alias('v')
503         s = select([func.count(v.c.serial),
504                     func.sum(v.c.size),
505                     func.max(v.c.mtime)])
506         c1 = select([func.max(self.versions.c.serial)])
507         if before != inf:
508             c1 = c1.where(self.versions.c.mtime < before)
509         c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
510         s = s.where(and_(v.c.serial == c1.where(self.versions.c.node == v.c.node),
511                          v.c.cluster != except_cluster,
512                          v.c.node.in_(c2)))
513         rp = self.conn.execute(s)
514         r = rp.fetchone()
515         rp.close()
516         if not r:
517             return None
518         count = r[0]
519         mtime = max(mtime, r[2])
520         if count == 0:
521             return (0, 0, mtime)
522         
523         # All children (get size and mtime).
524         # XXX: This is why the full path is stored.
525         s = select([func.count(v.c.serial),
526                     func.sum(v.c.size),
527                     func.max(v.c.mtime)])
528         c1 = select([func.max(self.versions.c.serial)],
529             self.versions.c.node == v.c.node)
530         if before != inf:
531             c1 = c1.where(self.versions.c.mtime < before)
532         c2 = select([self.nodes.c.node], self.nodes.c.path.like(path + '%'))
533         s = s.where(and_(v.c.serial == c1,
534                          v.c.cluster != except_cluster,
535                          v.c.node.in_(c2)))
536         rp = self.conn.execute(s)
537         r = rp.fetchone()
538         rp.close()
539         if not r:
540             return None
541         size = r[1] - props[SIZE]
542         mtime = max(mtime, r[2])
543         return (count, size, mtime)
544     
545     def version_create(self, node, hash, size, source, muser, cluster=0):
546         """Create a new version from the given properties.
547            Return the (serial, mtime) of the new version.
548         """
549         
550         mtime = time()
551         s = self.versions.insert().values(node=node, hash=hash, size=size, source=source,
552                                           mtime=mtime, muser=muser, cluster=cluster)
553         serial = self.conn.execute(s).inserted_primary_key[0]
554         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
555         return serial, mtime
556     
557     def version_lookup(self, node, before=inf, cluster=0):
558         """Lookup the current version of the given node.
559            Return a list with its properties:
560            (serial, node, hash, size, source, mtime, muser, cluster)
561            or None if the current version is not found in the given cluster.
562         """
563         
564         v = self.versions.alias('v')
565         s = select([v.c.serial, v.c.node, v.c.hash, v.c.size,
566                     v.c.source, v.c.mtime, v.c.muser, v.c.cluster])
567         c = select([func.max(self.versions.c.serial)],
568             self.versions.c.node == node)
569         if before != inf:
570             c = c.where(self.versions.c.mtime < before)
571         s = s.where(and_(v.c.serial == c,
572                          v.c.cluster == cluster))
573         r = self.conn.execute(s)
574         props = r.fetchone()
575         r.close()
576         if props:
577             return props
578         return None
579     
580     def version_get_properties(self, serial, keys=(), propnames=_propnames):
581         """Return a sequence of values for the properties of
582            the version specified by serial and the keys, in the order given.
583            If keys is empty, return all properties in the order
584            (serial, node, hash, size, source, mtime, muser, cluster).
585         """
586         
587         v = self.versions.alias()
588         s = select([v.c.serial, v.c.node, v.c.hash, v.c.size,
589                     v.c.source, v.c.mtime, v.c.muser, v.c.cluster], v.c.serial == serial)
590         rp = self.conn.execute(s)
591         r = rp.fetchone()
592         rp.close()
593         if r is None:
594             return r
595         
596         if not keys:
597             return r
598         return [r[propnames[k]] for k in keys if k in propnames]
599     
600     def version_recluster(self, serial, cluster):
601         """Move the version into another cluster."""
602         
603         props = self.version_get_properties(serial)
604         if not props:
605             return
606         node = props[NODE]
607         size = props[SIZE]
608         oldcluster = props[CLUSTER]
609         if cluster == oldcluster:
610             return
611         
612         mtime = time()
613         self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
614         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
615         
616         s = self.versions.update()
617         s = s.where(self.versions.c.serial == serial)
618         s = s.values(cluster = cluster)
619         self.conn.execute(s).close()
620     
621     def version_remove(self, serial):
622         """Remove the serial specified."""
623         
624         props = self.node_get_properties(serial)
625         if not props:
626             return
627         node = props[NODE]
628         size = props[SIZE]
629         cluster = props[CLUSTER]
630         
631         mtime = time()
632         self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
633         
634         s = self.versions.delete().where(self.versions.c.serial == serial)
635         self.conn.execute(s).close()
636         return True
637     
638     def attribute_get(self, serial, keys=()):
639         """Return a list of (key, value) pairs of the version specified by serial.
640            If keys is empty, return all attributes.
641            Othwerise, return only those specified.
642         """
643         
644         if keys:
645             attrs = self.attributes.alias()
646             s = select([attrs.c.key, attrs.c.value])
647             s = s.where(and_(attrs.c.key.in_(keys),
648                              attrs.c.serial == serial))
649         else:
650             attrs = self.attributes.alias()
651             s = select([attrs.c.key, attrs.c.value])
652             s = s.where(attrs.c.serial == serial)
653         r = self.conn.execute(s)
654         l = r.fetchall()
655         r.close()
656         return l
657     
658     def attribute_set(self, serial, items):
659         """Set the attributes of the version specified by serial.
660            Receive attributes as an iterable of (key, value) pairs.
661         """
662         #insert or replace
663         #TODO better upsert
664         for k, v in items:
665             s = self.attributes.update()
666             s = s.where(and_(self.attributes.c.serial == serial,
667                              self.attributes.c.key == k))
668             s = s.values(value = v)
669             rp = self.conn.execute(s)
670             rp.close()
671             if rp.rowcount == 0:
672                 s = self.attributes.insert()
673                 s = s.values(serial=serial, key=k, value=v)
674                 self.conn.execute(s).close()
675     
676     def attribute_del(self, serial, keys=()):
677         """Delete attributes of the version specified by serial.
678            If keys is empty, delete all attributes.
679            Otherwise delete those specified.
680         """
681         
682         if keys:
683             #TODO more efficient way to do this?
684             for key in keys:
685                 s = self.attributes.delete()
686                 s = s.where(and_(self.attributes.c.serial == serial,
687                                  self.attributes.c.key == key))
688                 self.conn.execute(s).close()
689         else:
690             s = self.attributes.delete()
691             s = s.where(self.attributes.c.serial == serial)
692             self.conn.execute(s).close()
693     
694     def attribute_copy(self, source, dest):
695         s = select([dest, self.attributes.c.key, self.attributes.c.value],
696             self.attributes.c.serial == source)
697         rp = self.conn.execute(s)
698         attributes = rp.fetchall()
699         rp.close()
700         for dest, k, v in attributes:
701             #insert or replace
702             s = self.attributes.update().where(and_(
703                 self.attributes.c.serial == dest,
704                 self.attributes.c.key == k))
705             rp = self.conn.execute(s, value=v)
706             rp.close()
707             if rp.rowcount == 0:
708                 s = self.attributes.insert()
709                 values = {'serial':dest, 'key':k, 'value':v}
710                 self.conn.execute(s, values).close()
711     
712     def latest_attribute_keys(self, parent, before=inf, except_cluster=0, pathq=[]):
713         """Return a list with all keys pairs defined
714            for all latest versions under parent that
715            do not belong to the cluster.
716         """
717         
718         # TODO: Use another table to store before=inf results.
719         a = self.attributes.alias('a')
720         v = self.versions.alias('v')
721         n = self.nodes.alias('n')
722         s = select([a.c.key]).distinct()
723         filtered = select([func.max(self.versions.c.serial)])
724         if before != inf:
725             filtered = filtered.where(self.versions.c.mtime < before)
726         s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
727         s = s.where(v.c.cluster != except_cluster)
728         s = s.where(v.c.node.in_(select([self.nodes.c.node],
729             self.nodes.c.parent == parent)))
730         s = s.where(a.c.serial == v.c.serial)
731         s = s.where(n.c.node == v.c.node)
732         conj = []
733         for x in pathq:
734             conj.append(n.c.path.like(x + '%'))
735         if conj:
736             s = s.where(or_(*conj))
737         rp = self.conn.execute(s)
738         rows = rp.fetchall()
739         rp.close()
740         return [r[0] for r in rows]
741     
742     def latest_version_list(self, parent, prefix='', delimiter=None,
743                             start='', limit=10000, before=inf,
744                             except_cluster=0, pathq=[], filterq=None):
745         """Return a (list of (path, serial) tuples, list of common prefixes)
746            for the current versions of the paths with the given parent,
747            matching the following criteria.
748            
749            The property tuple for a version is returned if all
750            of these conditions are true:
751                 
752                 a. parent matches
753                 
754                 b. path > start
755                 
756                 c. path starts with prefix (and paths in pathq)
757                 
758                 d. version is the max up to before
759                 
760                 e. version is not in cluster
761                 
762                 f. the path does not have the delimiter occuring
763                    after the prefix, or ends with the delimiter
764                 
765                 g. serial matches the attribute filter query.
766                    
767                    A filter query is a comma-separated list of
768                    terms in one of these three forms:
769                    
770                    key
771                        an attribute with this key must exist
772                    
773                    !key
774                        an attribute with this key must not exist
775                    
776                    key ?op value
777                        the attribute with this key satisfies the value
778                        where ?op is one of ==, != <=, >=, <, >.
779            
780            The list of common prefixes includes the prefixes
781            matching up to the first delimiter after prefix,
782            and are reported only once, as "virtual directories".
783            The delimiter is included in the prefixes.
784            
785            If arguments are None, then the corresponding matching rule
786            will always match.
787            
788            Limit applies to the first list of tuples returned.
789         """
790         
791         if not start or start < prefix:
792             start = strprevling(prefix)
793         nextling = strnextling(prefix)
794         
795         a = self.attributes.alias('a')
796         v = self.versions.alias('v')
797         n = self.nodes.alias('n')
798         s = select([n.c.path, v.c.serial]).distinct()
799         filtered = select([func.max(self.versions.c.serial)])
800         if before != inf:
801             filtered = filtered.where(self.versions.c.mtime < before)
802         s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
803         s = s.where(v.c.cluster != except_cluster)
804         s = s.where(v.c.node.in_(select([self.nodes.c.node],
805             self.nodes.c.parent == parent)))
806         if filterq:
807             s = s.where(a.c.serial == v.c.serial)
808         
809         s = s.where(n.c.node == v.c.node)
810         s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
811         conj = []
812         for x in pathq:
813             conj.append(n.c.path.like(x + '%'))
814         
815         if conj:
816             s = s.where(or_(*conj))
817         
818         if filterq:
819             s = s.where(a.c.key.in_(filterq.split(',')))
820         
821         s = s.order_by(n.c.path)
822         
823         if not delimiter:
824             s = s.limit(limit)
825             rp = self.conn.execute(s, start=start)
826             r = rp.fetchall()
827             rp.close()
828             return r, ()
829         
830         pfz = len(prefix)
831         dz = len(delimiter)
832         count = 0
833         prefixes = []
834         pappend = prefixes.append
835         matches = []
836         mappend = matches.append
837         
838         rp = self.conn.execute(s, start=start)
839         while True:
840             props = rp.fetchone()
841             if props is None:
842                 break
843             path, serial = props
844             idx = path.find(delimiter, pfz)
845             
846             if idx < 0:
847                 mappend(props)
848                 count += 1
849                 if count >= limit:
850                     break
851                 continue
852             
853             if idx + dz == len(path):
854                 mappend(props)
855                 count += 1
856                 continue # Get one more, in case there is a path.
857             pf = path[:idx + dz]
858             pappend(pf)
859             if count >= limit: 
860                 break
861             
862             rp = self.conn.execute(s, start=strnextling(pf)) # New start.
863         rp.close()
864         
865         return matches, prefixes