Fixes trailing spaces handling
[pithos] / pithos / backends / lib / sqlalchemy / node.py
1 # Copyright 2011 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from time import time
35 from sqlalchemy import Table, Integer, BigInteger, DECIMAL, Column, String, MetaData, ForeignKey
36 from sqlalchemy.types import Text
37 from sqlalchemy.schema import Index, Sequence
38 from sqlalchemy.sql import func, and_, or_, null, select, bindparam, text
39 from sqlalchemy.ext.compiler import compiles
40 #from sqlalchemy.dialects.mysql import VARBINARY
41 from sqlalchemy.engine.reflection import Inspector
42
43 from dbworker import DBWorker
44
45 ROOTNODE  = 0
46
47 ( SERIAL, NODE, HASH, SIZE, SOURCE, MTIME, MUSER, CLUSTER ) = range(8)
48
49 inf = float('inf')
50
51
52 def strnextling(prefix):
53     """Return the first unicode string
54        greater than but not starting with given prefix.
55        strnextling('hello') -> 'hellp'
56     """
57     if not prefix:
58         ## all strings start with the null string,
59         ## therefore we have to approximate strnextling('')
60         ## with the last unicode character supported by python
61         ## 0x10ffff for wide (32-bit unicode) python builds
62         ## 0x00ffff for narrow (16-bit unicode) python builds
63         ## We will not autodetect. 0xffff is safe enough.
64         return unichr(0xffff)
65     s = prefix[:-1]
66     c = ord(prefix[-1])
67     if c >= 0xffff:
68         raise RuntimeError
69     s += unichr(c+1)
70     return s
71
72 def strprevling(prefix):
73     """Return an approximation of the last unicode string
74        less than but not starting with given prefix.
75        strprevling(u'hello') -> u'helln\\xffff'
76     """
77     if not prefix:
78         ## There is no prevling for the null string
79         return prefix
80     s = prefix[:-1]
81     c = ord(prefix[-1])
82     if c > 0:
83         #s += unichr(c-1) + unichr(0xffff)
84         s += unichr(c-1)
85     return s
86
87
88 _propnames = {
89     'serial'    : 0,
90     'node'      : 1,
91     'hash'      : 2,
92     'size'      : 3,
93     'source'    : 4,
94     'mtime'     : 5,
95     'muser'     : 6,
96     'cluster'   : 7,
97 }
98
99
100 class Node(DBWorker):
101     """Nodes store path organization and have multiple versions.
102        Versions store object history and have multiple attributes.
103        Attributes store metadata.
104     """
105     
106     # TODO: Provide an interface for included and excluded clusters.
107     
108     def __init__(self, **params):
109         DBWorker.__init__(self, **params)
110         metadata = MetaData()
111         
112         #create nodes table
113         columns=[]
114         columns.append(Column('node', Integer, primary_key=True))
115         columns.append(Column('parent', Integer,
116                               ForeignKey('nodes.node',
117                                          ondelete='CASCADE',
118                                          onupdate='CASCADE'),
119                               autoincrement=False))
120         path_length = 2048
121         path_length_in_bytes = path_length * 4
122         columns.append(Column('path', Text(path_length_in_bytes), default='', nullable=False))
123         self.nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
124         # place an index on path
125         #Index('idx_nodes_path', self.nodes.c.path)
126         
127         #create policy table
128         columns=[]
129         columns.append(Column('node', Integer,
130                               ForeignKey('nodes.node',
131                                          ondelete='CASCADE',
132                                          onupdate='CASCADE'),
133                               primary_key=True))
134         columns.append(Column('key', String(255), primary_key=True))
135         columns.append(Column('value', String(255)))
136         self.policies = Table('policy', metadata, *columns, mysql_engine='InnoDB')
137         
138         #create statistics table
139         columns=[]
140         columns.append(Column('node', Integer,
141                               ForeignKey('nodes.node',
142                                          ondelete='CASCADE',
143                                          onupdate='CASCADE'),
144                               primary_key=True))
145         columns.append(Column('population', Integer, nullable=False, default=0))
146         columns.append(Column('size', BigInteger, nullable=False, default=0))
147         columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
148         columns.append(Column('cluster', Integer, nullable=False, default=0,
149                               primary_key=True, autoincrement=False))
150         self.statistics = Table('statistics', metadata, *columns, mysql_engine='InnoDB')
151         
152         #create versions table
153         columns=[]
154         columns.append(Column('serial', Integer, primary_key=True))
155         columns.append(Column('node', Integer,
156                               ForeignKey('nodes.node',
157                                          ondelete='CASCADE',
158                                          onupdate='CASCADE')))
159         columns.append(Column('hash', String(255)))
160         columns.append(Column('size', BigInteger, nullable=False, default=0))
161         columns.append(Column('source', Integer))
162         columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
163         columns.append(Column('muser', String(255), nullable=False, default=''))
164         columns.append(Column('cluster', Integer, nullable=False, default=0))
165         self.versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
166         Index('idx_versions_node_mtime', self.versions.c.node,
167               self.versions.c.mtime)
168         
169         #create attributes table
170         columns = []
171         columns.append(Column('serial', Integer,
172                               ForeignKey('versions.serial',
173                                          ondelete='CASCADE',
174                                          onupdate='CASCADE'),
175                               primary_key=True))
176         columns.append(Column('key', String(255), primary_key=True))
177         columns.append(Column('value', String(255)))
178         self.attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')
179         
180         metadata.create_all(self.engine)
181         
182         # the following code creates an index of specific length
183         # this can be accompliced in sqlalchemy >= 0.7.3
184         # providing mysql_length option during index creation
185         insp = Inspector.from_engine(self.engine)
186         indexes = [elem['name'] for elem in insp.get_indexes('nodes')]
187         if 'idx_nodes_path' not in indexes:
188             s = text('CREATE INDEX idx_nodes_path ON nodes (path(%s))' %path_length_in_bytes)
189             self.conn.execute(s).close()
190         
191         s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
192                                            self.nodes.c.parent == ROOTNODE))
193         rp = self.conn.execute(s)
194         r = rp.fetchone()
195         rp.close()
196         if not r:
197             s = self.nodes.insert().values(node=ROOTNODE, parent=ROOTNODE)
198             self.conn.execute(s)
199     
200     def node_create(self, parent, path):
201         """Create a new node from the given properties.
202            Return the node identifier of the new node.
203         """
204         #TODO catch IntegrityError?
205         s = self.nodes.insert().values(parent=parent, path=path)
206         r = self.conn.execute(s)
207         inserted_primary_key = r.inserted_primary_key[0]
208         r.close()
209         return inserted_primary_key
210     
211     def node_lookup(self, path):
212         """Lookup the current node of the given path.
213            Return None if the path is not found.
214         """
215         
216         s = select([self.nodes.c.node], self.nodes.c.path.like(path))
217         r = self.conn.execute(s)
218         row = r.fetchone()
219         r.close()
220         if row:
221             return row[0]
222         return None
223     
224     def node_get_properties(self, node):
225         """Return the node's (parent, path).
226            Return None if the node is not found.
227         """
228         
229         s = select([self.nodes.c.parent, self.nodes.c.path])
230         s = s.where(self.nodes.c.node == node)
231         r = self.conn.execute(s)
232         l = r.fetchone()
233         r.close()
234         return l
235     
236     def node_get_versions(self, node, keys=(), propnames=_propnames):
237         """Return the properties of all versions at node.
238            If keys is empty, return all properties in the order
239            (serial, node, size, source, mtime, muser, cluster).
240         """
241         
242         s = select([self.versions.c.serial,
243                     self.versions.c.node,
244                     self.versions.c.hash,
245                     self.versions.c.size,
246                     self.versions.c.source,
247                     self.versions.c.mtime,
248                     self.versions.c.muser,
249                     self.versions.c.cluster], self.versions.c.node == node)
250         s = s.order_by(self.versions.c.serial)
251         r = self.conn.execute(s)
252         rows = r.fetchall()
253         r.close()
254         if not rows:
255             return rows
256         
257         if not keys:
258             return rows
259         
260         return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]
261     
262     def node_count_children(self, node):
263         """Return node's child count."""
264         
265         s = select([func.count(self.nodes.c.node)])
266         s = s.where(and_(self.nodes.c.parent == node,
267                          self.nodes.c.node != ROOTNODE))
268         r = self.conn.execute(s)
269         row = r.fetchone()
270         r.close()
271         return row[0]
272     
273     def node_purge_children(self, parent, before=inf, cluster=0):
274         """Delete all versions with the specified
275            parent and cluster, and return
276            the serials of versions deleted.
277            Clears out nodes with no remaining versions.
278         """
279         #update statistics
280         c1 = select([self.nodes.c.node],
281             self.nodes.c.parent == parent)
282         where_clause = and_(self.versions.c.node.in_(c1),
283                             self.versions.c.cluster == cluster)
284         s = select([func.count(self.versions.c.serial),
285                     func.sum(self.versions.c.size)])
286         s = s.where(where_clause)
287         if before != inf:
288             s = s.where(self.versions.c.mtime <= before)
289         r = self.conn.execute(s)
290         row = r.fetchone()
291         r.close()
292         if not row:
293             return ()
294         nr, size = row[0], -row[1] if row[1] else 0
295         mtime = time()
296         self.statistics_update(parent, -nr, size, mtime, cluster)
297         self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)
298         
299         s = select([self.versions.c.serial])
300         s = s.where(where_clause)
301         r = self.conn.execute(s)
302         serials = [row[SERIAL] for row in r.fetchall()]
303         r.close()
304         
305         #delete versions
306         s = self.versions.delete().where(where_clause)
307         r = self.conn.execute(s)
308         r.close()
309         
310         #delete nodes
311         s = select([self.nodes.c.node],
312             and_(self.nodes.c.parent == parent,
313                  select([func.count(self.versions.c.serial)],
314                     self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
315         rp = self.conn.execute(s)
316         nodes = [r[0] for r in rp.fetchall()]
317         rp.close()
318         s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
319         self.conn.execute(s).close()
320         
321         return serials
322     
323     def node_purge(self, node, before=inf, cluster=0):
324         """Delete all versions with the specified
325            node and cluster, and return
326            the serials of versions deleted.
327            Clears out the node if it has no remaining versions.
328         """
329         
330         #update statistics
331         s = select([func.count(self.versions.c.serial),
332                     func.sum(self.versions.c.size)])
333         where_clause = and_(self.versions.c.node == node,
334                          self.versions.c.cluster == cluster)
335         s = s.where(where_clause)
336         if before != inf:
337             s = s.where(self.versions.c.mtime <= before)
338         r = self.conn.execute(s)
339         row = r.fetchone()
340         nr, size = row[0], row[1]
341         r.close()
342         if not nr:
343             return ()
344         mtime = time()
345         self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
346         
347         s = select([self.versions.c.serial])
348         s = s.where(where_clause)
349         r = self.conn.execute(s)
350         serials = [r[SERIAL] for r in r.fetchall()]
351         r.close()
352         
353         #delete versions
354         s = self.versions.delete().where(where_clause)
355         r = self.conn.execute(s)
356         r.close()
357         
358         #delete nodes
359         s = select([self.nodes.c.node],
360             and_(self.nodes.c.node == node,
361                  select([func.count(self.versions.c.serial)],
362                     self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
363         r = self.conn.execute(s)
364         nodes = r.fetchall()
365         r.close()
366         s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
367         self.conn.execute(s).close()
368         
369         return serials
370     
371     def node_remove(self, node):
372         """Remove the node specified.
373            Return false if the node has children or is not found.
374         """
375         
376         if self.node_count_children(node):
377             return False
378         
379         mtime = time()
380         s = select([func.count(self.versions.c.serial),
381                     func.sum(self.versions.c.size),
382                     self.versions.c.cluster])
383         s = s.where(self.versions.c.node == node)
384         s = s.group_by(self.versions.c.cluster)
385         r = self.conn.execute(s)
386         for population, size, cluster in r.fetchall():
387             self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
388         r.close()
389         
390         s = self.nodes.delete().where(self.nodes.c.node == node)
391         self.conn.execute(s).close()
392         return True
393     
394     def policy_get(self, node):
395         s = select([self.policies.c.key, self.policies.c.value],
396             self.policies.c.node==node)
397         r = self.conn.execute(s)
398         d = dict(r.fetchall())
399         r.close()
400         return d
401     
402     def policy_set(self, node, policy):
403         #insert or replace
404         for k, v in policy.iteritems():
405             s = self.policies.update().where(and_(self.policies.c.node == node,
406                                                   self.policies.c.key == k))
407             s = s.values(value = v)
408             rp = self.conn.execute(s)
409             rp.close()
410             if rp.rowcount == 0:
411                 s = self.policies.insert()
412                 values = {'node':node, 'key':k, 'value':v}
413                 r = self.conn.execute(s, values)
414                 r.close()
415     
416     def statistics_get(self, node, cluster=0):
417         """Return population, total size and last mtime
418            for all versions under node that belong to the cluster.
419         """
420         
421         s = select([self.statistics.c.population,
422                     self.statistics.c.size,
423                     self.statistics.c.mtime])
424         s = s.where(and_(self.statistics.c.node == node,
425                          self.statistics.c.cluster == cluster))
426         r = self.conn.execute(s)
427         row = r.fetchone()
428         r.close()
429         return row
430     
431     def statistics_update(self, node, population, size, mtime, cluster=0):
432         """Update the statistics of the given node.
433            Statistics keep track the population, total
434            size of objects and mtime in the node's namespace.
435            May be zero or positive or negative numbers.
436         """
437         
438         s = select([self.statistics.c.population, self.statistics.c.size],
439             and_(self.statistics.c.node == node,
440                  self.statistics.c.cluster == cluster))
441         rp = self.conn.execute(s)
442         r = rp.fetchone()
443         rp.close()
444         if not r:
445             prepopulation, presize = (0, 0)
446         else:
447             prepopulation, presize = r
448         population += prepopulation
449         size += presize
450         
451         #insert or replace
452         #TODO better upsert
453         u = self.statistics.update().where(and_(self.statistics.c.node==node,
454                                            self.statistics.c.cluster==cluster))
455         u = u.values(population=population, size=size, mtime=mtime)
456         rp = self.conn.execute(u)
457         rp.close()
458         if rp.rowcount == 0:
459             ins = self.statistics.insert()
460             ins = ins.values(node=node, population=population, size=size,
461                              mtime=mtime, cluster=cluster)
462             self.conn.execute(ins).close()
463     
464     def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
465         """Update the statistics of the given node's parent.
466            Then recursively update all parents up to the root.
467            Population is not recursive.
468         """
469         
470         while True:
471             if node == ROOTNODE:
472                 break
473             props = self.node_get_properties(node)
474             if props is None:
475                 break
476             parent, path = props
477             self.statistics_update(parent, population, size, mtime, cluster)
478             node = parent
479             population = 0 # Population isn't recursive
480     
481     def statistics_latest(self, node, before=inf, except_cluster=0):
482         """Return population, total size and last mtime
483            for all latest versions under node that
484            do not belong to the cluster.
485         """
486         
487         # The node.
488         props = self.node_get_properties(node)
489         if props is None:
490             return None
491         parent, path = props
492         
493         # The latest version.
494         s = select([self.versions.c.serial,
495                     self.versions.c.node,
496                     self.versions.c.hash,
497                     self.versions.c.size,
498                     self.versions.c.source,
499                     self.versions.c.mtime,
500                     self.versions.c.muser,
501                     self.versions.c.cluster])
502         filtered = select([func.max(self.versions.c.serial)],
503                             self.versions.c.node == node)
504         if before != inf:
505             filtered = filtered.where(self.versions.c.mtime < before)
506         s = s.where(and_(self.versions.c.cluster != except_cluster,
507                          self.versions.c.serial == filtered))
508         r = self.conn.execute(s)
509         props = r.fetchone()
510         r.close()
511         if not props:
512             return None
513         mtime = props[MTIME]
514         
515         # First level, just under node (get population).
516         v = self.versions.alias('v')
517         s = select([func.count(v.c.serial),
518                     func.sum(v.c.size),
519                     func.max(v.c.mtime)])
520         c1 = select([func.max(self.versions.c.serial)])
521         if before != inf:
522             c1 = c1.where(self.versions.c.mtime < before)
523         c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
524         s = s.where(and_(v.c.serial == c1.where(self.versions.c.node == v.c.node),
525                          v.c.cluster != except_cluster,
526                          v.c.node.in_(c2)))
527         rp = self.conn.execute(s)
528         r = rp.fetchone()
529         rp.close()
530         if not r:
531             return None
532         count = r[0]
533         mtime = max(mtime, r[2])
534         if count == 0:
535             return (0, 0, mtime)
536         
537         # All children (get size and mtime).
538         # XXX: This is why the full path is stored.
539         s = select([func.count(v.c.serial),
540                     func.sum(v.c.size),
541                     func.max(v.c.mtime)])
542         c1 = select([func.max(self.versions.c.serial)],
543             self.versions.c.node == v.c.node)
544         if before != inf:
545             c1 = c1.where(self.versions.c.mtime < before)
546         c2 = select([self.nodes.c.node], self.nodes.c.path.like(path + '%'))
547         s = s.where(and_(v.c.serial == c1,
548                          v.c.cluster != except_cluster,
549                          v.c.node.in_(c2)))
550         rp = self.conn.execute(s)
551         r = rp.fetchone()
552         rp.close()
553         if not r:
554             return None
555         size = r[1] - props[SIZE]
556         mtime = max(mtime, r[2])
557         return (count, size, mtime)
558     
559     def version_create(self, node, hash, size, source, muser, cluster=0):
560         """Create a new version from the given properties.
561            Return the (serial, mtime) of the new version.
562         """
563         
564         mtime = time()
565         s = self.versions.insert().values(node=node, hash=hash, size=size, source=source,
566                                           mtime=mtime, muser=muser, cluster=cluster)
567         serial = self.conn.execute(s).inserted_primary_key[0]
568         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
569         return serial, mtime
570     
571     def version_lookup(self, node, before=inf, cluster=0):
572         """Lookup the current version of the given node.
573            Return a list with its properties:
574            (serial, node, hash, size, source, mtime, muser, cluster)
575            or None if the current version is not found in the given cluster.
576         """
577         
578         v = self.versions.alias('v')
579         s = select([v.c.serial, v.c.node, v.c.hash, v.c.size,
580                     v.c.source, v.c.mtime, v.c.muser, v.c.cluster])
581         c = select([func.max(self.versions.c.serial)],
582             self.versions.c.node == node)
583         if before != inf:
584             c = c.where(self.versions.c.mtime < before)
585         s = s.where(and_(v.c.serial == c,
586                          v.c.cluster == cluster))
587         r = self.conn.execute(s)
588         props = r.fetchone()
589         r.close()
590         if props:
591             return props
592         return None
593     
594     def version_get_properties(self, serial, keys=(), propnames=_propnames):
595         """Return a sequence of values for the properties of
596            the version specified by serial and the keys, in the order given.
597            If keys is empty, return all properties in the order
598            (serial, node, hash, size, source, mtime, muser, cluster).
599         """
600         
601         v = self.versions.alias()
602         s = select([v.c.serial, v.c.node, v.c.hash, v.c.size,
603                     v.c.source, v.c.mtime, v.c.muser, v.c.cluster], v.c.serial == serial)
604         rp = self.conn.execute(s)
605         r = rp.fetchone()
606         rp.close()
607         if r is None:
608             return r
609         
610         if not keys:
611             return r
612         return [r[propnames[k]] for k in keys if k in propnames]
613     
614     def version_recluster(self, serial, cluster):
615         """Move the version into another cluster."""
616         
617         props = self.version_get_properties(serial)
618         if not props:
619             return
620         node = props[NODE]
621         size = props[SIZE]
622         oldcluster = props[CLUSTER]
623         if cluster == oldcluster:
624             return
625         
626         mtime = time()
627         self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
628         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
629         
630         s = self.versions.update()
631         s = s.where(self.versions.c.serial == serial)
632         s = s.values(cluster = cluster)
633         self.conn.execute(s).close()
634     
635     def version_remove(self, serial):
636         """Remove the serial specified."""
637         
638         props = self.node_get_properties(serial)
639         if not props:
640             return
641         node = props[NODE]
642         size = props[SIZE]
643         cluster = props[CLUSTER]
644         
645         mtime = time()
646         self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
647         
648         s = self.versions.delete().where(self.versions.c.serial == serial)
649         self.conn.execute(s).close()
650         return True
651     
652     def attribute_get(self, serial, keys=()):
653         """Return a list of (key, value) pairs of the version specified by serial.
654            If keys is empty, return all attributes.
655            Othwerise, return only those specified.
656         """
657         
658         if keys:
659             attrs = self.attributes.alias()
660             s = select([attrs.c.key, attrs.c.value])
661             s = s.where(and_(attrs.c.key.in_(keys),
662                              attrs.c.serial == serial))
663         else:
664             attrs = self.attributes.alias()
665             s = select([attrs.c.key, attrs.c.value])
666             s = s.where(attrs.c.serial == serial)
667         r = self.conn.execute(s)
668         l = r.fetchall()
669         r.close()
670         return l
671     
672     def attribute_set(self, serial, items):
673         """Set the attributes of the version specified by serial.
674            Receive attributes as an iterable of (key, value) pairs.
675         """
676         #insert or replace
677         #TODO better upsert
678         for k, v in items:
679             s = self.attributes.update()
680             s = s.where(and_(self.attributes.c.serial == serial,
681                              self.attributes.c.key == k))
682             s = s.values(value = v)
683             rp = self.conn.execute(s)
684             rp.close()
685             if rp.rowcount == 0:
686                 s = self.attributes.insert()
687                 s = s.values(serial=serial, key=k, value=v)
688                 self.conn.execute(s).close()
689     
690     def attribute_del(self, serial, keys=()):
691         """Delete attributes of the version specified by serial.
692            If keys is empty, delete all attributes.
693            Otherwise delete those specified.
694         """
695         
696         if keys:
697             #TODO more efficient way to do this?
698             for key in keys:
699                 s = self.attributes.delete()
700                 s = s.where(and_(self.attributes.c.serial == serial,
701                                  self.attributes.c.key == key))
702                 self.conn.execute(s).close()
703         else:
704             s = self.attributes.delete()
705             s = s.where(self.attributes.c.serial == serial)
706             self.conn.execute(s).close()
707     
708     def attribute_copy(self, source, dest):
709         s = select([dest, self.attributes.c.key, self.attributes.c.value],
710             self.attributes.c.serial == source)
711         rp = self.conn.execute(s)
712         attributes = rp.fetchall()
713         rp.close()
714         for dest, k, v in attributes:
715             #insert or replace
716             s = self.attributes.update().where(and_(
717                 self.attributes.c.serial == dest,
718                 self.attributes.c.key == k))
719             rp = self.conn.execute(s, value=v)
720             rp.close()
721             if rp.rowcount == 0:
722                 s = self.attributes.insert()
723                 values = {'serial':dest, 'key':k, 'value':v}
724                 self.conn.execute(s, values).close()
725     
726     def latest_attribute_keys(self, parent, before=inf, except_cluster=0, pathq=[]):
727         """Return a list with all keys pairs defined
728            for all latest versions under parent that
729            do not belong to the cluster.
730         """
731         
732         # TODO: Use another table to store before=inf results.
733         a = self.attributes.alias('a')
734         v = self.versions.alias('v')
735         n = self.nodes.alias('n')
736         s = select([a.c.key]).distinct()
737         filtered = select([func.max(self.versions.c.serial)])
738         if before != inf:
739             filtered = filtered.where(self.versions.c.mtime < before)
740         s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
741         s = s.where(v.c.cluster != except_cluster)
742         s = s.where(v.c.node.in_(select([self.nodes.c.node],
743             self.nodes.c.parent == parent)))
744         s = s.where(a.c.serial == v.c.serial)
745         s = s.where(n.c.node == v.c.node)
746         conj = []
747         for x in pathq:
748             conj.append(n.c.path.like(x + '%'))
749         if conj:
750             s = s.where(or_(*conj))
751         rp = self.conn.execute(s)
752         rows = rp.fetchall()
753         rp.close()
754         return [r[0] for r in rows]
755     
756     def latest_version_list(self, parent, prefix='', delimiter=None,
757                             start='', limit=10000, before=inf,
758                             except_cluster=0, pathq=[], filterq=None):
759         """Return a (list of (path, serial) tuples, list of common prefixes)
760            for the current versions of the paths with the given parent,
761            matching the following criteria.
762            
763            The property tuple for a version is returned if all
764            of these conditions are true:
765                 
766                 a. parent matches
767                 
768                 b. path > start
769                 
770                 c. path starts with prefix (and paths in pathq)
771                 
772                 d. version is the max up to before
773                 
774                 e. version is not in cluster
775                 
776                 f. the path does not have the delimiter occuring
777                    after the prefix, or ends with the delimiter
778                 
779                 g. serial matches the attribute filter query.
780                    
781                    A filter query is a comma-separated list of
782                    terms in one of these three forms:
783                    
784                    key
785                        an attribute with this key must exist
786                    
787                    !key
788                        an attribute with this key must not exist
789                    
790                    key ?op value
791                        the attribute with this key satisfies the value
792                        where ?op is one of ==, != <=, >=, <, >.
793            
794            The list of common prefixes includes the prefixes
795            matching up to the first delimiter after prefix,
796            and are reported only once, as "virtual directories".
797            The delimiter is included in the prefixes.
798            
799            If arguments are None, then the corresponding matching rule
800            will always match.
801            
802            Limit applies to the first list of tuples returned.
803         """
804         
805         if not start or start < prefix:
806             start = strprevling(prefix)
807         nextling = strnextling(prefix)
808         
809         a = self.attributes.alias('a')
810         v = self.versions.alias('v')
811         n = self.nodes.alias('n')
812         s = select([n.c.path, v.c.serial]).distinct()
813         filtered = select([func.max(self.versions.c.serial)])
814         if before != inf:
815             filtered = filtered.where(self.versions.c.mtime < before)
816         s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
817         s = s.where(v.c.cluster != except_cluster)
818         s = s.where(v.c.node.in_(select([self.nodes.c.node],
819             self.nodes.c.parent == parent)))
820         if filterq:
821             s = s.where(a.c.serial == v.c.serial)
822         
823         s = s.where(n.c.node == v.c.node)
824         s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
825         conj = []
826         for x in pathq:
827             conj.append(n.c.path.like(x + '%'))
828         
829         if conj:
830             s = s.where(or_(*conj))
831         
832         if filterq:
833             s = s.where(a.c.key.in_(filterq.split(',')))
834         
835         s = s.order_by(n.c.path)
836         
837         if not delimiter:
838             s = s.limit(limit)
839             rp = self.conn.execute(s, start=start)
840             r = rp.fetchall()
841             rp.close()
842             return r, ()
843         
844         pfz = len(prefix)
845         dz = len(delimiter)
846         count = 0
847         prefixes = []
848         pappend = prefixes.append
849         matches = []
850         mappend = matches.append
851         
852         rp = self.conn.execute(s, start=start)
853         while True:
854             props = rp.fetchone()
855             if props is None:
856                 break
857             path, serial = props
858             idx = path.find(delimiter, pfz)
859             
860             if idx < 0:
861                 mappend(props)
862                 count += 1
863                 if count >= limit:
864                     break
865                 continue
866             
867             if idx + dz == len(path):
868                 mappend(props)
869                 count += 1
870                 continue # Get one more, in case there is a path.
871             pf = path[:idx + dz]
872             pappend(pf)
873             if count >= limit: 
874                 break
875             
876             rp = self.conn.execute(s, start=strnextling(pf)) # New start.
877         rp.close()
878         
879         return matches, prefixes