Fix listing with prefix
[pithos] / pithos / backends / lib / sqlalchemy / node.py
1 # Copyright 2011 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from time import time
35 from sqlalchemy import Table, Integer, BigInteger, DECIMAL, Column, String, MetaData, ForeignKey
36 from sqlalchemy.types import Text
37 from sqlalchemy.schema import Index, Sequence
38 from sqlalchemy.sql import func, and_, or_, null, select, bindparam, text
39 from sqlalchemy.ext.compiler import compiles
40 from sqlalchemy.engine.reflection import Inspector
41
42 from dbworker import DBWorker
43
44 ROOTNODE  = 0
45
46 ( SERIAL, NODE, HASH, SIZE, SOURCE, MTIME, MUSER, CLUSTER ) = range(8)
47
48 inf = float('inf')
49
50
51 def strnextling(prefix):
52     """Return the first unicode string
53        greater than but not starting with given prefix.
54        strnextling('hello') -> 'hellp'
55     """
56     if not prefix:
57         ## all strings start with the null string,
58         ## therefore we have to approximate strnextling('')
59         ## with the last unicode character supported by python
60         ## 0x10ffff for wide (32-bit unicode) python builds
61         ## 0x00ffff for narrow (16-bit unicode) python builds
62         ## We will not autodetect. 0xffff is safe enough.
63         return unichr(0xffff)
64     s = prefix[:-1]
65     c = ord(prefix[-1])
66     if c >= 0xffff:
67         raise RuntimeError
68     s += unichr(c+1)
69     return s
70
71 def strprevling(prefix):
72     """Return an approximation of the last unicode string
73        less than but not starting with given prefix.
74        strprevling(u'hello') -> u'helln\\xffff'
75     """
76     if not prefix:
77         ## There is no prevling for the null string
78         return prefix
79     s = prefix[:-1]
80     c = ord(prefix[-1])
81     if c > 0:
82         s += unichr(c-1) + unichr(0xffff)
83     return s
84
85
86 _propnames = {
87     'serial'    : 0,
88     'node'      : 1,
89     'hash'      : 2,
90     'size'      : 3,
91     'source'    : 4,
92     'mtime'     : 5,
93     'muser'     : 6,
94     'cluster'   : 7,
95 }
96
97
98 class Node(DBWorker):
99     """Nodes store path organization and have multiple versions.
100        Versions store object history and have multiple attributes.
101        Attributes store metadata.
102     """
103     
104     # TODO: Provide an interface for included and excluded clusters.
105     
106     def __init__(self, **params):
107         DBWorker.__init__(self, **params)
108         metadata = MetaData()
109         
110         #create nodes table
111         columns=[]
112         columns.append(Column('node', Integer, primary_key=True))
113         columns.append(Column('parent', Integer,
114                               ForeignKey('nodes.node',
115                                          ondelete='CASCADE',
116                                          onupdate='CASCADE'),
117                               autoincrement=False))
118         path_length = 2048
119         columns.append(Column('path', String(path_length), default='', nullable=False))
120         self.nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
121         
122         #create policy table
123         columns=[]
124         columns.append(Column('node', Integer,
125                               ForeignKey('nodes.node',
126                                          ondelete='CASCADE',
127                                          onupdate='CASCADE'),
128                               primary_key=True))
129         columns.append(Column('key', String(255), primary_key=True))
130         columns.append(Column('value', String(255)))
131         self.policies = Table('policy', metadata, *columns, mysql_engine='InnoDB')
132         
133         #create statistics table
134         columns=[]
135         columns.append(Column('node', Integer,
136                               ForeignKey('nodes.node',
137                                          ondelete='CASCADE',
138                                          onupdate='CASCADE'),
139                               primary_key=True))
140         columns.append(Column('population', Integer, nullable=False, default=0))
141         columns.append(Column('size', BigInteger, nullable=False, default=0))
142         columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
143         columns.append(Column('cluster', Integer, nullable=False, default=0,
144                               primary_key=True, autoincrement=False))
145         self.statistics = Table('statistics', metadata, *columns, mysql_engine='InnoDB')
146         
147         #create versions table
148         columns=[]
149         columns.append(Column('serial', Integer, primary_key=True))
150         columns.append(Column('node', Integer,
151                               ForeignKey('nodes.node',
152                                          ondelete='CASCADE',
153                                          onupdate='CASCADE')))
154         columns.append(Column('hash', String(255)))
155         columns.append(Column('size', BigInteger, nullable=False, default=0))
156         columns.append(Column('source', Integer))
157         columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
158         columns.append(Column('muser', String(255), nullable=False, default=''))
159         columns.append(Column('cluster', Integer, nullable=False, default=0))
160         self.versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
161         Index('idx_versions_node_mtime', self.versions.c.node,
162               self.versions.c.mtime)
163         
164         #create attributes table
165         columns = []
166         columns.append(Column('serial', Integer,
167                               ForeignKey('versions.serial',
168                                          ondelete='CASCADE',
169                                          onupdate='CASCADE'),
170                               primary_key=True))
171         columns.append(Column('key', String(255), primary_key=True))
172         columns.append(Column('value', String(255)))
173         self.attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')
174         
175         metadata.create_all(self.engine)
176         
177         # the following code creates an index of specific length
178         # this can be accompliced in sqlalchemy >= 0.7.3
179         # providing mysql_length option during index creation
180         insp = Inspector.from_engine(self.engine)
181         indexes = [elem['name'] for elem in insp.get_indexes('nodes')]
182         if 'idx_nodes_path' not in indexes:
183             explicit_length = '(%s)' %path_length if self.engine.name == 'mysql' else ''
184             s = text('CREATE INDEX idx_nodes_path ON nodes (path%s)' %explicit_length)
185             self.conn.execute(s).close()
186         
187         s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
188                                            self.nodes.c.parent == ROOTNODE))
189         rp = self.conn.execute(s)
190         r = rp.fetchone()
191         rp.close()
192         if not r:
193             s = self.nodes.insert().values(node=ROOTNODE, parent=ROOTNODE)
194             self.conn.execute(s)
195     
196     def node_create(self, parent, path):
197         """Create a new node from the given properties.
198            Return the node identifier of the new node.
199         """
200         #TODO catch IntegrityError?
201         s = self.nodes.insert().values(parent=parent, path=path)
202         r = self.conn.execute(s)
203         inserted_primary_key = r.inserted_primary_key[0]
204         r.close()
205         return inserted_primary_key
206     
207     def node_lookup(self, path):
208         """Lookup the current node of the given path.
209            Return None if the path is not found.
210         """
211         
212         s = select([self.nodes.c.node], self.nodes.c.path.like(path))
213         r = self.conn.execute(s)
214         row = r.fetchone()
215         r.close()
216         if row:
217             return row[0]
218         return None
219     
220     def node_get_properties(self, node):
221         """Return the node's (parent, path).
222            Return None if the node is not found.
223         """
224         
225         s = select([self.nodes.c.parent, self.nodes.c.path])
226         s = s.where(self.nodes.c.node == node)
227         r = self.conn.execute(s)
228         l = r.fetchone()
229         r.close()
230         return l
231     
232     def node_get_versions(self, node, keys=(), propnames=_propnames):
233         """Return the properties of all versions at node.
234            If keys is empty, return all properties in the order
235            (serial, node, size, source, mtime, muser, cluster).
236         """
237         
238         s = select([self.versions.c.serial,
239                     self.versions.c.node,
240                     self.versions.c.hash,
241                     self.versions.c.size,
242                     self.versions.c.source,
243                     self.versions.c.mtime,
244                     self.versions.c.muser,
245                     self.versions.c.cluster], self.versions.c.node == node)
246         s = s.order_by(self.versions.c.serial)
247         r = self.conn.execute(s)
248         rows = r.fetchall()
249         r.close()
250         if not rows:
251             return rows
252         
253         if not keys:
254             return rows
255         
256         return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]
257     
258     def node_count_children(self, node):
259         """Return node's child count."""
260         
261         s = select([func.count(self.nodes.c.node)])
262         s = s.where(and_(self.nodes.c.parent == node,
263                          self.nodes.c.node != ROOTNODE))
264         r = self.conn.execute(s)
265         row = r.fetchone()
266         r.close()
267         return row[0]
268     
269     def node_purge_children(self, parent, before=inf, cluster=0):
270         """Delete all versions with the specified
271            parent and cluster, and return
272            the serials of versions deleted.
273            Clears out nodes with no remaining versions.
274         """
275         #update statistics
276         c1 = select([self.nodes.c.node],
277             self.nodes.c.parent == parent)
278         where_clause = and_(self.versions.c.node.in_(c1),
279                             self.versions.c.cluster == cluster)
280         s = select([func.count(self.versions.c.serial),
281                     func.sum(self.versions.c.size)])
282         s = s.where(where_clause)
283         if before != inf:
284             s = s.where(self.versions.c.mtime <= before)
285         r = self.conn.execute(s)
286         row = r.fetchone()
287         r.close()
288         if not row:
289             return ()
290         nr, size = row[0], -row[1] if row[1] else 0
291         mtime = time()
292         self.statistics_update(parent, -nr, size, mtime, cluster)
293         self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)
294         
295         s = select([self.versions.c.serial])
296         s = s.where(where_clause)
297         r = self.conn.execute(s)
298         serials = [row[SERIAL] for row in r.fetchall()]
299         r.close()
300         
301         #delete versions
302         s = self.versions.delete().where(where_clause)
303         r = self.conn.execute(s)
304         r.close()
305         
306         #delete nodes
307         s = select([self.nodes.c.node],
308             and_(self.nodes.c.parent == parent,
309                  select([func.count(self.versions.c.serial)],
310                     self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
311         rp = self.conn.execute(s)
312         nodes = [r[0] for r in rp.fetchall()]
313         rp.close()
314         s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
315         self.conn.execute(s).close()
316         
317         return serials
318     
319     def node_purge(self, node, before=inf, cluster=0):
320         """Delete all versions with the specified
321            node and cluster, and return
322            the serials of versions deleted.
323            Clears out the node if it has no remaining versions.
324         """
325         
326         #update statistics
327         s = select([func.count(self.versions.c.serial),
328                     func.sum(self.versions.c.size)])
329         where_clause = and_(self.versions.c.node == node,
330                          self.versions.c.cluster == cluster)
331         s = s.where(where_clause)
332         if before != inf:
333             s = s.where(self.versions.c.mtime <= before)
334         r = self.conn.execute(s)
335         row = r.fetchone()
336         nr, size = row[0], row[1]
337         r.close()
338         if not nr:
339             return ()
340         mtime = time()
341         self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
342         
343         s = select([self.versions.c.serial])
344         s = s.where(where_clause)
345         r = self.conn.execute(s)
346         serials = [r[SERIAL] for r in r.fetchall()]
347         r.close()
348         
349         #delete versions
350         s = self.versions.delete().where(where_clause)
351         r = self.conn.execute(s)
352         r.close()
353         
354         #delete nodes
355         s = select([self.nodes.c.node],
356             and_(self.nodes.c.node == node,
357                  select([func.count(self.versions.c.serial)],
358                     self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
359         r = self.conn.execute(s)
360         nodes = r.fetchall()
361         r.close()
362         s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
363         self.conn.execute(s).close()
364         
365         return serials
366     
367     def node_remove(self, node):
368         """Remove the node specified.
369            Return false if the node has children or is not found.
370         """
371         
372         if self.node_count_children(node):
373             return False
374         
375         mtime = time()
376         s = select([func.count(self.versions.c.serial),
377                     func.sum(self.versions.c.size),
378                     self.versions.c.cluster])
379         s = s.where(self.versions.c.node == node)
380         s = s.group_by(self.versions.c.cluster)
381         r = self.conn.execute(s)
382         for population, size, cluster in r.fetchall():
383             self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
384         r.close()
385         
386         s = self.nodes.delete().where(self.nodes.c.node == node)
387         self.conn.execute(s).close()
388         return True
389     
390     def policy_get(self, node):
391         s = select([self.policies.c.key, self.policies.c.value],
392             self.policies.c.node==node)
393         r = self.conn.execute(s)
394         d = dict(r.fetchall())
395         r.close()
396         return d
397     
398     def policy_set(self, node, policy):
399         #insert or replace
400         for k, v in policy.iteritems():
401             s = self.policies.update().where(and_(self.policies.c.node == node,
402                                                   self.policies.c.key == k))
403             s = s.values(value = v)
404             rp = self.conn.execute(s)
405             rp.close()
406             if rp.rowcount == 0:
407                 s = self.policies.insert()
408                 values = {'node':node, 'key':k, 'value':v}
409                 r = self.conn.execute(s, values)
410                 r.close()
411     
412     def statistics_get(self, node, cluster=0):
413         """Return population, total size and last mtime
414            for all versions under node that belong to the cluster.
415         """
416         
417         s = select([self.statistics.c.population,
418                     self.statistics.c.size,
419                     self.statistics.c.mtime])
420         s = s.where(and_(self.statistics.c.node == node,
421                          self.statistics.c.cluster == cluster))
422         r = self.conn.execute(s)
423         row = r.fetchone()
424         r.close()
425         return row
426     
427     def statistics_update(self, node, population, size, mtime, cluster=0):
428         """Update the statistics of the given node.
429            Statistics keep track the population, total
430            size of objects and mtime in the node's namespace.
431            May be zero or positive or negative numbers.
432         """
433         s = select([self.statistics.c.population, self.statistics.c.size],
434             and_(self.statistics.c.node == node,
435                  self.statistics.c.cluster == cluster))
436         rp = self.conn.execute(s)
437         r = rp.fetchone()
438         rp.close()
439         if not r:
440             prepopulation, presize = (0, 0)
441         else:
442             prepopulation, presize = r
443         population += prepopulation
444         size += presize
445         
446         #insert or replace
447         #TODO better upsert
448         u = self.statistics.update().where(and_(self.statistics.c.node==node,
449                                            self.statistics.c.cluster==cluster))
450         u = u.values(population=population, size=size, mtime=mtime)
451         rp = self.conn.execute(u)
452         rp.close()
453         if rp.rowcount == 0:
454             ins = self.statistics.insert()
455             ins = ins.values(node=node, population=population, size=size,
456                              mtime=mtime, cluster=cluster)
457             self.conn.execute(ins).close()
458     
459     def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
460         """Update the statistics of the given node's parent.
461            Then recursively update all parents up to the root.
462            Population is not recursive.
463         """
464         
465         while True:
466             if node == ROOTNODE:
467                 break
468             props = self.node_get_properties(node)
469             if props is None:
470                 break
471             parent, path = props
472             self.statistics_update(parent, population, size, mtime, cluster)
473             node = parent
474             population = 0 # Population isn't recursive
475     
476     def statistics_latest(self, node, before=inf, except_cluster=0):
477         """Return population, total size and last mtime
478            for all latest versions under node that
479            do not belong to the cluster.
480         """
481         
482         # The node.
483         props = self.node_get_properties(node)
484         if props is None:
485             return None
486         parent, path = props
487         
488         # The latest version.
489         s = select([self.versions.c.serial,
490                     self.versions.c.node,
491                     self.versions.c.hash,
492                     self.versions.c.size,
493                     self.versions.c.source,
494                     self.versions.c.mtime,
495                     self.versions.c.muser,
496                     self.versions.c.cluster])
497         filtered = select([func.max(self.versions.c.serial)],
498                             self.versions.c.node == node)
499         if before != inf:
500             filtered = filtered.where(self.versions.c.mtime < before)
501         s = s.where(and_(self.versions.c.cluster != except_cluster,
502                          self.versions.c.serial == filtered))
503         r = self.conn.execute(s)
504         props = r.fetchone()
505         r.close()
506         if not props:
507             return None
508         mtime = props[MTIME]
509         
510         # First level, just under node (get population).
511         v = self.versions.alias('v')
512         s = select([func.count(v.c.serial),
513                     func.sum(v.c.size),
514                     func.max(v.c.mtime)])
515         c1 = select([func.max(self.versions.c.serial)])
516         if before != inf:
517             c1 = c1.where(self.versions.c.mtime < before)
518         c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
519         s = s.where(and_(v.c.serial == c1.where(self.versions.c.node == v.c.node),
520                          v.c.cluster != except_cluster,
521                          v.c.node.in_(c2)))
522         rp = self.conn.execute(s)
523         r = rp.fetchone()
524         rp.close()
525         if not r:
526             return None
527         count = r[0]
528         mtime = max(mtime, r[2])
529         if count == 0:
530             return (0, 0, mtime)
531         
532         # All children (get size and mtime).
533         # XXX: This is why the full path is stored.
534         s = select([func.count(v.c.serial),
535                     func.sum(v.c.size),
536                     func.max(v.c.mtime)])
537         c1 = select([func.max(self.versions.c.serial)],
538             self.versions.c.node == v.c.node)
539         if before != inf:
540             c1 = c1.where(self.versions.c.mtime < before)
541         c2 = select([self.nodes.c.node], self.nodes.c.path.like(path + '%'))
542         s = s.where(and_(v.c.serial == c1,
543                          v.c.cluster != except_cluster,
544                          v.c.node.in_(c2)))
545         rp = self.conn.execute(s)
546         r = rp.fetchone()
547         rp.close()
548         if not r:
549             return None
550         size = r[1] - props[SIZE]
551         mtime = max(mtime, r[2])
552         return (count, size, mtime)
553     
554     def version_create(self, node, hash, size, source, muser, cluster=0):
555         """Create a new version from the given properties.
556            Return the (serial, mtime) of the new version.
557         """
558         
559         mtime = time()
560         s = self.versions.insert().values(node=node, hash=hash, size=size, source=source,
561                                           mtime=mtime, muser=muser, cluster=cluster)
562         serial = self.conn.execute(s).inserted_primary_key[0]
563         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
564         return serial, mtime
565     
566     def version_lookup(self, node, before=inf, cluster=0):
567         """Lookup the current version of the given node.
568            Return a list with its properties:
569            (serial, node, hash, size, source, mtime, muser, cluster)
570            or None if the current version is not found in the given cluster.
571         """
572         
573         v = self.versions.alias('v')
574         s = select([v.c.serial, v.c.node, v.c.hash, v.c.size,
575                     v.c.source, v.c.mtime, v.c.muser, v.c.cluster])
576         c = select([func.max(self.versions.c.serial)],
577             self.versions.c.node == node)
578         if before != inf:
579             c = c.where(self.versions.c.mtime < before)
580         s = s.where(and_(v.c.serial == c,
581                          v.c.cluster == cluster))
582         r = self.conn.execute(s)
583         props = r.fetchone()
584         r.close()
585         if props:
586             return props
587         return None
588     
589     def version_get_properties(self, serial, keys=(), propnames=_propnames):
590         """Return a sequence of values for the properties of
591            the version specified by serial and the keys, in the order given.
592            If keys is empty, return all properties in the order
593            (serial, node, hash, size, source, mtime, muser, cluster).
594         """
595         
596         v = self.versions.alias()
597         s = select([v.c.serial, v.c.node, v.c.hash, v.c.size,
598                     v.c.source, v.c.mtime, v.c.muser, v.c.cluster], v.c.serial == serial)
599         rp = self.conn.execute(s)
600         r = rp.fetchone()
601         rp.close()
602         if r is None:
603             return r
604         
605         if not keys:
606             return r
607         return [r[propnames[k]] for k in keys if k in propnames]
608     
609     def version_recluster(self, serial, cluster):
610         """Move the version into another cluster."""
611         
612         props = self.version_get_properties(serial)
613         if not props:
614             return
615         node = props[NODE]
616         size = props[SIZE]
617         oldcluster = props[CLUSTER]
618         if cluster == oldcluster:
619             return
620         
621         mtime = time()
622         self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
623         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
624         
625         s = self.versions.update()
626         s = s.where(self.versions.c.serial == serial)
627         s = s.values(cluster = cluster)
628         self.conn.execute(s).close()
629     
630     def version_remove(self, serial):
631         """Remove the serial specified."""
632         
633         props = self.node_get_properties(serial)
634         if not props:
635             return
636         node = props[NODE]
637         size = props[SIZE]
638         cluster = props[CLUSTER]
639         
640         mtime = time()
641         self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
642         
643         s = self.versions.delete().where(self.versions.c.serial == serial)
644         self.conn.execute(s).close()
645         return True
646     
647     def attribute_get(self, serial, keys=()):
648         """Return a list of (key, value) pairs of the version specified by serial.
649            If keys is empty, return all attributes.
650            Othwerise, return only those specified.
651         """
652         
653         if keys:
654             attrs = self.attributes.alias()
655             s = select([attrs.c.key, attrs.c.value])
656             s = s.where(and_(attrs.c.key.in_(keys),
657                              attrs.c.serial == serial))
658         else:
659             attrs = self.attributes.alias()
660             s = select([attrs.c.key, attrs.c.value])
661             s = s.where(attrs.c.serial == serial)
662         r = self.conn.execute(s)
663         l = r.fetchall()
664         r.close()
665         return l
666     
667     def attribute_set(self, serial, items):
668         """Set the attributes of the version specified by serial.
669            Receive attributes as an iterable of (key, value) pairs.
670         """
671         #insert or replace
672         #TODO better upsert
673         for k, v in items:
674             s = self.attributes.update()
675             s = s.where(and_(self.attributes.c.serial == serial,
676                              self.attributes.c.key == k))
677             s = s.values(value = v)
678             rp = self.conn.execute(s)
679             rp.close()
680             if rp.rowcount == 0:
681                 s = self.attributes.insert()
682                 s = s.values(serial=serial, key=k, value=v)
683                 self.conn.execute(s).close()
684     
685     def attribute_del(self, serial, keys=()):
686         """Delete attributes of the version specified by serial.
687            If keys is empty, delete all attributes.
688            Otherwise delete those specified.
689         """
690         
691         if keys:
692             #TODO more efficient way to do this?
693             for key in keys:
694                 s = self.attributes.delete()
695                 s = s.where(and_(self.attributes.c.serial == serial,
696                                  self.attributes.c.key == key))
697                 self.conn.execute(s).close()
698         else:
699             s = self.attributes.delete()
700             s = s.where(self.attributes.c.serial == serial)
701             self.conn.execute(s).close()
702     
703     def attribute_copy(self, source, dest):
704         s = select([dest, self.attributes.c.key, self.attributes.c.value],
705             self.attributes.c.serial == source)
706         rp = self.conn.execute(s)
707         attributes = rp.fetchall()
708         rp.close()
709         for dest, k, v in attributes:
710             #insert or replace
711             s = self.attributes.update().where(and_(
712                 self.attributes.c.serial == dest,
713                 self.attributes.c.key == k))
714             rp = self.conn.execute(s, value=v)
715             rp.close()
716             if rp.rowcount == 0:
717                 s = self.attributes.insert()
718                 values = {'serial':dest, 'key':k, 'value':v}
719                 self.conn.execute(s, values).close()
720     
721     def latest_attribute_keys(self, parent, before=inf, except_cluster=0, pathq=[]):
722         """Return a list with all keys pairs defined
723            for all latest versions under parent that
724            do not belong to the cluster.
725         """
726         
727         # TODO: Use another table to store before=inf results.
728         a = self.attributes.alias('a')
729         v = self.versions.alias('v')
730         n = self.nodes.alias('n')
731         s = select([a.c.key]).distinct()
732         filtered = select([func.max(self.versions.c.serial)])
733         if before != inf:
734             filtered = filtered.where(self.versions.c.mtime < before)
735         s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
736         s = s.where(v.c.cluster != except_cluster)
737         s = s.where(v.c.node.in_(select([self.nodes.c.node],
738             self.nodes.c.parent == parent)))
739         s = s.where(a.c.serial == v.c.serial)
740         s = s.where(n.c.node == v.c.node)
741         conj = []
742         for x in pathq:
743             conj.append(n.c.path.like(x + '%'))
744         if conj:
745             s = s.where(or_(*conj))
746         rp = self.conn.execute(s)
747         rows = rp.fetchall()
748         rp.close()
749         return [r[0] for r in rows]
750     
751     def latest_version_list(self, parent, prefix='', delimiter=None,
752                             start='', limit=10000, before=inf,
753                             except_cluster=0, pathq=[], filterq=None):
754         """Return a (list of (path, serial) tuples, list of common prefixes)
755            for the current versions of the paths with the given parent,
756            matching the following criteria.
757            
758            The property tuple for a version is returned if all
759            of these conditions are true:
760                 
761                 a. parent matches
762                 
763                 b. path > start
764                 
765                 c. path starts with prefix (and paths in pathq)
766                 
767                 d. version is the max up to before
768                 
769                 e. version is not in cluster
770                 
771                 f. the path does not have the delimiter occuring
772                    after the prefix, or ends with the delimiter
773                 
774                 g. serial matches the attribute filter query.
775                    
776                    A filter query is a comma-separated list of
777                    terms in one of these three forms:
778                    
779                    key
780                        an attribute with this key must exist
781                    
782                    !key
783                        an attribute with this key must not exist
784                    
785                    key ?op value
786                        the attribute with this key satisfies the value
787                        where ?op is one of ==, != <=, >=, <, >.
788            
789            The list of common prefixes includes the prefixes
790            matching up to the first delimiter after prefix,
791            and are reported only once, as "virtual directories".
792            The delimiter is included in the prefixes.
793            
794            If arguments are None, then the corresponding matching rule
795            will always match.
796            
797            Limit applies to the first list of tuples returned.
798         """
799         
800         if not start or start < prefix:
801             start = strprevling(prefix)
802         nextling = strnextling(prefix)
803         
804         a = self.attributes.alias('a')
805         v = self.versions.alias('v')
806         n = self.nodes.alias('n')
807         s = select([n.c.path, v.c.serial]).distinct()
808         filtered = select([func.max(self.versions.c.serial)])
809         if before != inf:
810             filtered = filtered.where(self.versions.c.mtime < before)
811         s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
812         s = s.where(v.c.cluster != except_cluster)
813         s = s.where(v.c.node.in_(select([self.nodes.c.node],
814             self.nodes.c.parent == parent)))
815         if filterq:
816             s = s.where(a.c.serial == v.c.serial)
817         
818         s = s.where(n.c.node == v.c.node)
819         s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
820         conj = []
821         for x in pathq:
822             conj.append(n.c.path.like(x + '%'))
823         
824         if conj:
825             s = s.where(or_(*conj))
826         
827         if filterq:
828             s = s.where(a.c.key.in_(filterq.split(',')))
829         
830         s = s.order_by(n.c.path)
831         
832         if not delimiter:
833             s = s.limit(limit)
834             rp = self.conn.execute(s, start=start)
835             r = rp.fetchall()
836             rp.close()
837             return r, ()
838         
839         pfz = len(prefix)
840         dz = len(delimiter)
841         count = 0
842         prefixes = []
843         pappend = prefixes.append
844         matches = []
845         mappend = matches.append
846         
847         rp = self.conn.execute(s, start=start)
848         while True:
849             props = rp.fetchone()
850             if props is None:
851                 break
852             path, serial = props
853             idx = path.find(delimiter, pfz)
854             
855             if idx < 0:
856                 mappend(props)
857                 count += 1
858                 if count >= limit:
859                     break
860                 continue
861             
862             if idx + dz == len(path):
863                 mappend(props)
864                 count += 1
865                 continue # Get one more, in case there is a path.
866             pf = path[:idx + dz]
867             pappend(pf)
868             if count >= limit: 
869                 break
870             
871             rp = self.conn.execute(s, start=strnextling(pf)) # New start.
872         rp.close()
873         
874         return matches, prefixes