Update SQLAlchemy backend with metadata domains.
[pithos] / pithos / backends / lib / sqlalchemy / node.py
1 # Copyright 2011 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from time import time
35 from sqlalchemy import Table, Integer, BigInteger, DECIMAL, Column, String, MetaData, ForeignKey
36 from sqlalchemy.types import Text
37 from sqlalchemy.schema import Index, Sequence
38 from sqlalchemy.sql import func, and_, or_, not_, null, select, bindparam, text
39 from sqlalchemy.ext.compiler import compiles
40 from sqlalchemy.engine.reflection import Inspector
41
42 from dbworker import DBWorker
43
44 from pithos.lib.filter import parse_filters
45
46
47 ROOTNODE  = 0
48
49 ( SERIAL, NODE, HASH, SIZE, SOURCE, MTIME, MUSER, CLUSTER ) = range(8)
50
51 inf = float('inf')
52
53
54 def strnextling(prefix):
55     """Return the first unicode string
56        greater than but not starting with given prefix.
57        strnextling('hello') -> 'hellp'
58     """
59     if not prefix:
60         ## all strings start with the null string,
61         ## therefore we have to approximate strnextling('')
62         ## with the last unicode character supported by python
63         ## 0x10ffff for wide (32-bit unicode) python builds
64         ## 0x00ffff for narrow (16-bit unicode) python builds
65         ## We will not autodetect. 0xffff is safe enough.
66         return unichr(0xffff)
67     s = prefix[:-1]
68     c = ord(prefix[-1])
69     if c >= 0xffff:
70         raise RuntimeError
71     s += unichr(c+1)
72     return s
73
74 def strprevling(prefix):
75     """Return an approximation of the last unicode string
76        less than but not starting with given prefix.
77        strprevling(u'hello') -> u'helln\\xffff'
78     """
79     if not prefix:
80         ## There is no prevling for the null string
81         return prefix
82     s = prefix[:-1]
83     c = ord(prefix[-1])
84     if c > 0:
85         s += unichr(c-1) + unichr(0xffff)
86     return s
87
88 _propnames = {
89     'serial'    : 0,
90     'node'      : 1,
91     'hash'      : 2,
92     'size'      : 3,
93     'source'    : 4,
94     'mtime'     : 5,
95     'muser'     : 6,
96     'cluster'   : 7,
97 }
98
99
100 class Node(DBWorker):
101     """Nodes store path organization and have multiple versions.
102        Versions store object history and have multiple attributes.
103        Attributes store metadata.
104     """
105     
106     # TODO: Provide an interface for included and excluded clusters.
107     
108     def __init__(self, **params):
109         DBWorker.__init__(self, **params)
110         metadata = MetaData()
111         
112         #create nodes table
113         columns=[]
114         columns.append(Column('node', Integer, primary_key=True))
115         columns.append(Column('parent', Integer,
116                               ForeignKey('nodes.node',
117                                          ondelete='CASCADE',
118                                          onupdate='CASCADE'),
119                               autoincrement=False))
120         path_length = 2048
121         columns.append(Column('path', String(path_length), default='', nullable=False))
122         self.nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
123         
124         #create policy table
125         columns=[]
126         columns.append(Column('node', Integer,
127                               ForeignKey('nodes.node',
128                                          ondelete='CASCADE',
129                                          onupdate='CASCADE'),
130                               primary_key=True))
131         columns.append(Column('key', String(255), primary_key=True))
132         columns.append(Column('value', String(255)))
133         self.policies = Table('policy', metadata, *columns, mysql_engine='InnoDB')
134         
135         #create statistics table
136         columns=[]
137         columns.append(Column('node', Integer,
138                               ForeignKey('nodes.node',
139                                          ondelete='CASCADE',
140                                          onupdate='CASCADE'),
141                               primary_key=True))
142         columns.append(Column('population', Integer, nullable=False, default=0))
143         columns.append(Column('size', BigInteger, nullable=False, default=0))
144         columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
145         columns.append(Column('cluster', Integer, nullable=False, default=0,
146                               primary_key=True, autoincrement=False))
147         self.statistics = Table('statistics', metadata, *columns, mysql_engine='InnoDB')
148         
149         #create versions table
150         columns=[]
151         columns.append(Column('serial', Integer, primary_key=True))
152         columns.append(Column('node', Integer,
153                               ForeignKey('nodes.node',
154                                          ondelete='CASCADE',
155                                          onupdate='CASCADE')))
156         columns.append(Column('hash', String(255)))
157         columns.append(Column('size', BigInteger, nullable=False, default=0))
158         columns.append(Column('source', Integer))
159         columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
160         columns.append(Column('muser', String(255), nullable=False, default=''))
161         columns.append(Column('cluster', Integer, nullable=False, default=0))
162         self.versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
163         Index('idx_versions_node_mtime', self.versions.c.node,
164               self.versions.c.mtime)
165         
166         #create attributes table
167         columns = []
168         columns.append(Column('serial', Integer,
169                               ForeignKey('versions.serial',
170                                          ondelete='CASCADE',
171                                          onupdate='CASCADE'),
172                               primary_key=True))
173         columns.append(Column('domain', String(255), primary_key=True))
174         columns.append(Column('key', String(255), primary_key=True))
175         columns.append(Column('value', String(255)))
176         self.attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')
177         
178         metadata.create_all(self.engine)
179         
180         # the following code creates an index of specific length
181         # this can be accompliced in sqlalchemy >= 0.7.3
182         # providing mysql_length option during index creation
183         insp = Inspector.from_engine(self.engine)
184         indexes = [elem['name'] for elem in insp.get_indexes('nodes')]
185         if 'idx_nodes_path' not in indexes:
186             explicit_length = '(%s)' %path_length if self.engine.name == 'mysql' else ''
187             s = text('CREATE INDEX idx_nodes_path ON nodes (path%s)' %explicit_length)
188             self.conn.execute(s).close()
189         
190         s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
191                                            self.nodes.c.parent == ROOTNODE))
192         rp = self.conn.execute(s)
193         r = rp.fetchone()
194         rp.close()
195         if not r:
196             s = self.nodes.insert().values(node=ROOTNODE, parent=ROOTNODE)
197             self.conn.execute(s)
198     
199     def node_create(self, parent, path):
200         """Create a new node from the given properties.
201            Return the node identifier of the new node.
202         """
203         #TODO catch IntegrityError?
204         s = self.nodes.insert().values(parent=parent, path=path)
205         r = self.conn.execute(s)
206         inserted_primary_key = r.inserted_primary_key[0]
207         r.close()
208         return inserted_primary_key
209     
210     def node_lookup(self, path):
211         """Lookup the current node of the given path.
212            Return None if the path is not found.
213         """
214         
215         # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
216         s = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path), escape='\\'))
217         r = self.conn.execute(s)
218         row = r.fetchone()
219         r.close()
220         if row:
221             return row[0]
222         return None
223     
224     def node_get_properties(self, node):
225         """Return the node's (parent, path).
226            Return None if the node is not found.
227         """
228         
229         s = select([self.nodes.c.parent, self.nodes.c.path])
230         s = s.where(self.nodes.c.node == node)
231         r = self.conn.execute(s)
232         l = r.fetchone()
233         r.close()
234         return l
235     
236     def node_get_versions(self, node, keys=(), propnames=_propnames):
237         """Return the properties of all versions at node.
238            If keys is empty, return all properties in the order
239            (serial, node, size, source, mtime, muser, cluster).
240         """
241         
242         s = select([self.versions.c.serial,
243                     self.versions.c.node,
244                     self.versions.c.hash,
245                     self.versions.c.size,
246                     self.versions.c.source,
247                     self.versions.c.mtime,
248                     self.versions.c.muser,
249                     self.versions.c.cluster], self.versions.c.node == node)
250         s = s.order_by(self.versions.c.serial)
251         r = self.conn.execute(s)
252         rows = r.fetchall()
253         r.close()
254         if not rows:
255             return rows
256         
257         if not keys:
258             return rows
259         
260         return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]
261     
262     def node_count_children(self, node):
263         """Return node's child count."""
264         
265         s = select([func.count(self.nodes.c.node)])
266         s = s.where(and_(self.nodes.c.parent == node,
267                          self.nodes.c.node != ROOTNODE))
268         r = self.conn.execute(s)
269         row = r.fetchone()
270         r.close()
271         return row[0]
272     
273     def node_purge_children(self, parent, before=inf, cluster=0):
274         """Delete all versions with the specified
275            parent and cluster, and return
276            the hashes of versions deleted.
277            Clears out nodes with no remaining versions.
278         """
279         #update statistics
280         c1 = select([self.nodes.c.node],
281             self.nodes.c.parent == parent)
282         where_clause = and_(self.versions.c.node.in_(c1),
283                             self.versions.c.cluster == cluster)
284         s = select([func.count(self.versions.c.serial),
285                     func.sum(self.versions.c.size)])
286         s = s.where(where_clause)
287         if before != inf:
288             s = s.where(self.versions.c.mtime <= before)
289         r = self.conn.execute(s)
290         row = r.fetchone()
291         r.close()
292         if not row:
293             return ()
294         nr, size = row[0], -row[1] if row[1] else 0
295         mtime = time()
296         self.statistics_update(parent, -nr, size, mtime, cluster)
297         self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)
298         
299         s = select([self.versions.c.hash])
300         s = s.where(where_clause)
301         r = self.conn.execute(s)
302         hashes = [row[0] for row in r.fetchall()]
303         r.close()
304         
305         #delete versions
306         s = self.versions.delete().where(where_clause)
307         r = self.conn.execute(s)
308         r.close()
309         
310         #delete nodes
311         s = select([self.nodes.c.node],
312             and_(self.nodes.c.parent == parent,
313                  select([func.count(self.versions.c.serial)],
314                     self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
315         rp = self.conn.execute(s)
316         nodes = [r[0] for r in rp.fetchall()]
317         rp.close()
318         s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
319         self.conn.execute(s).close()
320         
321         return hashes
322     
323     def node_purge(self, node, before=inf, cluster=0):
324         """Delete all versions with the specified
325            node and cluster, and return
326            the hashes of versions deleted.
327            Clears out the node if it has no remaining versions.
328         """
329         
330         #update statistics
331         s = select([func.count(self.versions.c.serial),
332                     func.sum(self.versions.c.size)])
333         where_clause = and_(self.versions.c.node == node,
334                          self.versions.c.cluster == cluster)
335         s = s.where(where_clause)
336         if before != inf:
337             s = s.where(self.versions.c.mtime <= before)
338         r = self.conn.execute(s)
339         row = r.fetchone()
340         nr, size = row[0], row[1]
341         r.close()
342         if not nr:
343             return ()
344         mtime = time()
345         self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
346         
347         s = select([self.versions.c.hash])
348         s = s.where(where_clause)
349         r = self.conn.execute(s)
350         hashes = [r[0] for r in r.fetchall()]
351         r.close()
352         
353         #delete versions
354         s = self.versions.delete().where(where_clause)
355         r = self.conn.execute(s)
356         r.close()
357         
358         #delete nodes
359         s = select([self.nodes.c.node],
360             and_(self.nodes.c.node == node,
361                  select([func.count(self.versions.c.serial)],
362                     self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
363         r = self.conn.execute(s)
364         nodes = r.fetchall()
365         r.close()
366         s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
367         self.conn.execute(s).close()
368         
369         return hashes
370     
371     def node_remove(self, node):
372         """Remove the node specified.
373            Return false if the node has children or is not found.
374         """
375         
376         if self.node_count_children(node):
377             return False
378         
379         mtime = time()
380         s = select([func.count(self.versions.c.serial),
381                     func.sum(self.versions.c.size),
382                     self.versions.c.cluster])
383         s = s.where(self.versions.c.node == node)
384         s = s.group_by(self.versions.c.cluster)
385         r = self.conn.execute(s)
386         for population, size, cluster in r.fetchall():
387             self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
388         r.close()
389         
390         s = self.nodes.delete().where(self.nodes.c.node == node)
391         self.conn.execute(s).close()
392         return True
393     
394     def policy_get(self, node):
395         s = select([self.policies.c.key, self.policies.c.value],
396             self.policies.c.node==node)
397         r = self.conn.execute(s)
398         d = dict(r.fetchall())
399         r.close()
400         return d
401     
402     def policy_set(self, node, policy):
403         #insert or replace
404         for k, v in policy.iteritems():
405             s = self.policies.update().where(and_(self.policies.c.node == node,
406                                                   self.policies.c.key == k))
407             s = s.values(value = v)
408             rp = self.conn.execute(s)
409             rp.close()
410             if rp.rowcount == 0:
411                 s = self.policies.insert()
412                 values = {'node':node, 'key':k, 'value':v}
413                 r = self.conn.execute(s, values)
414                 r.close()
415     
416     def statistics_get(self, node, cluster=0):
417         """Return population, total size and last mtime
418            for all versions under node that belong to the cluster.
419         """
420         
421         s = select([self.statistics.c.population,
422                     self.statistics.c.size,
423                     self.statistics.c.mtime])
424         s = s.where(and_(self.statistics.c.node == node,
425                          self.statistics.c.cluster == cluster))
426         r = self.conn.execute(s)
427         row = r.fetchone()
428         r.close()
429         return row
430     
431     def statistics_update(self, node, population, size, mtime, cluster=0):
432         """Update the statistics of the given node.
433            Statistics keep track the population, total
434            size of objects and mtime in the node's namespace.
435            May be zero or positive or negative numbers.
436         """
437         s = select([self.statistics.c.population, self.statistics.c.size],
438             and_(self.statistics.c.node == node,
439                  self.statistics.c.cluster == cluster))
440         rp = self.conn.execute(s)
441         r = rp.fetchone()
442         rp.close()
443         if not r:
444             prepopulation, presize = (0, 0)
445         else:
446             prepopulation, presize = r
447         population += prepopulation
448         size += presize
449         
450         #insert or replace
451         #TODO better upsert
452         u = self.statistics.update().where(and_(self.statistics.c.node==node,
453                                            self.statistics.c.cluster==cluster))
454         u = u.values(population=population, size=size, mtime=mtime)
455         rp = self.conn.execute(u)
456         rp.close()
457         if rp.rowcount == 0:
458             ins = self.statistics.insert()
459             ins = ins.values(node=node, population=population, size=size,
460                              mtime=mtime, cluster=cluster)
461             self.conn.execute(ins).close()
462     
463     def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
464         """Update the statistics of the given node's parent.
465            Then recursively update all parents up to the root.
466            Population is not recursive.
467         """
468         
469         while True:
470             if node == ROOTNODE:
471                 break
472             props = self.node_get_properties(node)
473             if props is None:
474                 break
475             parent, path = props
476             self.statistics_update(parent, population, size, mtime, cluster)
477             node = parent
478             population = 0 # Population isn't recursive
479     
480     def statistics_latest(self, node, before=inf, except_cluster=0):
481         """Return population, total size and last mtime
482            for all latest versions under node that
483            do not belong to the cluster.
484         """
485         
486         # The node.
487         props = self.node_get_properties(node)
488         if props is None:
489             return None
490         parent, path = props
491         
492         # The latest version.
493         s = select([self.versions.c.serial,
494                     self.versions.c.node,
495                     self.versions.c.hash,
496                     self.versions.c.size,
497                     self.versions.c.source,
498                     self.versions.c.mtime,
499                     self.versions.c.muser,
500                     self.versions.c.cluster])
501         filtered = select([func.max(self.versions.c.serial)],
502                             self.versions.c.node == node)
503         if before != inf:
504             filtered = filtered.where(self.versions.c.mtime < before)
505         s = s.where(and_(self.versions.c.cluster != except_cluster,
506                          self.versions.c.serial == filtered))
507         r = self.conn.execute(s)
508         props = r.fetchone()
509         r.close()
510         if not props:
511             return None
512         mtime = props[MTIME]
513         
514         # First level, just under node (get population).
515         v = self.versions.alias('v')
516         s = select([func.count(v.c.serial),
517                     func.sum(v.c.size),
518                     func.max(v.c.mtime)])
519         c1 = select([func.max(self.versions.c.serial)])
520         if before != inf:
521             c1 = c1.where(self.versions.c.mtime < before)
522         c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
523         s = s.where(and_(v.c.serial == c1.where(self.versions.c.node == v.c.node),
524                          v.c.cluster != except_cluster,
525                          v.c.node.in_(c2)))
526         rp = self.conn.execute(s)
527         r = rp.fetchone()
528         rp.close()
529         if not r:
530             return None
531         count = r[0]
532         mtime = max(mtime, r[2])
533         if count == 0:
534             return (0, 0, mtime)
535         
536         # All children (get size and mtime).
537         # XXX: This is why the full path is stored.
538         s = select([func.count(v.c.serial),
539                     func.sum(v.c.size),
540                     func.max(v.c.mtime)])
541         c1 = select([func.max(self.versions.c.serial)],
542             self.versions.c.node == v.c.node)
543         if before != inf:
544             c1 = c1.where(self.versions.c.mtime < before)
545         c2 = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path) + '%', escape='\\'))
546         s = s.where(and_(v.c.serial == c1,
547                          v.c.cluster != except_cluster,
548                          v.c.node.in_(c2)))
549         rp = self.conn.execute(s)
550         r = rp.fetchone()
551         rp.close()
552         if not r:
553             return None
554         size = r[1] - props[SIZE]
555         mtime = max(mtime, r[2])
556         return (count, size, mtime)
557     
558     def version_create(self, node, hash, size, source, muser, cluster=0):
559         """Create a new version from the given properties.
560            Return the (serial, mtime) of the new version.
561         """
562         
563         mtime = time()
564         s = self.versions.insert().values(node=node, hash=hash, size=size, source=source,
565                                           mtime=mtime, muser=muser, cluster=cluster)
566         serial = self.conn.execute(s).inserted_primary_key[0]
567         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
568         return serial, mtime
569     
570     def version_lookup(self, node, before=inf, cluster=0):
571         """Lookup the current version of the given node.
572            Return a list with its properties:
573            (serial, node, hash, size, source, mtime, muser, cluster)
574            or None if the current version is not found in the given cluster.
575         """
576         
577         v = self.versions.alias('v')
578         s = select([v.c.serial, v.c.node, v.c.hash, v.c.size,
579                     v.c.source, v.c.mtime, v.c.muser, v.c.cluster])
580         c = select([func.max(self.versions.c.serial)],
581             self.versions.c.node == node)
582         if before != inf:
583             c = c.where(self.versions.c.mtime < before)
584         s = s.where(and_(v.c.serial == c,
585                          v.c.cluster == cluster))
586         r = self.conn.execute(s)
587         props = r.fetchone()
588         r.close()
589         if props:
590             return props
591         return None
592     
593     def version_get_properties(self, serial, keys=(), propnames=_propnames):
594         """Return a sequence of values for the properties of
595            the version specified by serial and the keys, in the order given.
596            If keys is empty, return all properties in the order
597            (serial, node, hash, size, source, mtime, muser, cluster).
598         """
599         
600         v = self.versions.alias()
601         s = select([v.c.serial, v.c.node, v.c.hash, v.c.size,
602                     v.c.source, v.c.mtime, v.c.muser, v.c.cluster], v.c.serial == serial)
603         rp = self.conn.execute(s)
604         r = rp.fetchone()
605         rp.close()
606         if r is None:
607             return r
608         
609         if not keys:
610             return r
611         return [r[propnames[k]] for k in keys if k in propnames]
612     
613     def version_recluster(self, serial, cluster):
614         """Move the version into another cluster."""
615         
616         props = self.version_get_properties(serial)
617         if not props:
618             return
619         node = props[NODE]
620         size = props[SIZE]
621         oldcluster = props[CLUSTER]
622         if cluster == oldcluster:
623             return
624         
625         mtime = time()
626         self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
627         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
628         
629         s = self.versions.update()
630         s = s.where(self.versions.c.serial == serial)
631         s = s.values(cluster = cluster)
632         self.conn.execute(s).close()
633     
634     def version_remove(self, serial):
635         """Remove the serial specified."""
636         
637         props = self.version_get_properties(serial)
638         if not props:
639             return
640         node = props[NODE]
641         hash = props[HASH]
642         size = props[SIZE]
643         cluster = props[CLUSTER]
644         
645         mtime = time()
646         self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
647         
648         s = self.versions.delete().where(self.versions.c.serial == serial)
649         self.conn.execute(s).close()
650         return hash
651     
652     def attribute_get(self, serial, domain, keys=()):
653         """Return a list of (key, value) pairs of the version specified by serial.
654            If keys is empty, return all attributes.
655            Othwerise, return only those specified.
656         """
657         
658         if keys:
659             attrs = self.attributes.alias()
660             s = select([attrs.c.key, attrs.c.value])
661             s = s.where(and_(attrs.c.key.in_(keys),
662                              attrs.c.serial == serial,
663                              attrs.c.domain == domain))
664         else:
665             attrs = self.attributes.alias()
666             s = select([attrs.c.key, attrs.c.value])
667             s = s.where(and_(attrs.c.serial == serial,
668                              attrs.c.domain == domain))
669         r = self.conn.execute(s)
670         l = r.fetchall()
671         r.close()
672         return l
673     
674     def attribute_set(self, serial, domain, items):
675         """Set the attributes of the version specified by serial.
676            Receive attributes as an iterable of (key, value) pairs.
677         """
678         #insert or replace
679         #TODO better upsert
680         for k, v in items:
681             s = self.attributes.update()
682             s = s.where(and_(self.attributes.c.serial == serial,
683                              self.attributes.c.domain == domain,
684                              self.attributes.c.key == k))
685             s = s.values(value = v)
686             rp = self.conn.execute(s)
687             rp.close()
688             if rp.rowcount == 0:
689                 s = self.attributes.insert()
690                 s = s.values(serial=serial, domain=domain, key=k, value=v)
691                 self.conn.execute(s).close()
692     
693     def attribute_del(self, serial, domain, keys=()):
694         """Delete attributes of the version specified by serial.
695            If keys is empty, delete all attributes.
696            Otherwise delete those specified.
697         """
698         
699         if keys:
700             #TODO more efficient way to do this?
701             for key in keys:
702                 s = self.attributes.delete()
703                 s = s.where(and_(self.attributes.c.serial == serial,
704                                  self.attributes.c.domain == domain,
705                                  self.attributes.c.key == key))
706                 self.conn.execute(s).close()
707         else:
708             s = self.attributes.delete()
709             s = s.where(and_(self.attributes.c.serial == serial,
710                              self.attributes.c.domain == domain))
711             self.conn.execute(s).close()
712     
713     def attribute_copy(self, source, dest):
714         s = select([dest, self.attributes.c.domain, self.attributes.c.key, self.attributes.c.value],
715             self.attributes.c.serial == source)
716         rp = self.conn.execute(s)
717         attributes = rp.fetchall()
718         rp.close()
719         for dest, domain, k, v in attributes:
720             #insert or replace
721             s = self.attributes.update().where(and_(
722                 self.attributes.c.serial == dest,
723                 self.attributes.c.domain == domain,
724                 self.attributes.c.key == k))
725             rp = self.conn.execute(s, value=v)
726             rp.close()
727             if rp.rowcount == 0:
728                 s = self.attributes.insert()
729                 values = {'serial':dest, 'domain':domain, 'key':k, 'value':v}
730                 self.conn.execute(s, values).close()
731     
732     def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=[]):
733         """Return a list with all keys pairs defined
734            for all latest versions under parent that
735            do not belong to the cluster.
736         """
737         
738         # TODO: Use another table to store before=inf results.
739         a = self.attributes.alias('a')
740         v = self.versions.alias('v')
741         n = self.nodes.alias('n')
742         s = select([a.c.key]).distinct()
743         filtered = select([func.max(self.versions.c.serial)])
744         if before != inf:
745             filtered = filtered.where(self.versions.c.mtime < before)
746         s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
747         s = s.where(v.c.cluster != except_cluster)
748         s = s.where(v.c.node.in_(select([self.nodes.c.node],
749             self.nodes.c.parent == parent)))
750         s = s.where(a.c.serial == v.c.serial)
751         s = s.where(a.c.domain == domain)
752         s = s.where(n.c.node == v.c.node)
753         conj = []
754         for x in pathq:
755             conj.append(n.c.path.like(self.escape_like(x) + '%', escape='\\'))
756         if conj:
757             s = s.where(or_(*conj))
758         rp = self.conn.execute(s)
759         rows = rp.fetchall()
760         rp.close()
761         return [r[0] for r in rows]
762     
763     def latest_version_list(self, parent, prefix='', delimiter=None,
764                             start='', limit=10000, before=inf,
765                             except_cluster=0, pathq=[], domain=None, filterq=[]):
766         """Return a (list of (path, serial) tuples, list of common prefixes)
767            for the current versions of the paths with the given parent,
768            matching the following criteria.
769            
770            The property tuple for a version is returned if all
771            of these conditions are true:
772                 
773                 a. parent matches
774                 
775                 b. path > start
776                 
777                 c. path starts with prefix (and paths in pathq)
778                 
779                 d. version is the max up to before
780                 
781                 e. version is not in cluster
782                 
783                 f. the path does not have the delimiter occuring
784                    after the prefix, or ends with the delimiter
785                 
786                 g. serial matches the attribute filter query.
787                    
788                    A filter query is a comma-separated list of
789                    terms in one of these three forms:
790                    
791                    key
792                        an attribute with this key must exist
793                    
794                    !key
795                        an attribute with this key must not exist
796                    
797                    key ?op value
798                        the attribute with this key satisfies the value
799                        where ?op is one of ==, != <=, >=, <, >.
800            
801            The list of common prefixes includes the prefixes
802            matching up to the first delimiter after prefix,
803            and are reported only once, as "virtual directories".
804            The delimiter is included in the prefixes.
805            
806            If arguments are None, then the corresponding matching rule
807            will always match.
808            
809            Limit applies to the first list of tuples returned.
810         """
811         
812         if not start or start < prefix:
813             start = strprevling(prefix)
814         nextling = strnextling(prefix)
815         
816         a = self.attributes.alias('a')
817         v = self.versions.alias('v')
818         n = self.nodes.alias('n')
819         s = select([n.c.path, v.c.serial]).distinct()
820         filtered = select([func.max(self.versions.c.serial)])
821         if before != inf:
822             filtered = filtered.where(self.versions.c.mtime < before)
823         s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
824         s = s.where(v.c.cluster != except_cluster)
825         s = s.where(v.c.node.in_(select([self.nodes.c.node],
826             self.nodes.c.parent == parent)))
827         if domain and filterq:
828             s = s.where(a.c.serial == v.c.serial)
829             s = s.where(a.c.domain == domain)
830         
831         s = s.where(n.c.node == v.c.node)
832         s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
833         conj = []
834         for x in pathq:
835             conj.append(n.c.path.like(self.escape_like(x) + '%', escape='\\'))
836         
837         if conj:
838             s = s.where(or_(*conj))
839         
840         if domain and filterq:
841             included, excluded, opers = parse_filters(filterq)
842             if included:
843                 s = s.where(a.c.key.in_(x for x in included))
844             if excluded:
845                 s = s.where(not_(a.c.key.in_(x for x in excluded)))
846             if opers:
847                 for k, o, v in opers:
848                     s = s.where(or_(a.c.key == k and a.c.value.op(o)(v)))
849         
850         s = s.order_by(n.c.path)
851         
852         if not delimiter:
853             s = s.limit(limit)
854             rp = self.conn.execute(s, start=start)
855             r = rp.fetchall()
856             rp.close()
857             return r, ()
858         
859         pfz = len(prefix)
860         dz = len(delimiter)
861         count = 0
862         prefixes = []
863         pappend = prefixes.append
864         matches = []
865         mappend = matches.append
866         
867         rp = self.conn.execute(s, start=start)
868         while True:
869             props = rp.fetchone()
870             if props is None:
871                 break
872             path, serial = props
873             idx = path.find(delimiter, pfz)
874             
875             if idx < 0:
876                 mappend(props)
877                 count += 1
878                 if count >= limit:
879                     break
880                 continue
881             
882             if idx + dz == len(path):
883                 mappend(props)
884                 count += 1
885                 continue # Get one more, in case there is a path.
886             pf = path[:idx + dz]
887             pappend(pf)
888             if count >= limit: 
889                 break
890             
891             rp = self.conn.execute(s, start=strnextling(pf)) # New start.
892         rp.close()
893         
894         return matches, prefixes