Remove deleted version's map.
[pithos] / pithos / backends / lib / sqlalchemy / node.py
1 # Copyright 2011 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from time import time
35 from sqlalchemy import Table, Integer, BigInteger, DECIMAL, Column, String, MetaData, ForeignKey
36 from sqlalchemy.types import Text
37 from sqlalchemy.schema import Index, Sequence
38 from sqlalchemy.sql import func, and_, or_, null, select, bindparam, text
39 from sqlalchemy.ext.compiler import compiles
40 from sqlalchemy.engine.reflection import Inspector
41
42 from dbworker import DBWorker
43
44 ROOTNODE  = 0
45
46 ( SERIAL, NODE, HASH, SIZE, SOURCE, MTIME, MUSER, CLUSTER ) = range(8)
47
48 inf = float('inf')
49
50
51 def strnextling(prefix):
52     """Return the first unicode string
53        greater than but not starting with given prefix.
54        strnextling('hello') -> 'hellp'
55     """
56     if not prefix:
57         ## all strings start with the null string,
58         ## therefore we have to approximate strnextling('')
59         ## with the last unicode character supported by python
60         ## 0x10ffff for wide (32-bit unicode) python builds
61         ## 0x00ffff for narrow (16-bit unicode) python builds
62         ## We will not autodetect. 0xffff is safe enough.
63         return unichr(0xffff)
64     s = prefix[:-1]
65     c = ord(prefix[-1])
66     if c >= 0xffff:
67         raise RuntimeError
68     s += unichr(c+1)
69     return s
70
71 def strprevling(prefix):
72     """Return an approximation of the last unicode string
73        less than but not starting with given prefix.
74        strprevling(u'hello') -> u'helln\\xffff'
75     """
76     if not prefix:
77         ## There is no prevling for the null string
78         return prefix
79     s = prefix[:-1]
80     c = ord(prefix[-1])
81     if c > 0:
82         s += unichr(c-1) + unichr(0xffff)
83     return s
84
85
86 _propnames = {
87     'serial'    : 0,
88     'node'      : 1,
89     'hash'      : 2,
90     'size'      : 3,
91     'source'    : 4,
92     'mtime'     : 5,
93     'muser'     : 6,
94     'cluster'   : 7,
95 }
96
97
98 class Node(DBWorker):
99     """Nodes store path organization and have multiple versions.
100        Versions store object history and have multiple attributes.
101        Attributes store metadata.
102     """
103     
104     # TODO: Provide an interface for included and excluded clusters.
105     
106     def __init__(self, **params):
107         DBWorker.__init__(self, **params)
108         metadata = MetaData()
109         
110         #create nodes table
111         columns=[]
112         columns.append(Column('node', Integer, primary_key=True))
113         columns.append(Column('parent', Integer,
114                               ForeignKey('nodes.node',
115                                          ondelete='CASCADE',
116                                          onupdate='CASCADE'),
117                               autoincrement=False))
118         path_length = 2048
119         columns.append(Column('path', String(path_length), default='', nullable=False))
120         self.nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
121         
122         #create policy table
123         columns=[]
124         columns.append(Column('node', Integer,
125                               ForeignKey('nodes.node',
126                                          ondelete='CASCADE',
127                                          onupdate='CASCADE'),
128                               primary_key=True))
129         columns.append(Column('key', String(255), primary_key=True))
130         columns.append(Column('value', String(255)))
131         self.policies = Table('policy', metadata, *columns, mysql_engine='InnoDB')
132         
133         #create statistics table
134         columns=[]
135         columns.append(Column('node', Integer,
136                               ForeignKey('nodes.node',
137                                          ondelete='CASCADE',
138                                          onupdate='CASCADE'),
139                               primary_key=True))
140         columns.append(Column('population', Integer, nullable=False, default=0))
141         columns.append(Column('size', BigInteger, nullable=False, default=0))
142         columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
143         columns.append(Column('cluster', Integer, nullable=False, default=0,
144                               primary_key=True, autoincrement=False))
145         self.statistics = Table('statistics', metadata, *columns, mysql_engine='InnoDB')
146         
147         #create versions table
148         columns=[]
149         columns.append(Column('serial', Integer, primary_key=True))
150         columns.append(Column('node', Integer,
151                               ForeignKey('nodes.node',
152                                          ondelete='CASCADE',
153                                          onupdate='CASCADE')))
154         columns.append(Column('hash', String(255)))
155         columns.append(Column('size', BigInteger, nullable=False, default=0))
156         columns.append(Column('source', Integer))
157         columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
158         columns.append(Column('muser', String(255), nullable=False, default=''))
159         columns.append(Column('cluster', Integer, nullable=False, default=0))
160         self.versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
161         Index('idx_versions_node_mtime', self.versions.c.node,
162               self.versions.c.mtime)
163         
164         #create attributes table
165         columns = []
166         columns.append(Column('serial', Integer,
167                               ForeignKey('versions.serial',
168                                          ondelete='CASCADE',
169                                          onupdate='CASCADE'),
170                               primary_key=True))
171         columns.append(Column('key', String(255), primary_key=True))
172         columns.append(Column('value', String(255)))
173         self.attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')
174         
175         metadata.create_all(self.engine)
176         
177         # the following code creates an index of specific length
178         # this can be accompliced in sqlalchemy >= 0.7.3
179         # providing mysql_length option during index creation
180         insp = Inspector.from_engine(self.engine)
181         indexes = [elem['name'] for elem in insp.get_indexes('nodes')]
182         if 'idx_nodes_path' not in indexes:
183             explicit_length = '(%s)' %path_length if self.engine.name == 'mysql' else ''
184             s = text('CREATE INDEX idx_nodes_path ON nodes (path%s)' %explicit_length)
185             self.conn.execute(s).close()
186         
187         s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
188                                            self.nodes.c.parent == ROOTNODE))
189         rp = self.conn.execute(s)
190         r = rp.fetchone()
191         rp.close()
192         if not r:
193             s = self.nodes.insert().values(node=ROOTNODE, parent=ROOTNODE)
194             self.conn.execute(s)
195     
196     def node_create(self, parent, path):
197         """Create a new node from the given properties.
198            Return the node identifier of the new node.
199         """
200         #TODO catch IntegrityError?
201         s = self.nodes.insert().values(parent=parent, path=path)
202         r = self.conn.execute(s)
203         inserted_primary_key = r.inserted_primary_key[0]
204         r.close()
205         return inserted_primary_key
206     
207     def node_lookup(self, path):
208         """Lookup the current node of the given path.
209            Return None if the path is not found.
210         """
211         
212         s = select([self.nodes.c.node], self.nodes.c.path.like(path))
213         r = self.conn.execute(s)
214         row = r.fetchone()
215         r.close()
216         if row:
217             return row[0]
218         return None
219     
220     def node_get_properties(self, node):
221         """Return the node's (parent, path).
222            Return None if the node is not found.
223         """
224         
225         s = select([self.nodes.c.parent, self.nodes.c.path])
226         s = s.where(self.nodes.c.node == node)
227         r = self.conn.execute(s)
228         l = r.fetchone()
229         r.close()
230         return l
231     
232     def node_get_versions(self, node, keys=(), propnames=_propnames):
233         """Return the properties of all versions at node.
234            If keys is empty, return all properties in the order
235            (serial, node, size, source, mtime, muser, cluster).
236         """
237         
238         s = select([self.versions.c.serial,
239                     self.versions.c.node,
240                     self.versions.c.hash,
241                     self.versions.c.size,
242                     self.versions.c.source,
243                     self.versions.c.mtime,
244                     self.versions.c.muser,
245                     self.versions.c.cluster], self.versions.c.node == node)
246         s = s.order_by(self.versions.c.serial)
247         r = self.conn.execute(s)
248         rows = r.fetchall()
249         r.close()
250         if not rows:
251             return rows
252         
253         if not keys:
254             return rows
255         
256         return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]
257     
258     def node_count_children(self, node):
259         """Return node's child count."""
260         
261         s = select([func.count(self.nodes.c.node)])
262         s = s.where(and_(self.nodes.c.parent == node,
263                          self.nodes.c.node != ROOTNODE))
264         r = self.conn.execute(s)
265         row = r.fetchone()
266         r.close()
267         return row[0]
268     
269     def node_purge_children(self, parent, before=inf, cluster=0):
270         """Delete all versions with the specified
271            parent and cluster, and return
272            the hashes of versions deleted.
273            Clears out nodes with no remaining versions.
274         """
275         #update statistics
276         c1 = select([self.nodes.c.node],
277             self.nodes.c.parent == parent)
278         where_clause = and_(self.versions.c.node.in_(c1),
279                             self.versions.c.cluster == cluster)
280         s = select([func.count(self.versions.c.serial),
281                     func.sum(self.versions.c.size)])
282         s = s.where(where_clause)
283         if before != inf:
284             s = s.where(self.versions.c.mtime <= before)
285         r = self.conn.execute(s)
286         row = r.fetchone()
287         r.close()
288         if not row:
289             return ()
290         nr, size = row[0], -row[1] if row[1] else 0
291         mtime = time()
292         self.statistics_update(parent, -nr, size, mtime, cluster)
293         self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)
294         
295         s = select([self.versions.c.hash])
296         s = s.where(where_clause)
297         r = self.conn.execute(s)
298         hashes = [row[0] for row in r.fetchall()]
299         r.close()
300         
301         #delete versions
302         s = self.versions.delete().where(where_clause)
303         r = self.conn.execute(s)
304         r.close()
305         
306         #delete nodes
307         s = select([self.nodes.c.node],
308             and_(self.nodes.c.parent == parent,
309                  select([func.count(self.versions.c.serial)],
310                     self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
311         rp = self.conn.execute(s)
312         nodes = [r[0] for r in rp.fetchall()]
313         rp.close()
314         s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
315         self.conn.execute(s).close()
316         
317         return hashes
318     
319     def node_purge(self, node, before=inf, cluster=0):
320         """Delete all versions with the specified
321            node and cluster, and return
322            the hashes of versions deleted.
323            Clears out the node if it has no remaining versions.
324         """
325         
326         #update statistics
327         s = select([func.count(self.versions.c.serial),
328                     func.sum(self.versions.c.size)])
329         where_clause = and_(self.versions.c.node == node,
330                          self.versions.c.cluster == cluster)
331         s = s.where(where_clause)
332         if before != inf:
333             s = s.where(self.versions.c.mtime <= before)
334         r = self.conn.execute(s)
335         row = r.fetchone()
336         nr, size = row[0], row[1]
337         r.close()
338         if not nr:
339             return ()
340         mtime = time()
341         self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
342         
343         s = select([self.versions.c.hash])
344         s = s.where(where_clause)
345         r = self.conn.execute(s)
346         hashes = [r[0] for r in r.fetchall()]
347         r.close()
348         
349         #delete versions
350         s = self.versions.delete().where(where_clause)
351         r = self.conn.execute(s)
352         r.close()
353         
354         #delete nodes
355         s = select([self.nodes.c.node],
356             and_(self.nodes.c.node == node,
357                  select([func.count(self.versions.c.serial)],
358                     self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
359         r = self.conn.execute(s)
360         nodes = r.fetchall()
361         r.close()
362         s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
363         self.conn.execute(s).close()
364         
365         return hashes
366     
367     def node_remove(self, node):
368         """Remove the node specified.
369            Return false if the node has children or is not found.
370         """
371         
372         if self.node_count_children(node):
373             return False
374         
375         mtime = time()
376         s = select([func.count(self.versions.c.serial),
377                     func.sum(self.versions.c.size),
378                     self.versions.c.cluster])
379         s = s.where(self.versions.c.node == node)
380         s = s.group_by(self.versions.c.cluster)
381         r = self.conn.execute(s)
382         for population, size, cluster in r.fetchall():
383             self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
384         r.close()
385         
386         s = self.nodes.delete().where(self.nodes.c.node == node)
387         self.conn.execute(s).close()
388         return True
389     
390     def policy_get(self, node):
391         s = select([self.policies.c.key, self.policies.c.value],
392             self.policies.c.node==node)
393         r = self.conn.execute(s)
394         d = dict(r.fetchall())
395         r.close()
396         return d
397     
398     def policy_set(self, node, policy):
399         #insert or replace
400         for k, v in policy.iteritems():
401             s = self.policies.update().where(and_(self.policies.c.node == node,
402                                                   self.policies.c.key == k))
403             s = s.values(value = v)
404             rp = self.conn.execute(s)
405             rp.close()
406             if rp.rowcount == 0:
407                 s = self.policies.insert()
408                 values = {'node':node, 'key':k, 'value':v}
409                 r = self.conn.execute(s, values)
410                 r.close()
411     
412     def statistics_get(self, node, cluster=0):
413         """Return population, total size and last mtime
414            for all versions under node that belong to the cluster.
415         """
416         
417         s = select([self.statistics.c.population,
418                     self.statistics.c.size,
419                     self.statistics.c.mtime])
420         s = s.where(and_(self.statistics.c.node == node,
421                          self.statistics.c.cluster == cluster))
422         r = self.conn.execute(s)
423         row = r.fetchone()
424         r.close()
425         return row
426     
427     def statistics_update(self, node, population, size, mtime, cluster=0):
428         """Update the statistics of the given node.
429            Statistics keep track the population, total
430            size of objects and mtime in the node's namespace.
431            May be zero or positive or negative numbers.
432         """
433         s = select([self.statistics.c.population, self.statistics.c.size],
434             and_(self.statistics.c.node == node,
435                  self.statistics.c.cluster == cluster))
436         rp = self.conn.execute(s)
437         r = rp.fetchone()
438         rp.close()
439         if not r:
440             prepopulation, presize = (0, 0)
441         else:
442             prepopulation, presize = r
443         population += prepopulation
444         size += presize
445         
446         #insert or replace
447         #TODO better upsert
448         u = self.statistics.update().where(and_(self.statistics.c.node==node,
449                                            self.statistics.c.cluster==cluster))
450         u = u.values(population=population, size=size, mtime=mtime)
451         rp = self.conn.execute(u)
452         rp.close()
453         if rp.rowcount == 0:
454             ins = self.statistics.insert()
455             ins = ins.values(node=node, population=population, size=size,
456                              mtime=mtime, cluster=cluster)
457             self.conn.execute(ins).close()
458     
459     def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
460         """Update the statistics of the given node's parent.
461            Then recursively update all parents up to the root.
462            Population is not recursive.
463         """
464         
465         while True:
466             if node == ROOTNODE:
467                 break
468             props = self.node_get_properties(node)
469             if props is None:
470                 break
471             parent, path = props
472             self.statistics_update(parent, population, size, mtime, cluster)
473             node = parent
474             population = 0 # Population isn't recursive
475     
476     def statistics_latest(self, node, before=inf, except_cluster=0):
477         """Return population, total size and last mtime
478            for all latest versions under node that
479            do not belong to the cluster.
480         """
481         
482         # The node.
483         props = self.node_get_properties(node)
484         if props is None:
485             return None
486         parent, path = props
487         
488         # The latest version.
489         s = select([self.versions.c.serial,
490                     self.versions.c.node,
491                     self.versions.c.hash,
492                     self.versions.c.size,
493                     self.versions.c.source,
494                     self.versions.c.mtime,
495                     self.versions.c.muser,
496                     self.versions.c.cluster])
497         filtered = select([func.max(self.versions.c.serial)],
498                             self.versions.c.node == node)
499         if before != inf:
500             filtered = filtered.where(self.versions.c.mtime < before)
501         s = s.where(and_(self.versions.c.cluster != except_cluster,
502                          self.versions.c.serial == filtered))
503         r = self.conn.execute(s)
504         props = r.fetchone()
505         r.close()
506         if not props:
507             return None
508         mtime = props[MTIME]
509         
510         # First level, just under node (get population).
511         v = self.versions.alias('v')
512         s = select([func.count(v.c.serial),
513                     func.sum(v.c.size),
514                     func.max(v.c.mtime)])
515         c1 = select([func.max(self.versions.c.serial)])
516         if before != inf:
517             c1 = c1.where(self.versions.c.mtime < before)
518         c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
519         s = s.where(and_(v.c.serial == c1.where(self.versions.c.node == v.c.node),
520                          v.c.cluster != except_cluster,
521                          v.c.node.in_(c2)))
522         rp = self.conn.execute(s)
523         r = rp.fetchone()
524         rp.close()
525         if not r:
526             return None
527         count = r[0]
528         mtime = max(mtime, r[2])
529         if count == 0:
530             return (0, 0, mtime)
531         
532         # All children (get size and mtime).
533         # XXX: This is why the full path is stored.
534         s = select([func.count(v.c.serial),
535                     func.sum(v.c.size),
536                     func.max(v.c.mtime)])
537         c1 = select([func.max(self.versions.c.serial)],
538             self.versions.c.node == v.c.node)
539         if before != inf:
540             c1 = c1.where(self.versions.c.mtime < before)
541         c2 = select([self.nodes.c.node], self.nodes.c.path.like(path + '%'))
542         s = s.where(and_(v.c.serial == c1,
543                          v.c.cluster != except_cluster,
544                          v.c.node.in_(c2)))
545         rp = self.conn.execute(s)
546         r = rp.fetchone()
547         rp.close()
548         if not r:
549             return None
550         size = r[1] - props[SIZE]
551         mtime = max(mtime, r[2])
552         return (count, size, mtime)
553     
554     def version_create(self, node, hash, size, source, muser, cluster=0):
555         """Create a new version from the given properties.
556            Return the (serial, mtime) of the new version.
557         """
558         
559         mtime = time()
560         s = self.versions.insert().values(node=node, hash=hash, size=size, source=source,
561                                           mtime=mtime, muser=muser, cluster=cluster)
562         serial = self.conn.execute(s).inserted_primary_key[0]
563         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
564         return serial, mtime
565     
566     def version_lookup(self, node, before=inf, cluster=0):
567         """Lookup the current version of the given node.
568            Return a list with its properties:
569            (serial, node, hash, size, source, mtime, muser, cluster)
570            or None if the current version is not found in the given cluster.
571         """
572         
573         v = self.versions.alias('v')
574         s = select([v.c.serial, v.c.node, v.c.hash, v.c.size,
575                     v.c.source, v.c.mtime, v.c.muser, v.c.cluster])
576         c = select([func.max(self.versions.c.serial)],
577             self.versions.c.node == node)
578         if before != inf:
579             c = c.where(self.versions.c.mtime < before)
580         s = s.where(and_(v.c.serial == c,
581                          v.c.cluster == cluster))
582         r = self.conn.execute(s)
583         props = r.fetchone()
584         r.close()
585         if props:
586             return props
587         return None
588     
589     def version_get_properties(self, serial, keys=(), propnames=_propnames):
590         """Return a sequence of values for the properties of
591            the version specified by serial and the keys, in the order given.
592            If keys is empty, return all properties in the order
593            (serial, node, hash, size, source, mtime, muser, cluster).
594         """
595         
596         v = self.versions.alias()
597         s = select([v.c.serial, v.c.node, v.c.hash, v.c.size,
598                     v.c.source, v.c.mtime, v.c.muser, v.c.cluster], v.c.serial == serial)
599         rp = self.conn.execute(s)
600         r = rp.fetchone()
601         rp.close()
602         if r is None:
603             return r
604         
605         if not keys:
606             return r
607         return [r[propnames[k]] for k in keys if k in propnames]
608     
609     def version_recluster(self, serial, cluster):
610         """Move the version into another cluster."""
611         
612         props = self.version_get_properties(serial)
613         if not props:
614             return
615         node = props[NODE]
616         size = props[SIZE]
617         oldcluster = props[CLUSTER]
618         if cluster == oldcluster:
619             return
620         
621         mtime = time()
622         self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
623         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
624         
625         s = self.versions.update()
626         s = s.where(self.versions.c.serial == serial)
627         s = s.values(cluster = cluster)
628         self.conn.execute(s).close()
629     
630     def version_remove(self, serial):
631         """Remove the serial specified."""
632         
633         props = self.version_get_properties(serial)
634         if not props:
635             return
636         node = props[NODE]
637         hash = props[HASH]
638         size = props[SIZE]
639         cluster = props[CLUSTER]
640         
641         mtime = time()
642         self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
643         
644         s = self.versions.delete().where(self.versions.c.serial == serial)
645         self.conn.execute(s).close()
646         return hash
647     
648     def attribute_get(self, serial, keys=()):
649         """Return a list of (key, value) pairs of the version specified by serial.
650            If keys is empty, return all attributes.
651            Othwerise, return only those specified.
652         """
653         
654         if keys:
655             attrs = self.attributes.alias()
656             s = select([attrs.c.key, attrs.c.value])
657             s = s.where(and_(attrs.c.key.in_(keys),
658                              attrs.c.serial == serial))
659         else:
660             attrs = self.attributes.alias()
661             s = select([attrs.c.key, attrs.c.value])
662             s = s.where(attrs.c.serial == serial)
663         r = self.conn.execute(s)
664         l = r.fetchall()
665         r.close()
666         return l
667     
668     def attribute_set(self, serial, items):
669         """Set the attributes of the version specified by serial.
670            Receive attributes as an iterable of (key, value) pairs.
671         """
672         #insert or replace
673         #TODO better upsert
674         for k, v in items:
675             s = self.attributes.update()
676             s = s.where(and_(self.attributes.c.serial == serial,
677                              self.attributes.c.key == k))
678             s = s.values(value = v)
679             rp = self.conn.execute(s)
680             rp.close()
681             if rp.rowcount == 0:
682                 s = self.attributes.insert()
683                 s = s.values(serial=serial, key=k, value=v)
684                 self.conn.execute(s).close()
685     
686     def attribute_del(self, serial, keys=()):
687         """Delete attributes of the version specified by serial.
688            If keys is empty, delete all attributes.
689            Otherwise delete those specified.
690         """
691         
692         if keys:
693             #TODO more efficient way to do this?
694             for key in keys:
695                 s = self.attributes.delete()
696                 s = s.where(and_(self.attributes.c.serial == serial,
697                                  self.attributes.c.key == key))
698                 self.conn.execute(s).close()
699         else:
700             s = self.attributes.delete()
701             s = s.where(self.attributes.c.serial == serial)
702             self.conn.execute(s).close()
703     
704     def attribute_copy(self, source, dest):
705         s = select([dest, self.attributes.c.key, self.attributes.c.value],
706             self.attributes.c.serial == source)
707         rp = self.conn.execute(s)
708         attributes = rp.fetchall()
709         rp.close()
710         for dest, k, v in attributes:
711             #insert or replace
712             s = self.attributes.update().where(and_(
713                 self.attributes.c.serial == dest,
714                 self.attributes.c.key == k))
715             rp = self.conn.execute(s, value=v)
716             rp.close()
717             if rp.rowcount == 0:
718                 s = self.attributes.insert()
719                 values = {'serial':dest, 'key':k, 'value':v}
720                 self.conn.execute(s, values).close()
721     
722     def latest_attribute_keys(self, parent, before=inf, except_cluster=0, pathq=[]):
723         """Return a list with all keys pairs defined
724            for all latest versions under parent that
725            do not belong to the cluster.
726         """
727         
728         # TODO: Use another table to store before=inf results.
729         a = self.attributes.alias('a')
730         v = self.versions.alias('v')
731         n = self.nodes.alias('n')
732         s = select([a.c.key]).distinct()
733         filtered = select([func.max(self.versions.c.serial)])
734         if before != inf:
735             filtered = filtered.where(self.versions.c.mtime < before)
736         s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
737         s = s.where(v.c.cluster != except_cluster)
738         s = s.where(v.c.node.in_(select([self.nodes.c.node],
739             self.nodes.c.parent == parent)))
740         s = s.where(a.c.serial == v.c.serial)
741         s = s.where(n.c.node == v.c.node)
742         conj = []
743         for x in pathq:
744             conj.append(n.c.path.like(x + '%'))
745         if conj:
746             s = s.where(or_(*conj))
747         rp = self.conn.execute(s)
748         rows = rp.fetchall()
749         rp.close()
750         return [r[0] for r in rows]
751     
752     def latest_version_list(self, parent, prefix='', delimiter=None,
753                             start='', limit=10000, before=inf,
754                             except_cluster=0, pathq=[], filterq=None):
755         """Return a (list of (path, serial) tuples, list of common prefixes)
756            for the current versions of the paths with the given parent,
757            matching the following criteria.
758            
759            The property tuple for a version is returned if all
760            of these conditions are true:
761                 
762                 a. parent matches
763                 
764                 b. path > start
765                 
766                 c. path starts with prefix (and paths in pathq)
767                 
768                 d. version is the max up to before
769                 
770                 e. version is not in cluster
771                 
772                 f. the path does not have the delimiter occuring
773                    after the prefix, or ends with the delimiter
774                 
775                 g. serial matches the attribute filter query.
776                    
777                    A filter query is a comma-separated list of
778                    terms in one of these three forms:
779                    
780                    key
781                        an attribute with this key must exist
782                    
783                    !key
784                        an attribute with this key must not exist
785                    
786                    key ?op value
787                        the attribute with this key satisfies the value
788                        where ?op is one of ==, != <=, >=, <, >.
789            
790            The list of common prefixes includes the prefixes
791            matching up to the first delimiter after prefix,
792            and are reported only once, as "virtual directories".
793            The delimiter is included in the prefixes.
794            
795            If arguments are None, then the corresponding matching rule
796            will always match.
797            
798            Limit applies to the first list of tuples returned.
799         """
800         
801         if not start or start < prefix:
802             start = strprevling(prefix)
803         nextling = strnextling(prefix)
804         
805         a = self.attributes.alias('a')
806         v = self.versions.alias('v')
807         n = self.nodes.alias('n')
808         s = select([n.c.path, v.c.serial]).distinct()
809         filtered = select([func.max(self.versions.c.serial)])
810         if before != inf:
811             filtered = filtered.where(self.versions.c.mtime < before)
812         s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
813         s = s.where(v.c.cluster != except_cluster)
814         s = s.where(v.c.node.in_(select([self.nodes.c.node],
815             self.nodes.c.parent == parent)))
816         if filterq:
817             s = s.where(a.c.serial == v.c.serial)
818         
819         s = s.where(n.c.node == v.c.node)
820         s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
821         conj = []
822         for x in pathq:
823             conj.append(n.c.path.like(x + '%'))
824         
825         if conj:
826             s = s.where(or_(*conj))
827         
828         if filterq:
829             s = s.where(a.c.key.in_(filterq.split(',')))
830         
831         s = s.order_by(n.c.path)
832         
833         if not delimiter:
834             s = s.limit(limit)
835             rp = self.conn.execute(s, start=start)
836             r = rp.fetchall()
837             rp.close()
838             return r, ()
839         
840         pfz = len(prefix)
841         dz = len(delimiter)
842         count = 0
843         prefixes = []
844         pappend = prefixes.append
845         matches = []
846         mappend = matches.append
847         
848         rp = self.conn.execute(s, start=start)
849         while True:
850             props = rp.fetchone()
851             if props is None:
852                 break
853             path, serial = props
854             idx = path.find(delimiter, pfz)
855             
856             if idx < 0:
857                 mappend(props)
858                 count += 1
859                 if count >= limit:
860                     break
861                 continue
862             
863             if idx + dz == len(path):
864                 mappend(props)
865                 count += 1
866                 continue # Get one more, in case there is a path.
867             pf = path[:idx + dz]
868             pappend(pf)
869             if count >= limit: 
870                 break
871             
872             rp = self.conn.execute(s, start=strnextling(pf)) # New start.
873         rp.close()
874         
875         return matches, prefixes