Escape catch-all characters in LIKE queries.
[pithos] / pithos / backends / lib / sqlalchemy / node.py
1 # Copyright 2011 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from time import time
35 from sqlalchemy import Table, Integer, BigInteger, DECIMAL, Column, String, MetaData, ForeignKey
36 from sqlalchemy.types import Text
37 from sqlalchemy.schema import Index, Sequence
38 from sqlalchemy.sql import func, and_, or_, null, select, bindparam, text
39 from sqlalchemy.ext.compiler import compiles
40 from sqlalchemy.engine.reflection import Inspector
41
42 from dbworker import DBWorker
43
44 ROOTNODE  = 0
45
46 ( SERIAL, NODE, HASH, SIZE, SOURCE, MTIME, MUSER, CLUSTER ) = range(8)
47
48 inf = float('inf')
49
50
51 def strnextling(prefix):
52     """Return the first unicode string
53        greater than but not starting with given prefix.
54        strnextling('hello') -> 'hellp'
55     """
56     if not prefix:
57         ## all strings start with the null string,
58         ## therefore we have to approximate strnextling('')
59         ## with the last unicode character supported by python
60         ## 0x10ffff for wide (32-bit unicode) python builds
61         ## 0x00ffff for narrow (16-bit unicode) python builds
62         ## We will not autodetect. 0xffff is safe enough.
63         return unichr(0xffff)
64     s = prefix[:-1]
65     c = ord(prefix[-1])
66     if c >= 0xffff:
67         raise RuntimeError
68     s += unichr(c+1)
69     return s
70
71 def strprevling(prefix):
72     """Return an approximation of the last unicode string
73        less than but not starting with given prefix.
74        strprevling(u'hello') -> u'helln\\xffff'
75     """
76     if not prefix:
77         ## There is no prevling for the null string
78         return prefix
79     s = prefix[:-1]
80     c = ord(prefix[-1])
81     if c > 0:
82         s += unichr(c-1) + unichr(0xffff)
83     return s
84
85
86 _propnames = {
87     'serial'    : 0,
88     'node'      : 1,
89     'hash'      : 2,
90     'size'      : 3,
91     'source'    : 4,
92     'mtime'     : 5,
93     'muser'     : 6,
94     'cluster'   : 7,
95 }
96
97
98 class Node(DBWorker):
99     """Nodes store path organization and have multiple versions.
100        Versions store object history and have multiple attributes.
101        Attributes store metadata.
102     """
103     
104     # TODO: Provide an interface for included and excluded clusters.
105     
106     def __init__(self, **params):
107         DBWorker.__init__(self, **params)
108         metadata = MetaData()
109         
110         #create nodes table
111         columns=[]
112         columns.append(Column('node', Integer, primary_key=True))
113         columns.append(Column('parent', Integer,
114                               ForeignKey('nodes.node',
115                                          ondelete='CASCADE',
116                                          onupdate='CASCADE'),
117                               autoincrement=False))
118         path_length = 2048
119         columns.append(Column('path', String(path_length), default='', nullable=False))
120         self.nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
121         
122         #create policy table
123         columns=[]
124         columns.append(Column('node', Integer,
125                               ForeignKey('nodes.node',
126                                          ondelete='CASCADE',
127                                          onupdate='CASCADE'),
128                               primary_key=True))
129         columns.append(Column('key', String(255), primary_key=True))
130         columns.append(Column('value', String(255)))
131         self.policies = Table('policy', metadata, *columns, mysql_engine='InnoDB')
132         
133         #create statistics table
134         columns=[]
135         columns.append(Column('node', Integer,
136                               ForeignKey('nodes.node',
137                                          ondelete='CASCADE',
138                                          onupdate='CASCADE'),
139                               primary_key=True))
140         columns.append(Column('population', Integer, nullable=False, default=0))
141         columns.append(Column('size', BigInteger, nullable=False, default=0))
142         columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
143         columns.append(Column('cluster', Integer, nullable=False, default=0,
144                               primary_key=True, autoincrement=False))
145         self.statistics = Table('statistics', metadata, *columns, mysql_engine='InnoDB')
146         
147         #create versions table
148         columns=[]
149         columns.append(Column('serial', Integer, primary_key=True))
150         columns.append(Column('node', Integer,
151                               ForeignKey('nodes.node',
152                                          ondelete='CASCADE',
153                                          onupdate='CASCADE')))
154         columns.append(Column('hash', String(255)))
155         columns.append(Column('size', BigInteger, nullable=False, default=0))
156         columns.append(Column('source', Integer))
157         columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
158         columns.append(Column('muser', String(255), nullable=False, default=''))
159         columns.append(Column('cluster', Integer, nullable=False, default=0))
160         self.versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
161         Index('idx_versions_node_mtime', self.versions.c.node,
162               self.versions.c.mtime)
163         
164         #create attributes table
165         columns = []
166         columns.append(Column('serial', Integer,
167                               ForeignKey('versions.serial',
168                                          ondelete='CASCADE',
169                                          onupdate='CASCADE'),
170                               primary_key=True))
171         columns.append(Column('key', String(255), primary_key=True))
172         columns.append(Column('value', String(255)))
173         self.attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')
174         
175         metadata.create_all(self.engine)
176         
177         # the following code creates an index of specific length
178         # this can be accompliced in sqlalchemy >= 0.7.3
179         # providing mysql_length option during index creation
180         insp = Inspector.from_engine(self.engine)
181         indexes = [elem['name'] for elem in insp.get_indexes('nodes')]
182         if 'idx_nodes_path' not in indexes:
183             explicit_length = '(%s)' %path_length if self.engine.name == 'mysql' else ''
184             s = text('CREATE INDEX idx_nodes_path ON nodes (path%s)' %explicit_length)
185             self.conn.execute(s).close()
186         
187         s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
188                                            self.nodes.c.parent == ROOTNODE))
189         rp = self.conn.execute(s)
190         r = rp.fetchone()
191         rp.close()
192         if not r:
193             s = self.nodes.insert().values(node=ROOTNODE, parent=ROOTNODE)
194             self.conn.execute(s)
195     
196     def node_create(self, parent, path):
197         """Create a new node from the given properties.
198            Return the node identifier of the new node.
199         """
200         #TODO catch IntegrityError?
201         s = self.nodes.insert().values(parent=parent, path=path)
202         r = self.conn.execute(s)
203         inserted_primary_key = r.inserted_primary_key[0]
204         r.close()
205         return inserted_primary_key
206     
207     def node_lookup(self, path):
208         """Lookup the current node of the given path.
209            Return None if the path is not found.
210         """
211         
212         # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
213         s = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path), escape='\\'))
214         r = self.conn.execute(s)
215         row = r.fetchone()
216         r.close()
217         if row:
218             return row[0]
219         return None
220     
221     def node_get_properties(self, node):
222         """Return the node's (parent, path).
223            Return None if the node is not found.
224         """
225         
226         s = select([self.nodes.c.parent, self.nodes.c.path])
227         s = s.where(self.nodes.c.node == node)
228         r = self.conn.execute(s)
229         l = r.fetchone()
230         r.close()
231         return l
232     
233     def node_get_versions(self, node, keys=(), propnames=_propnames):
234         """Return the properties of all versions at node.
235            If keys is empty, return all properties in the order
236            (serial, node, size, source, mtime, muser, cluster).
237         """
238         
239         s = select([self.versions.c.serial,
240                     self.versions.c.node,
241                     self.versions.c.hash,
242                     self.versions.c.size,
243                     self.versions.c.source,
244                     self.versions.c.mtime,
245                     self.versions.c.muser,
246                     self.versions.c.cluster], self.versions.c.node == node)
247         s = s.order_by(self.versions.c.serial)
248         r = self.conn.execute(s)
249         rows = r.fetchall()
250         r.close()
251         if not rows:
252             return rows
253         
254         if not keys:
255             return rows
256         
257         return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]
258     
259     def node_count_children(self, node):
260         """Return node's child count."""
261         
262         s = select([func.count(self.nodes.c.node)])
263         s = s.where(and_(self.nodes.c.parent == node,
264                          self.nodes.c.node != ROOTNODE))
265         r = self.conn.execute(s)
266         row = r.fetchone()
267         r.close()
268         return row[0]
269     
270     def node_purge_children(self, parent, before=inf, cluster=0):
271         """Delete all versions with the specified
272            parent and cluster, and return
273            the hashes of versions deleted.
274            Clears out nodes with no remaining versions.
275         """
276         #update statistics
277         c1 = select([self.nodes.c.node],
278             self.nodes.c.parent == parent)
279         where_clause = and_(self.versions.c.node.in_(c1),
280                             self.versions.c.cluster == cluster)
281         s = select([func.count(self.versions.c.serial),
282                     func.sum(self.versions.c.size)])
283         s = s.where(where_clause)
284         if before != inf:
285             s = s.where(self.versions.c.mtime <= before)
286         r = self.conn.execute(s)
287         row = r.fetchone()
288         r.close()
289         if not row:
290             return ()
291         nr, size = row[0], -row[1] if row[1] else 0
292         mtime = time()
293         self.statistics_update(parent, -nr, size, mtime, cluster)
294         self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)
295         
296         s = select([self.versions.c.hash])
297         s = s.where(where_clause)
298         r = self.conn.execute(s)
299         hashes = [row[0] for row in r.fetchall()]
300         r.close()
301         
302         #delete versions
303         s = self.versions.delete().where(where_clause)
304         r = self.conn.execute(s)
305         r.close()
306         
307         #delete nodes
308         s = select([self.nodes.c.node],
309             and_(self.nodes.c.parent == parent,
310                  select([func.count(self.versions.c.serial)],
311                     self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
312         rp = self.conn.execute(s)
313         nodes = [r[0] for r in rp.fetchall()]
314         rp.close()
315         s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
316         self.conn.execute(s).close()
317         
318         return hashes
319     
320     def node_purge(self, node, before=inf, cluster=0):
321         """Delete all versions with the specified
322            node and cluster, and return
323            the hashes of versions deleted.
324            Clears out the node if it has no remaining versions.
325         """
326         
327         #update statistics
328         s = select([func.count(self.versions.c.serial),
329                     func.sum(self.versions.c.size)])
330         where_clause = and_(self.versions.c.node == node,
331                          self.versions.c.cluster == cluster)
332         s = s.where(where_clause)
333         if before != inf:
334             s = s.where(self.versions.c.mtime <= before)
335         r = self.conn.execute(s)
336         row = r.fetchone()
337         nr, size = row[0], row[1]
338         r.close()
339         if not nr:
340             return ()
341         mtime = time()
342         self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
343         
344         s = select([self.versions.c.hash])
345         s = s.where(where_clause)
346         r = self.conn.execute(s)
347         hashes = [r[0] for r in r.fetchall()]
348         r.close()
349         
350         #delete versions
351         s = self.versions.delete().where(where_clause)
352         r = self.conn.execute(s)
353         r.close()
354         
355         #delete nodes
356         s = select([self.nodes.c.node],
357             and_(self.nodes.c.node == node,
358                  select([func.count(self.versions.c.serial)],
359                     self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
360         r = self.conn.execute(s)
361         nodes = r.fetchall()
362         r.close()
363         s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
364         self.conn.execute(s).close()
365         
366         return hashes
367     
368     def node_remove(self, node):
369         """Remove the node specified.
370            Return false if the node has children or is not found.
371         """
372         
373         if self.node_count_children(node):
374             return False
375         
376         mtime = time()
377         s = select([func.count(self.versions.c.serial),
378                     func.sum(self.versions.c.size),
379                     self.versions.c.cluster])
380         s = s.where(self.versions.c.node == node)
381         s = s.group_by(self.versions.c.cluster)
382         r = self.conn.execute(s)
383         for population, size, cluster in r.fetchall():
384             self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
385         r.close()
386         
387         s = self.nodes.delete().where(self.nodes.c.node == node)
388         self.conn.execute(s).close()
389         return True
390     
391     def policy_get(self, node):
392         s = select([self.policies.c.key, self.policies.c.value],
393             self.policies.c.node==node)
394         r = self.conn.execute(s)
395         d = dict(r.fetchall())
396         r.close()
397         return d
398     
399     def policy_set(self, node, policy):
400         #insert or replace
401         for k, v in policy.iteritems():
402             s = self.policies.update().where(and_(self.policies.c.node == node,
403                                                   self.policies.c.key == k))
404             s = s.values(value = v)
405             rp = self.conn.execute(s)
406             rp.close()
407             if rp.rowcount == 0:
408                 s = self.policies.insert()
409                 values = {'node':node, 'key':k, 'value':v}
410                 r = self.conn.execute(s, values)
411                 r.close()
412     
413     def statistics_get(self, node, cluster=0):
414         """Return population, total size and last mtime
415            for all versions under node that belong to the cluster.
416         """
417         
418         s = select([self.statistics.c.population,
419                     self.statistics.c.size,
420                     self.statistics.c.mtime])
421         s = s.where(and_(self.statistics.c.node == node,
422                          self.statistics.c.cluster == cluster))
423         r = self.conn.execute(s)
424         row = r.fetchone()
425         r.close()
426         return row
427     
428     def statistics_update(self, node, population, size, mtime, cluster=0):
429         """Update the statistics of the given node.
430            Statistics keep track the population, total
431            size of objects and mtime in the node's namespace.
432            May be zero or positive or negative numbers.
433         """
434         s = select([self.statistics.c.population, self.statistics.c.size],
435             and_(self.statistics.c.node == node,
436                  self.statistics.c.cluster == cluster))
437         rp = self.conn.execute(s)
438         r = rp.fetchone()
439         rp.close()
440         if not r:
441             prepopulation, presize = (0, 0)
442         else:
443             prepopulation, presize = r
444         population += prepopulation
445         size += presize
446         
447         #insert or replace
448         #TODO better upsert
449         u = self.statistics.update().where(and_(self.statistics.c.node==node,
450                                            self.statistics.c.cluster==cluster))
451         u = u.values(population=population, size=size, mtime=mtime)
452         rp = self.conn.execute(u)
453         rp.close()
454         if rp.rowcount == 0:
455             ins = self.statistics.insert()
456             ins = ins.values(node=node, population=population, size=size,
457                              mtime=mtime, cluster=cluster)
458             self.conn.execute(ins).close()
459     
460     def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
461         """Update the statistics of the given node's parent.
462            Then recursively update all parents up to the root.
463            Population is not recursive.
464         """
465         
466         while True:
467             if node == ROOTNODE:
468                 break
469             props = self.node_get_properties(node)
470             if props is None:
471                 break
472             parent, path = props
473             self.statistics_update(parent, population, size, mtime, cluster)
474             node = parent
475             population = 0 # Population isn't recursive
476     
477     def statistics_latest(self, node, before=inf, except_cluster=0):
478         """Return population, total size and last mtime
479            for all latest versions under node that
480            do not belong to the cluster.
481         """
482         
483         # The node.
484         props = self.node_get_properties(node)
485         if props is None:
486             return None
487         parent, path = props
488         
489         # The latest version.
490         s = select([self.versions.c.serial,
491                     self.versions.c.node,
492                     self.versions.c.hash,
493                     self.versions.c.size,
494                     self.versions.c.source,
495                     self.versions.c.mtime,
496                     self.versions.c.muser,
497                     self.versions.c.cluster])
498         filtered = select([func.max(self.versions.c.serial)],
499                             self.versions.c.node == node)
500         if before != inf:
501             filtered = filtered.where(self.versions.c.mtime < before)
502         s = s.where(and_(self.versions.c.cluster != except_cluster,
503                          self.versions.c.serial == filtered))
504         r = self.conn.execute(s)
505         props = r.fetchone()
506         r.close()
507         if not props:
508             return None
509         mtime = props[MTIME]
510         
511         # First level, just under node (get population).
512         v = self.versions.alias('v')
513         s = select([func.count(v.c.serial),
514                     func.sum(v.c.size),
515                     func.max(v.c.mtime)])
516         c1 = select([func.max(self.versions.c.serial)])
517         if before != inf:
518             c1 = c1.where(self.versions.c.mtime < before)
519         c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
520         s = s.where(and_(v.c.serial == c1.where(self.versions.c.node == v.c.node),
521                          v.c.cluster != except_cluster,
522                          v.c.node.in_(c2)))
523         rp = self.conn.execute(s)
524         r = rp.fetchone()
525         rp.close()
526         if not r:
527             return None
528         count = r[0]
529         mtime = max(mtime, r[2])
530         if count == 0:
531             return (0, 0, mtime)
532         
533         # All children (get size and mtime).
534         # XXX: This is why the full path is stored.
535         s = select([func.count(v.c.serial),
536                     func.sum(v.c.size),
537                     func.max(v.c.mtime)])
538         c1 = select([func.max(self.versions.c.serial)],
539             self.versions.c.node == v.c.node)
540         if before != inf:
541             c1 = c1.where(self.versions.c.mtime < before)
542         c2 = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path) + '%', escape='\\'))
543         s = s.where(and_(v.c.serial == c1,
544                          v.c.cluster != except_cluster,
545                          v.c.node.in_(c2)))
546         rp = self.conn.execute(s)
547         r = rp.fetchone()
548         rp.close()
549         if not r:
550             return None
551         size = r[1] - props[SIZE]
552         mtime = max(mtime, r[2])
553         return (count, size, mtime)
554     
555     def version_create(self, node, hash, size, source, muser, cluster=0):
556         """Create a new version from the given properties.
557            Return the (serial, mtime) of the new version.
558         """
559         
560         mtime = time()
561         s = self.versions.insert().values(node=node, hash=hash, size=size, source=source,
562                                           mtime=mtime, muser=muser, cluster=cluster)
563         serial = self.conn.execute(s).inserted_primary_key[0]
564         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
565         return serial, mtime
566     
567     def version_lookup(self, node, before=inf, cluster=0):
568         """Lookup the current version of the given node.
569            Return a list with its properties:
570            (serial, node, hash, size, source, mtime, muser, cluster)
571            or None if the current version is not found in the given cluster.
572         """
573         
574         v = self.versions.alias('v')
575         s = select([v.c.serial, v.c.node, v.c.hash, v.c.size,
576                     v.c.source, v.c.mtime, v.c.muser, v.c.cluster])
577         c = select([func.max(self.versions.c.serial)],
578             self.versions.c.node == node)
579         if before != inf:
580             c = c.where(self.versions.c.mtime < before)
581         s = s.where(and_(v.c.serial == c,
582                          v.c.cluster == cluster))
583         r = self.conn.execute(s)
584         props = r.fetchone()
585         r.close()
586         if props:
587             return props
588         return None
589     
590     def version_get_properties(self, serial, keys=(), propnames=_propnames):
591         """Return a sequence of values for the properties of
592            the version specified by serial and the keys, in the order given.
593            If keys is empty, return all properties in the order
594            (serial, node, hash, size, source, mtime, muser, cluster).
595         """
596         
597         v = self.versions.alias()
598         s = select([v.c.serial, v.c.node, v.c.hash, v.c.size,
599                     v.c.source, v.c.mtime, v.c.muser, v.c.cluster], v.c.serial == serial)
600         rp = self.conn.execute(s)
601         r = rp.fetchone()
602         rp.close()
603         if r is None:
604             return r
605         
606         if not keys:
607             return r
608         return [r[propnames[k]] for k in keys if k in propnames]
609     
610     def version_recluster(self, serial, cluster):
611         """Move the version into another cluster."""
612         
613         props = self.version_get_properties(serial)
614         if not props:
615             return
616         node = props[NODE]
617         size = props[SIZE]
618         oldcluster = props[CLUSTER]
619         if cluster == oldcluster:
620             return
621         
622         mtime = time()
623         self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
624         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
625         
626         s = self.versions.update()
627         s = s.where(self.versions.c.serial == serial)
628         s = s.values(cluster = cluster)
629         self.conn.execute(s).close()
630     
631     def version_remove(self, serial):
632         """Remove the serial specified."""
633         
634         props = self.version_get_properties(serial)
635         if not props:
636             return
637         node = props[NODE]
638         hash = props[HASH]
639         size = props[SIZE]
640         cluster = props[CLUSTER]
641         
642         mtime = time()
643         self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
644         
645         s = self.versions.delete().where(self.versions.c.serial == serial)
646         self.conn.execute(s).close()
647         return hash
648     
649     def attribute_get(self, serial, keys=()):
650         """Return a list of (key, value) pairs of the version specified by serial.
651            If keys is empty, return all attributes.
652            Othwerise, return only those specified.
653         """
654         
655         if keys:
656             attrs = self.attributes.alias()
657             s = select([attrs.c.key, attrs.c.value])
658             s = s.where(and_(attrs.c.key.in_(keys),
659                              attrs.c.serial == serial))
660         else:
661             attrs = self.attributes.alias()
662             s = select([attrs.c.key, attrs.c.value])
663             s = s.where(attrs.c.serial == serial)
664         r = self.conn.execute(s)
665         l = r.fetchall()
666         r.close()
667         return l
668     
669     def attribute_set(self, serial, items):
670         """Set the attributes of the version specified by serial.
671            Receive attributes as an iterable of (key, value) pairs.
672         """
673         #insert or replace
674         #TODO better upsert
675         for k, v in items:
676             s = self.attributes.update()
677             s = s.where(and_(self.attributes.c.serial == serial,
678                              self.attributes.c.key == k))
679             s = s.values(value = v)
680             rp = self.conn.execute(s)
681             rp.close()
682             if rp.rowcount == 0:
683                 s = self.attributes.insert()
684                 s = s.values(serial=serial, key=k, value=v)
685                 self.conn.execute(s).close()
686     
687     def attribute_del(self, serial, keys=()):
688         """Delete attributes of the version specified by serial.
689            If keys is empty, delete all attributes.
690            Otherwise delete those specified.
691         """
692         
693         if keys:
694             #TODO more efficient way to do this?
695             for key in keys:
696                 s = self.attributes.delete()
697                 s = s.where(and_(self.attributes.c.serial == serial,
698                                  self.attributes.c.key == key))
699                 self.conn.execute(s).close()
700         else:
701             s = self.attributes.delete()
702             s = s.where(self.attributes.c.serial == serial)
703             self.conn.execute(s).close()
704     
705     def attribute_copy(self, source, dest):
706         s = select([dest, self.attributes.c.key, self.attributes.c.value],
707             self.attributes.c.serial == source)
708         rp = self.conn.execute(s)
709         attributes = rp.fetchall()
710         rp.close()
711         for dest, k, v in attributes:
712             #insert or replace
713             s = self.attributes.update().where(and_(
714                 self.attributes.c.serial == dest,
715                 self.attributes.c.key == k))
716             rp = self.conn.execute(s, value=v)
717             rp.close()
718             if rp.rowcount == 0:
719                 s = self.attributes.insert()
720                 values = {'serial':dest, 'key':k, 'value':v}
721                 self.conn.execute(s, values).close()
722     
723     def latest_attribute_keys(self, parent, before=inf, except_cluster=0, pathq=[]):
724         """Return a list with all keys pairs defined
725            for all latest versions under parent that
726            do not belong to the cluster.
727         """
728         
729         # TODO: Use another table to store before=inf results.
730         a = self.attributes.alias('a')
731         v = self.versions.alias('v')
732         n = self.nodes.alias('n')
733         s = select([a.c.key]).distinct()
734         filtered = select([func.max(self.versions.c.serial)])
735         if before != inf:
736             filtered = filtered.where(self.versions.c.mtime < before)
737         s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
738         s = s.where(v.c.cluster != except_cluster)
739         s = s.where(v.c.node.in_(select([self.nodes.c.node],
740             self.nodes.c.parent == parent)))
741         s = s.where(a.c.serial == v.c.serial)
742         s = s.where(n.c.node == v.c.node)
743         conj = []
744         for x in pathq:
745             conj.append(n.c.path.like(self.escape_like(x) + '%', escape='\\'))
746         if conj:
747             s = s.where(or_(*conj))
748         rp = self.conn.execute(s)
749         rows = rp.fetchall()
750         rp.close()
751         return [r[0] for r in rows]
752     
753     def latest_version_list(self, parent, prefix='', delimiter=None,
754                             start='', limit=10000, before=inf,
755                             except_cluster=0, pathq=[], filterq=None):
756         """Return a (list of (path, serial) tuples, list of common prefixes)
757            for the current versions of the paths with the given parent,
758            matching the following criteria.
759            
760            The property tuple for a version is returned if all
761            of these conditions are true:
762                 
763                 a. parent matches
764                 
765                 b. path > start
766                 
767                 c. path starts with prefix (and paths in pathq)
768                 
769                 d. version is the max up to before
770                 
771                 e. version is not in cluster
772                 
773                 f. the path does not have the delimiter occuring
774                    after the prefix, or ends with the delimiter
775                 
776                 g. serial matches the attribute filter query.
777                    
778                    A filter query is a comma-separated list of
779                    terms in one of these three forms:
780                    
781                    key
782                        an attribute with this key must exist
783                    
784                    !key
785                        an attribute with this key must not exist
786                    
787                    key ?op value
788                        the attribute with this key satisfies the value
789                        where ?op is one of ==, != <=, >=, <, >.
790            
791            The list of common prefixes includes the prefixes
792            matching up to the first delimiter after prefix,
793            and are reported only once, as "virtual directories".
794            The delimiter is included in the prefixes.
795            
796            If arguments are None, then the corresponding matching rule
797            will always match.
798            
799            Limit applies to the first list of tuples returned.
800         """
801         
802         if not start or start < prefix:
803             start = strprevling(prefix)
804         nextling = strnextling(prefix)
805         
806         a = self.attributes.alias('a')
807         v = self.versions.alias('v')
808         n = self.nodes.alias('n')
809         s = select([n.c.path, v.c.serial]).distinct()
810         filtered = select([func.max(self.versions.c.serial)])
811         if before != inf:
812             filtered = filtered.where(self.versions.c.mtime < before)
813         s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
814         s = s.where(v.c.cluster != except_cluster)
815         s = s.where(v.c.node.in_(select([self.nodes.c.node],
816             self.nodes.c.parent == parent)))
817         if filterq:
818             s = s.where(a.c.serial == v.c.serial)
819         
820         s = s.where(n.c.node == v.c.node)
821         s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
822         conj = []
823         for x in pathq:
824             conj.append(n.c.path.like(self.escape_like(x) + '%', escape='\\'))
825         
826         if conj:
827             s = s.where(or_(*conj))
828         
829         if filterq:
830             s = s.where(a.c.key.in_(filterq.split(',')))
831         
832         s = s.order_by(n.c.path)
833         
834         if not delimiter:
835             s = s.limit(limit)
836             rp = self.conn.execute(s, start=start)
837             r = rp.fetchall()
838             rp.close()
839             return r, ()
840         
841         pfz = len(prefix)
842         dz = len(delimiter)
843         count = 0
844         prefixes = []
845         pappend = prefixes.append
846         matches = []
847         mappend = matches.append
848         
849         rp = self.conn.execute(s, start=start)
850         while True:
851             props = rp.fetchone()
852             if props is None:
853                 break
854             path, serial = props
855             idx = path.find(delimiter, pfz)
856             
857             if idx < 0:
858                 mappend(props)
859                 count += 1
860                 if count >= limit:
861                     break
862                 continue
863             
864             if idx + dz == len(path):
865                 mappend(props)
866                 count += 1
867                 continue # Get one more, in case there is a path.
868             pf = path[:idx + dz]
869             pappend(pf)
870             if count >= limit: 
871                 break
872             
873             rp = self.conn.execute(s, start=strnextling(pf)) # New start.
874         rp.close()
875         
876         return matches, prefixes