Merge branch 'master' into packaging
[pithos] / snf-pithos-backend / pithos / backends / lib / sqlalchemy / node.py
1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from time import time
35 from sqlalchemy import Table, Integer, BigInteger, DECIMAL, Column, String, MetaData, ForeignKey
36 from sqlalchemy.types import Text
37 from sqlalchemy.schema import Index, Sequence
38 from sqlalchemy.sql import func, and_, or_, not_, null, select, bindparam, text, exists
39 from sqlalchemy.ext.compiler import compiles
40 from sqlalchemy.engine.reflection import Inspector
41
42 from dbworker import DBWorker
43
44 from pithos.lib.filter import parse_filters
45
46
47 ROOTNODE  = 0
48
49 ( SERIAL, NODE, HASH, SIZE, SOURCE, MTIME, MUSER, UUID, CLUSTER ) = range(9)
50
51 inf = float('inf')
52
53
54 def strnextling(prefix):
55     """Return the first unicode string
56        greater than but not starting with given prefix.
57        strnextling('hello') -> 'hellp'
58     """
59     if not prefix:
60         ## all strings start with the null string,
61         ## therefore we have to approximate strnextling('')
62         ## with the last unicode character supported by python
63         ## 0x10ffff for wide (32-bit unicode) python builds
64         ## 0x00ffff for narrow (16-bit unicode) python builds
65         ## We will not autodetect. 0xffff is safe enough.
66         return unichr(0xffff)
67     s = prefix[:-1]
68     c = ord(prefix[-1])
69     if c >= 0xffff:
70         raise RuntimeError
71     s += unichr(c+1)
72     return s
73
74 def strprevling(prefix):
75     """Return an approximation of the last unicode string
76        less than but not starting with given prefix.
77        strprevling(u'hello') -> u'helln\\xffff'
78     """
79     if not prefix:
80         ## There is no prevling for the null string
81         return prefix
82     s = prefix[:-1]
83     c = ord(prefix[-1])
84     if c > 0:
85         s += unichr(c-1) + unichr(0xffff)
86     return s
87
88 _propnames = {
89     'serial'    : 0,
90     'node'      : 1,
91     'hash'      : 2,
92     'size'      : 3,
93     'source'    : 4,
94     'mtime'     : 5,
95     'muser'     : 6,
96     'uuid'      : 7,
97     'cluster'   : 8
98 }
99
100
101 class Node(DBWorker):
102     """Nodes store path organization and have multiple versions.
103        Versions store object history and have multiple attributes.
104        Attributes store metadata.
105     """
106     
107     # TODO: Provide an interface for included and excluded clusters.
108     
109     def __init__(self, **params):
110         DBWorker.__init__(self, **params)
111         metadata = MetaData()
112         
113         #create nodes table
114         columns=[]
115         columns.append(Column('node', Integer, primary_key=True))
116         columns.append(Column('parent', Integer,
117                               ForeignKey('nodes.node',
118                                          ondelete='CASCADE',
119                                          onupdate='CASCADE'),
120                               autoincrement=False))
121         path_length = 2048
122         columns.append(Column('path', String(path_length), default='', nullable=False))
123         self.nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
124         Index('idx_nodes_path', self.nodes.c.path, unique=True)
125         
126         #create policy table
127         columns=[]
128         columns.append(Column('node', Integer,
129                               ForeignKey('nodes.node',
130                                          ondelete='CASCADE',
131                                          onupdate='CASCADE'),
132                               primary_key=True))
133         columns.append(Column('key', String(255), primary_key=True))
134         columns.append(Column('value', String(255)))
135         self.policies = Table('policy', metadata, *columns, mysql_engine='InnoDB')
136         
137         #create statistics table
138         columns=[]
139         columns.append(Column('node', Integer,
140                               ForeignKey('nodes.node',
141                                          ondelete='CASCADE',
142                                          onupdate='CASCADE'),
143                               primary_key=True))
144         columns.append(Column('population', Integer, nullable=False, default=0))
145         columns.append(Column('size', BigInteger, nullable=False, default=0))
146         columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
147         columns.append(Column('cluster', Integer, nullable=False, default=0,
148                               primary_key=True, autoincrement=False))
149         self.statistics = Table('statistics', metadata, *columns, mysql_engine='InnoDB')
150         
151         #create versions table
152         columns=[]
153         columns.append(Column('serial', Integer, primary_key=True))
154         columns.append(Column('node', Integer,
155                               ForeignKey('nodes.node',
156                                          ondelete='CASCADE',
157                                          onupdate='CASCADE')))
158         columns.append(Column('hash', String(255)))
159         columns.append(Column('size', BigInteger, nullable=False, default=0))
160         columns.append(Column('source', Integer))
161         columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
162         columns.append(Column('muser', String(255), nullable=False, default=''))
163         columns.append(Column('uuid', String(64), nullable=False, default=''))
164         columns.append(Column('cluster', Integer, nullable=False, default=0))
165         self.versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
166         Index('idx_versions_node_mtime', self.versions.c.node, self.versions.c.mtime)
167         Index('idx_versions_node_uuid', self.versions.c.uuid)
168         
169         #create attributes table
170         columns = []
171         columns.append(Column('serial', Integer,
172                               ForeignKey('versions.serial',
173                                          ondelete='CASCADE',
174                                          onupdate='CASCADE'),
175                               primary_key=True))
176         columns.append(Column('domain', String(255), primary_key=True))
177         columns.append(Column('key', String(255), primary_key=True))
178         columns.append(Column('value', String(255)))
179         self.attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')
180         
181         metadata.create_all(self.engine)
182         
183         s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
184                                            self.nodes.c.parent == ROOTNODE))
185         rp = self.conn.execute(s)
186         r = rp.fetchone()
187         rp.close()
188         if not r:
189             s = self.nodes.insert().values(node=ROOTNODE, parent=ROOTNODE)
190             self.conn.execute(s)
191     
192     def node_create(self, parent, path):
193         """Create a new node from the given properties.
194            Return the node identifier of the new node.
195         """
196         #TODO catch IntegrityError?
197         s = self.nodes.insert().values(parent=parent, path=path)
198         r = self.conn.execute(s)
199         inserted_primary_key = r.inserted_primary_key[0]
200         r.close()
201         return inserted_primary_key
202     
203     def node_lookup(self, path):
204         """Lookup the current node of the given path.
205            Return None if the path is not found.
206         """
207         
208         # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
209         s = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path), escape='\\'))
210         r = self.conn.execute(s)
211         row = r.fetchone()
212         r.close()
213         if row:
214             return row[0]
215         return None
216     
217     def node_get_properties(self, node):
218         """Return the node's (parent, path).
219            Return None if the node is not found.
220         """
221         
222         s = select([self.nodes.c.parent, self.nodes.c.path])
223         s = s.where(self.nodes.c.node == node)
224         r = self.conn.execute(s)
225         l = r.fetchone()
226         r.close()
227         return l
228     
229     def node_get_versions(self, node, keys=(), propnames=_propnames):
230         """Return the properties of all versions at node.
231            If keys is empty, return all properties in the order
232            (serial, node, hash, size, source, mtime, muser, uuid, cluster).
233         """
234         
235         s = select([self.versions.c.serial,
236                     self.versions.c.node,
237                     self.versions.c.hash,
238                     self.versions.c.size,
239                     self.versions.c.source,
240                     self.versions.c.mtime,
241                     self.versions.c.muser,
242                     self.versions.c.uuid,
243                     self.versions.c.cluster], self.versions.c.node == node)
244         s = s.order_by(self.versions.c.serial)
245         r = self.conn.execute(s)
246         rows = r.fetchall()
247         r.close()
248         if not rows:
249             return rows
250         
251         if not keys:
252             return rows
253         
254         return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]
255     
256     def node_count_children(self, node):
257         """Return node's child count."""
258         
259         s = select([func.count(self.nodes.c.node)])
260         s = s.where(and_(self.nodes.c.parent == node,
261                          self.nodes.c.node != ROOTNODE))
262         r = self.conn.execute(s)
263         row = r.fetchone()
264         r.close()
265         return row[0]
266     
267     def node_purge_children(self, parent, before=inf, cluster=0):
268         """Delete all versions with the specified
269            parent and cluster, and return
270            the hashes of versions deleted.
271            Clears out nodes with no remaining versions.
272         """
273         #update statistics
274         c1 = select([self.nodes.c.node],
275             self.nodes.c.parent == parent)
276         where_clause = and_(self.versions.c.node.in_(c1),
277                             self.versions.c.cluster == cluster)
278         s = select([func.count(self.versions.c.serial),
279                     func.sum(self.versions.c.size)])
280         s = s.where(where_clause)
281         if before != inf:
282             s = s.where(self.versions.c.mtime <= before)
283         r = self.conn.execute(s)
284         row = r.fetchone()
285         r.close()
286         if not row:
287             return ()
288         nr, size = row[0], -row[1] if row[1] else 0
289         mtime = time()
290         self.statistics_update(parent, -nr, size, mtime, cluster)
291         self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)
292         
293         s = select([self.versions.c.hash])
294         s = s.where(where_clause)
295         r = self.conn.execute(s)
296         hashes = [row[0] for row in r.fetchall()]
297         r.close()
298         
299         #delete versions
300         s = self.versions.delete().where(where_clause)
301         r = self.conn.execute(s)
302         r.close()
303         
304         #delete nodes
305         s = select([self.nodes.c.node],
306             and_(self.nodes.c.parent == parent,
307                  select([func.count(self.versions.c.serial)],
308                     self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
309         rp = self.conn.execute(s)
310         nodes = [r[0] for r in rp.fetchall()]
311         rp.close()
312         s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
313         self.conn.execute(s).close()
314         
315         return hashes
316     
317     def node_purge(self, node, before=inf, cluster=0):
318         """Delete all versions with the specified
319            node and cluster, and return
320            the hashes of versions deleted.
321            Clears out the node if it has no remaining versions.
322         """
323         
324         #update statistics
325         s = select([func.count(self.versions.c.serial),
326                     func.sum(self.versions.c.size)])
327         where_clause = and_(self.versions.c.node == node,
328                          self.versions.c.cluster == cluster)
329         s = s.where(where_clause)
330         if before != inf:
331             s = s.where(self.versions.c.mtime <= before)
332         r = self.conn.execute(s)
333         row = r.fetchone()
334         nr, size = row[0], row[1]
335         r.close()
336         if not nr:
337             return ()
338         mtime = time()
339         self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
340         
341         s = select([self.versions.c.hash])
342         s = s.where(where_clause)
343         r = self.conn.execute(s)
344         hashes = [r[0] for r in r.fetchall()]
345         r.close()
346         
347         #delete versions
348         s = self.versions.delete().where(where_clause)
349         r = self.conn.execute(s)
350         r.close()
351         
352         #delete nodes
353         s = select([self.nodes.c.node],
354             and_(self.nodes.c.node == node,
355                  select([func.count(self.versions.c.serial)],
356                     self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
357         r = self.conn.execute(s)
358         nodes = r.fetchall()
359         r.close()
360         s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
361         self.conn.execute(s).close()
362         
363         return hashes
364     
365     def node_remove(self, node):
366         """Remove the node specified.
367            Return false if the node has children or is not found.
368         """
369         
370         if self.node_count_children(node):
371             return False
372         
373         mtime = time()
374         s = select([func.count(self.versions.c.serial),
375                     func.sum(self.versions.c.size),
376                     self.versions.c.cluster])
377         s = s.where(self.versions.c.node == node)
378         s = s.group_by(self.versions.c.cluster)
379         r = self.conn.execute(s)
380         for population, size, cluster in r.fetchall():
381             self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
382         r.close()
383         
384         s = self.nodes.delete().where(self.nodes.c.node == node)
385         self.conn.execute(s).close()
386         return True
387     
388     def policy_get(self, node):
389         s = select([self.policies.c.key, self.policies.c.value],
390             self.policies.c.node==node)
391         r = self.conn.execute(s)
392         d = dict(r.fetchall())
393         r.close()
394         return d
395     
396     def policy_set(self, node, policy):
397         #insert or replace
398         for k, v in policy.iteritems():
399             s = self.policies.update().where(and_(self.policies.c.node == node,
400                                                   self.policies.c.key == k))
401             s = s.values(value = v)
402             rp = self.conn.execute(s)
403             rp.close()
404             if rp.rowcount == 0:
405                 s = self.policies.insert()
406                 values = {'node':node, 'key':k, 'value':v}
407                 r = self.conn.execute(s, values)
408                 r.close()
409     
410     def statistics_get(self, node, cluster=0):
411         """Return population, total size and last mtime
412            for all versions under node that belong to the cluster.
413         """
414         
415         s = select([self.statistics.c.population,
416                     self.statistics.c.size,
417                     self.statistics.c.mtime])
418         s = s.where(and_(self.statistics.c.node == node,
419                          self.statistics.c.cluster == cluster))
420         r = self.conn.execute(s)
421         row = r.fetchone()
422         r.close()
423         return row
424     
425     def statistics_update(self, node, population, size, mtime, cluster=0):
426         """Update the statistics of the given node.
427            Statistics keep track the population, total
428            size of objects and mtime in the node's namespace.
429            May be zero or positive or negative numbers.
430         """
431         s = select([self.statistics.c.population, self.statistics.c.size],
432             and_(self.statistics.c.node == node,
433                  self.statistics.c.cluster == cluster))
434         rp = self.conn.execute(s)
435         r = rp.fetchone()
436         rp.close()
437         if not r:
438             prepopulation, presize = (0, 0)
439         else:
440             prepopulation, presize = r
441         population += prepopulation
442         size += presize
443         
444         #insert or replace
445         #TODO better upsert
446         u = self.statistics.update().where(and_(self.statistics.c.node==node,
447                                            self.statistics.c.cluster==cluster))
448         u = u.values(population=population, size=size, mtime=mtime)
449         rp = self.conn.execute(u)
450         rp.close()
451         if rp.rowcount == 0:
452             ins = self.statistics.insert()
453             ins = ins.values(node=node, population=population, size=size,
454                              mtime=mtime, cluster=cluster)
455             self.conn.execute(ins).close()
456     
457     def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
458         """Update the statistics of the given node's parent.
459            Then recursively update all parents up to the root.
460            Population is not recursive.
461         """
462         
463         while True:
464             if node == ROOTNODE:
465                 break
466             props = self.node_get_properties(node)
467             if props is None:
468                 break
469             parent, path = props
470             self.statistics_update(parent, population, size, mtime, cluster)
471             node = parent
472             population = 0 # Population isn't recursive
473     
474     def statistics_latest(self, node, before=inf, except_cluster=0):
475         """Return population, total size and last mtime
476            for all latest versions under node that
477            do not belong to the cluster.
478         """
479         
480         # The node.
481         props = self.node_get_properties(node)
482         if props is None:
483             return None
484         parent, path = props
485         
486         # The latest version.
487         s = select([self.versions.c.serial,
488                     self.versions.c.node,
489                     self.versions.c.hash,
490                     self.versions.c.size,
491                     self.versions.c.source,
492                     self.versions.c.mtime,
493                     self.versions.c.muser,
494                     self.versions.c.uuid,
495                     self.versions.c.cluster])
496         filtered = select([func.max(self.versions.c.serial)],
497                             self.versions.c.node == node)
498         if before != inf:
499             filtered = filtered.where(self.versions.c.mtime < before)
500         s = s.where(and_(self.versions.c.cluster != except_cluster,
501                          self.versions.c.serial == filtered))
502         r = self.conn.execute(s)
503         props = r.fetchone()
504         r.close()
505         if not props:
506             return None
507         mtime = props[MTIME]
508         
509         # First level, just under node (get population).
510         v = self.versions.alias('v')
511         s = select([func.count(v.c.serial),
512                     func.sum(v.c.size),
513                     func.max(v.c.mtime)])
514         c1 = select([func.max(self.versions.c.serial)])
515         if before != inf:
516             c1 = c1.where(self.versions.c.mtime < before)
517         c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
518         s = s.where(and_(v.c.serial == c1.where(self.versions.c.node == v.c.node),
519                          v.c.cluster != except_cluster,
520                          v.c.node.in_(c2)))
521         rp = self.conn.execute(s)
522         r = rp.fetchone()
523         rp.close()
524         if not r:
525             return None
526         count = r[0]
527         mtime = max(mtime, r[2])
528         if count == 0:
529             return (0, 0, mtime)
530         
531         # All children (get size and mtime).
532         # XXX: This is why the full path is stored.
533         s = select([func.count(v.c.serial),
534                     func.sum(v.c.size),
535                     func.max(v.c.mtime)])
536         c1 = select([func.max(self.versions.c.serial)],
537             self.versions.c.node == v.c.node)
538         if before != inf:
539             c1 = c1.where(self.versions.c.mtime < before)
540         c2 = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path) + '%', escape='\\'))
541         s = s.where(and_(v.c.serial == c1,
542                          v.c.cluster != except_cluster,
543                          v.c.node.in_(c2)))
544         rp = self.conn.execute(s)
545         r = rp.fetchone()
546         rp.close()
547         if not r:
548             return None
549         size = r[1] - props[SIZE]
550         mtime = max(mtime, r[2])
551         return (count, size, mtime)
552     
553     def version_create(self, node, hash, size, source, muser, uuid, cluster=0):
554         """Create a new version from the given properties.
555            Return the (serial, mtime) of the new version.
556         """
557         
558         mtime = time()
559         s = self.versions.insert().values(node=node, hash=hash, size=size, source=source,
560                                           mtime=mtime, muser=muser, uuid=uuid, cluster=cluster)
561         serial = self.conn.execute(s).inserted_primary_key[0]
562         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
563         return serial, mtime
564     
565     def version_lookup(self, node, before=inf, cluster=0):
566         """Lookup the current version of the given node.
567            Return a list with its properties:
568            (serial, node, hash, size, source, mtime, muser, uuid, cluster)
569            or None if the current version is not found in the given cluster.
570         """
571         
572         v = self.versions.alias('v')
573         s = select([v.c.serial, v.c.node, v.c.hash,
574                     v.c.size, v.c.source, v.c.mtime,
575                     v.c.muser, v.c.uuid, v.c.cluster])
576         c = select([func.max(self.versions.c.serial)],
577             self.versions.c.node == node)
578         if before != inf:
579             c = c.where(self.versions.c.mtime < before)
580         s = s.where(and_(v.c.serial == c,
581                          v.c.cluster == cluster))
582         r = self.conn.execute(s)
583         props = r.fetchone()
584         r.close()
585         if props:
586             return props
587         return None
588     
589     def version_get_properties(self, serial, keys=(), propnames=_propnames):
590         """Return a sequence of values for the properties of
591            the version specified by serial and the keys, in the order given.
592            If keys is empty, return all properties in the order
593            (serial, node, hash, size, source, mtime, muser, uuid, cluster).
594         """
595         
596         v = self.versions.alias()
597         s = select([v.c.serial, v.c.node, v.c.hash,
598                     v.c.size, v.c.source, v.c.mtime,
599                     v.c.muser, v.c.uuid, v.c.cluster], v.c.serial == serial)
600         rp = self.conn.execute(s)
601         r = rp.fetchone()
602         rp.close()
603         if r is None:
604             return r
605         
606         if not keys:
607             return r
608         return [r[propnames[k]] for k in keys if k in propnames]
609     
610     def version_recluster(self, serial, cluster):
611         """Move the version into another cluster."""
612         
613         props = self.version_get_properties(serial)
614         if not props:
615             return
616         node = props[NODE]
617         size = props[SIZE]
618         oldcluster = props[CLUSTER]
619         if cluster == oldcluster:
620             return
621         
622         mtime = time()
623         self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
624         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
625         
626         s = self.versions.update()
627         s = s.where(self.versions.c.serial == serial)
628         s = s.values(cluster = cluster)
629         self.conn.execute(s).close()
630     
631     def version_remove(self, serial):
632         """Remove the serial specified."""
633         
634         props = self.version_get_properties(serial)
635         if not props:
636             return
637         node = props[NODE]
638         hash = props[HASH]
639         size = props[SIZE]
640         cluster = props[CLUSTER]
641         
642         mtime = time()
643         self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
644         
645         s = self.versions.delete().where(self.versions.c.serial == serial)
646         self.conn.execute(s).close()
647         return hash
648     
649     def attribute_get(self, serial, domain, keys=()):
650         """Return a list of (key, value) pairs of the version specified by serial.
651            If keys is empty, return all attributes.
652            Othwerise, return only those specified.
653         """
654         
655         if keys:
656             attrs = self.attributes.alias()
657             s = select([attrs.c.key, attrs.c.value])
658             s = s.where(and_(attrs.c.key.in_(keys),
659                              attrs.c.serial == serial,
660                              attrs.c.domain == domain))
661         else:
662             attrs = self.attributes.alias()
663             s = select([attrs.c.key, attrs.c.value])
664             s = s.where(and_(attrs.c.serial == serial,
665                              attrs.c.domain == domain))
666         r = self.conn.execute(s)
667         l = r.fetchall()
668         r.close()
669         return l
670     
671     def attribute_set(self, serial, domain, items):
672         """Set the attributes of the version specified by serial.
673            Receive attributes as an iterable of (key, value) pairs.
674         """
675         #insert or replace
676         #TODO better upsert
677         for k, v in items:
678             s = self.attributes.update()
679             s = s.where(and_(self.attributes.c.serial == serial,
680                              self.attributes.c.domain == domain,
681                              self.attributes.c.key == k))
682             s = s.values(value = v)
683             rp = self.conn.execute(s)
684             rp.close()
685             if rp.rowcount == 0:
686                 s = self.attributes.insert()
687                 s = s.values(serial=serial, domain=domain, key=k, value=v)
688                 self.conn.execute(s).close()
689     
690     def attribute_del(self, serial, domain, keys=()):
691         """Delete attributes of the version specified by serial.
692            If keys is empty, delete all attributes.
693            Otherwise delete those specified.
694         """
695         
696         if keys:
697             #TODO more efficient way to do this?
698             for key in keys:
699                 s = self.attributes.delete()
700                 s = s.where(and_(self.attributes.c.serial == serial,
701                                  self.attributes.c.domain == domain,
702                                  self.attributes.c.key == key))
703                 self.conn.execute(s).close()
704         else:
705             s = self.attributes.delete()
706             s = s.where(and_(self.attributes.c.serial == serial,
707                              self.attributes.c.domain == domain))
708             self.conn.execute(s).close()
709     
710     def attribute_copy(self, source, dest):
711         s = select([dest, self.attributes.c.domain, self.attributes.c.key, self.attributes.c.value],
712             self.attributes.c.serial == source)
713         rp = self.conn.execute(s)
714         attributes = rp.fetchall()
715         rp.close()
716         for dest, domain, k, v in attributes:
717             #insert or replace
718             s = self.attributes.update().where(and_(
719                 self.attributes.c.serial == dest,
720                 self.attributes.c.domain == domain,
721                 self.attributes.c.key == k))
722             rp = self.conn.execute(s, value=v)
723             rp.close()
724             if rp.rowcount == 0:
725                 s = self.attributes.insert()
726                 values = {'serial':dest, 'domain':domain, 'key':k, 'value':v}
727                 self.conn.execute(s, values).close()
728     
729     def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=[]):
730         """Return a list with all keys pairs defined
731            for all latest versions under parent that
732            do not belong to the cluster.
733         """
734         
735         # TODO: Use another table to store before=inf results.
736         a = self.attributes.alias('a')
737         v = self.versions.alias('v')
738         n = self.nodes.alias('n')
739         s = select([a.c.key]).distinct()
740         filtered = select([func.max(self.versions.c.serial)])
741         if before != inf:
742             filtered = filtered.where(self.versions.c.mtime < before)
743         s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
744         s = s.where(v.c.cluster != except_cluster)
745         s = s.where(v.c.node.in_(select([self.nodes.c.node],
746             self.nodes.c.parent == parent)))
747         s = s.where(a.c.serial == v.c.serial)
748         s = s.where(a.c.domain == domain)
749         s = s.where(n.c.node == v.c.node)
750         conj = []
751         for x in pathq:
752             conj.append(n.c.path.like(self.escape_like(x) + '%', escape='\\'))
753         if conj:
754             s = s.where(or_(*conj))
755         rp = self.conn.execute(s)
756         rows = rp.fetchall()
757         rp.close()
758         return [r[0] for r in rows]
759     
760     def latest_version_list(self, parent, prefix='', delimiter=None,
761                             start='', limit=10000, before=inf,
762                             except_cluster=0, pathq=[], domain=None, filterq=[], sizeq=None):
763         """Return a (list of (path, serial) tuples, list of common prefixes)
764            for the current versions of the paths with the given parent,
765            matching the following criteria.
766            
767            The property tuple for a version is returned if all
768            of these conditions are true:
769                 
770                 a. parent matches
771                 
772                 b. path > start
773                 
774                 c. path starts with prefix (and paths in pathq)
775                 
776                 d. version is the max up to before
777                 
778                 e. version is not in cluster
779                 
780                 f. the path does not have the delimiter occuring
781                    after the prefix, or ends with the delimiter
782                 
783                 g. serial matches the attribute filter query.
784                    
785                    A filter query is a comma-separated list of
786                    terms in one of these three forms:
787                    
788                    key
789                        an attribute with this key must exist
790                    
791                    !key
792                        an attribute with this key must not exist
793                    
794                    key ?op value
795                        the attribute with this key satisfies the value
796                        where ?op is one of ==, != <=, >=, <, >.
797                 
798                 h. the size is in the range set by sizeq
799            
800            The list of common prefixes includes the prefixes
801            matching up to the first delimiter after prefix,
802            and are reported only once, as "virtual directories".
803            The delimiter is included in the prefixes.
804            
805            If arguments are None, then the corresponding matching rule
806            will always match.
807            
808            Limit applies to the first list of tuples returned.
809         """
810         
811         if not start or start < prefix:
812             start = strprevling(prefix)
813         nextling = strnextling(prefix)
814         
815         v = self.versions.alias('v')
816         n = self.nodes.alias('n')
817         s = select([n.c.path, v.c.serial]).distinct()
818         filtered = select([func.max(self.versions.c.serial)])
819         if before != inf:
820             filtered = filtered.where(self.versions.c.mtime < before)
821         s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
822         s = s.where(v.c.cluster != except_cluster)
823         s = s.where(v.c.node.in_(select([self.nodes.c.node],
824             self.nodes.c.parent == parent)))
825         
826         s = s.where(n.c.node == v.c.node)
827         s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
828         conj = []
829         for x in pathq:
830             conj.append(n.c.path.like(self.escape_like(x) + '%', escape='\\'))
831         if conj:
832             s = s.where(or_(*conj))
833         
834         if sizeq and len(sizeq) == 2:
835             if sizeq[0]:
836                 s = s.where(v.c.size >= sizeq[0])
837             if sizeq[1]:
838                 s = s.where(v.c.size < sizeq[1])
839         
840         if domain and filterq:
841             a = self.attributes.alias('a')
842             included, excluded, opers = parse_filters(filterq)
843             if included:
844                 subs = select([1])
845                 subs = subs.where(a.c.serial == v.c.serial).correlate(v)
846                 subs = subs.where(a.c.domain == domain)
847                 subs = subs.where(or_(*[a.c.key.op('=')(x) for x in included]))
848                 s = s.where(exists(subs))
849             if excluded:
850                 subs = select([1])
851                 subs = subs.where(a.c.serial == v.c.serial).correlate(v)
852                 subs = subs.where(a.c.domain == domain)
853                 subs = subs.where(or_(*[a.c.key.op('=')(x) for x in excluded]))
854                 s = s.where(not_(exists(subs)))
855             if opers:
856                 for k, o, val in opers:
857                     subs = select([1])
858                     subs = subs.where(a.c.serial == v.c.serial).correlate(v)
859                     subs = subs.where(a.c.domain == domain)
860                     subs = subs.where(and_(a.c.key.op('=')(k), a.c.value.op(o)(val)))
861                     s = s.where(exists(subs))
862         
863         s = s.order_by(n.c.path)
864         
865         if not delimiter:
866             s = s.limit(limit)
867             rp = self.conn.execute(s, start=start)
868             r = rp.fetchall()
869             rp.close()
870             return r, ()
871         
872         pfz = len(prefix)
873         dz = len(delimiter)
874         count = 0
875         prefixes = []
876         pappend = prefixes.append
877         matches = []
878         mappend = matches.append
879         
880         rp = self.conn.execute(s, start=start)
881         while True:
882             props = rp.fetchone()
883             if props is None:
884                 break
885             path, serial = props
886             idx = path.find(delimiter, pfz)
887             
888             if idx < 0:
889                 mappend(props)
890                 count += 1
891                 if count >= limit:
892                     break
893                 continue
894             
895             if idx + dz == len(path):
896                 mappend(props)
897                 count += 1
898                 continue # Get one more, in case there is a path.
899             pf = path[:idx + dz]
900             pappend(pf)
901             if count >= limit: 
902                 break
903             
904             rp = self.conn.execute(s, start=strnextling(pf)) # New start.
905         rp.close()
906         
907         return matches, prefixes
908     
909     def latest_uuid(self, uuid):
910         """Return a (path, serial) tuple, for the latest version of the given uuid."""
911         
912         v = self.versions.alias('v')
913         n = self.nodes.alias('n')
914         s = select([n.c.path, v.c.serial])
915         filtered = select([func.max(self.versions.c.serial)])
916         s = s.where(v.c.serial == filtered.where(self.versions.c.uuid == uuid))
917         s = s.where(n.c.node == v.c.node)
918         
919         r = self.conn.execute(s)
920         l = r.fetchone()
921         r.close()
922         return l