sqlalchemy backend: fix metadata queries
[pithos] / pithos / backends / lib / sqlalchemy / node.py
1 # Copyright 2011 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from time import time
35 from sqlalchemy import Table, Integer, BigInteger, DECIMAL, Column, String, MetaData, ForeignKey
36 from sqlalchemy.types import Text
37 from sqlalchemy.schema import Index, Sequence
38 from sqlalchemy.sql import func, and_, or_, not_, null, select, bindparam, text
39 from sqlalchemy.ext.compiler import compiles
40 from sqlalchemy.engine.reflection import Inspector
41
42 from dbworker import DBWorker
43
44 from pithos.lib.filter import parse_filters, OPERATORS
45
46 ROOTNODE  = 0
47
48 ( SERIAL, NODE, HASH, SIZE, SOURCE, MTIME, MUSER, UUID, CLUSTER ) = range(9)
49
50 inf = float('inf')
51
52
53 def strnextling(prefix):
54     """Return the first unicode string
55        greater than but not starting with given prefix.
56        strnextling('hello') -> 'hellp'
57     """
58     if not prefix:
59         ## all strings start with the null string,
60         ## therefore we have to approximate strnextling('')
61         ## with the last unicode character supported by python
62         ## 0x10ffff for wide (32-bit unicode) python builds
63         ## 0x00ffff for narrow (16-bit unicode) python builds
64         ## We will not autodetect. 0xffff is safe enough.
65         return unichr(0xffff)
66     s = prefix[:-1]
67     c = ord(prefix[-1])
68     if c >= 0xffff:
69         raise RuntimeError
70     s += unichr(c+1)
71     return s
72
73 def strprevling(prefix):
74     """Return an approximation of the last unicode string
75        less than but not starting with given prefix.
76        strprevling(u'hello') -> u'helln\\xffff'
77     """
78     if not prefix:
79         ## There is no prevling for the null string
80         return prefix
81     s = prefix[:-1]
82     c = ord(prefix[-1])
83     if c > 0:
84         s += unichr(c-1) + unichr(0xffff)
85     return s
86
87 _propnames = {
88     'serial'    : 0,
89     'node'      : 1,
90     'hash'      : 2,
91     'size'      : 3,
92     'source'    : 4,
93     'mtime'     : 5,
94     'muser'     : 6,
95     'uuid'      : 7,
96     'cluster'   : 8
97 }
98
99
100 class Node(DBWorker):
101     """Nodes store path organization and have multiple versions.
102        Versions store object history and have multiple attributes.
103        Attributes store metadata.
104     """
105     
106     # TODO: Provide an interface for included and excluded clusters.
107     
108     def __init__(self, **params):
109         DBWorker.__init__(self, **params)
110         metadata = MetaData()
111         
112         #create nodes table
113         columns=[]
114         columns.append(Column('node', Integer, primary_key=True))
115         columns.append(Column('parent', Integer,
116                               ForeignKey('nodes.node',
117                                          ondelete='CASCADE',
118                                          onupdate='CASCADE'),
119                               autoincrement=False))
120         path_length = 2048
121         columns.append(Column('path', String(path_length), default='', nullable=False))
122         self.nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
123         
124         #create policy table
125         columns=[]
126         columns.append(Column('node', Integer,
127                               ForeignKey('nodes.node',
128                                          ondelete='CASCADE',
129                                          onupdate='CASCADE'),
130                               primary_key=True))
131         columns.append(Column('key', String(255), primary_key=True))
132         columns.append(Column('value', String(255)))
133         self.policies = Table('policy', metadata, *columns, mysql_engine='InnoDB')
134         
135         #create statistics table
136         columns=[]
137         columns.append(Column('node', Integer,
138                               ForeignKey('nodes.node',
139                                          ondelete='CASCADE',
140                                          onupdate='CASCADE'),
141                               primary_key=True))
142         columns.append(Column('population', Integer, nullable=False, default=0))
143         columns.append(Column('size', BigInteger, nullable=False, default=0))
144         columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
145         columns.append(Column('cluster', Integer, nullable=False, default=0,
146                               primary_key=True, autoincrement=False))
147         self.statistics = Table('statistics', metadata, *columns, mysql_engine='InnoDB')
148         
149         #create versions table
150         columns=[]
151         columns.append(Column('serial', Integer, primary_key=True))
152         columns.append(Column('node', Integer,
153                               ForeignKey('nodes.node',
154                                          ondelete='CASCADE',
155                                          onupdate='CASCADE')))
156         columns.append(Column('hash', String(255)))
157         columns.append(Column('size', BigInteger, nullable=False, default=0))
158         columns.append(Column('source', Integer))
159         columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
160         columns.append(Column('muser', String(255), nullable=False, default=''))
161         columns.append(Column('uuid', String(64), nullable=False, default=''))
162         columns.append(Column('cluster', Integer, nullable=False, default=0))
163         self.versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
164         Index('idx_versions_node_mtime', self.versions.c.node, self.versions.c.mtime)
165         Index('idx_versions_node_uuid', self.versions.c.uuid)
166         
167         #create attributes table
168         columns = []
169         columns.append(Column('serial', Integer,
170                               ForeignKey('versions.serial',
171                                          ondelete='CASCADE',
172                                          onupdate='CASCADE'),
173                               primary_key=True))
174         columns.append(Column('domain', String(255), primary_key=True))
175         columns.append(Column('key', String(255), primary_key=True))
176         columns.append(Column('value', String(255)))
177         self.attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')
178         
179         metadata.create_all(self.engine)
180         
181         # the following code creates an index of specific length
182         # this can be accompliced in sqlalchemy >= 0.7.3
183         # providing mysql_length option during index creation
184         insp = Inspector.from_engine(self.engine)
185         indexes = [elem['name'] for elem in insp.get_indexes('nodes')]
186         if 'idx_nodes_path' not in indexes:
187             explicit_length = '(%s)' %path_length if self.engine.name == 'mysql' else ''
188             s = text('CREATE INDEX idx_nodes_path ON nodes (path%s)' %explicit_length)
189             self.conn.execute(s).close()
190         
191         s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
192                                            self.nodes.c.parent == ROOTNODE))
193         rp = self.conn.execute(s)
194         r = rp.fetchone()
195         rp.close()
196         if not r:
197             s = self.nodes.insert().values(node=ROOTNODE, parent=ROOTNODE)
198             self.conn.execute(s)
199     
200     def node_create(self, parent, path):
201         """Create a new node from the given properties.
202            Return the node identifier of the new node.
203         """
204         #TODO catch IntegrityError?
205         s = self.nodes.insert().values(parent=parent, path=path)
206         r = self.conn.execute(s)
207         inserted_primary_key = r.inserted_primary_key[0]
208         r.close()
209         return inserted_primary_key
210     
211     def node_lookup(self, path):
212         """Lookup the current node of the given path.
213            Return None if the path is not found.
214         """
215         
216         # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
217         s = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path), escape='\\'))
218         r = self.conn.execute(s)
219         row = r.fetchone()
220         r.close()
221         if row:
222             return row[0]
223         return None
224     
225     def node_get_properties(self, node):
226         """Return the node's (parent, path).
227            Return None if the node is not found.
228         """
229         
230         s = select([self.nodes.c.parent, self.nodes.c.path])
231         s = s.where(self.nodes.c.node == node)
232         r = self.conn.execute(s)
233         l = r.fetchone()
234         r.close()
235         return l
236     
237     def node_get_versions(self, node, keys=(), propnames=_propnames):
238         """Return the properties of all versions at node.
239            If keys is empty, return all properties in the order
240            (serial, node, hash, size, source, mtime, muser, uuid, cluster).
241         """
242         
243         s = select([self.versions.c.serial,
244                     self.versions.c.node,
245                     self.versions.c.hash,
246                     self.versions.c.size,
247                     self.versions.c.source,
248                     self.versions.c.mtime,
249                     self.versions.c.muser,
250                     self.versions.c.uuid,
251                     self.versions.c.cluster], self.versions.c.node == node)
252         s = s.order_by(self.versions.c.serial)
253         r = self.conn.execute(s)
254         rows = r.fetchall()
255         r.close()
256         if not rows:
257             return rows
258         
259         if not keys:
260             return rows
261         
262         return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]
263     
264     def node_count_children(self, node):
265         """Return node's child count."""
266         
267         s = select([func.count(self.nodes.c.node)])
268         s = s.where(and_(self.nodes.c.parent == node,
269                          self.nodes.c.node != ROOTNODE))
270         r = self.conn.execute(s)
271         row = r.fetchone()
272         r.close()
273         return row[0]
274     
275     def node_purge_children(self, parent, before=inf, cluster=0):
276         """Delete all versions with the specified
277            parent and cluster, and return
278            the hashes of versions deleted.
279            Clears out nodes with no remaining versions.
280         """
281         #update statistics
282         c1 = select([self.nodes.c.node],
283             self.nodes.c.parent == parent)
284         where_clause = and_(self.versions.c.node.in_(c1),
285                             self.versions.c.cluster == cluster)
286         s = select([func.count(self.versions.c.serial),
287                     func.sum(self.versions.c.size)])
288         s = s.where(where_clause)
289         if before != inf:
290             s = s.where(self.versions.c.mtime <= before)
291         r = self.conn.execute(s)
292         row = r.fetchone()
293         r.close()
294         if not row:
295             return ()
296         nr, size = row[0], -row[1] if row[1] else 0
297         mtime = time()
298         self.statistics_update(parent, -nr, size, mtime, cluster)
299         self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)
300         
301         s = select([self.versions.c.hash])
302         s = s.where(where_clause)
303         r = self.conn.execute(s)
304         hashes = [row[0] for row in r.fetchall()]
305         r.close()
306         
307         #delete versions
308         s = self.versions.delete().where(where_clause)
309         r = self.conn.execute(s)
310         r.close()
311         
312         #delete nodes
313         s = select([self.nodes.c.node],
314             and_(self.nodes.c.parent == parent,
315                  select([func.count(self.versions.c.serial)],
316                     self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
317         rp = self.conn.execute(s)
318         nodes = [r[0] for r in rp.fetchall()]
319         rp.close()
320         s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
321         self.conn.execute(s).close()
322         
323         return hashes
324     
325     def node_purge(self, node, before=inf, cluster=0):
326         """Delete all versions with the specified
327            node and cluster, and return
328            the hashes of versions deleted.
329            Clears out the node if it has no remaining versions.
330         """
331         
332         #update statistics
333         s = select([func.count(self.versions.c.serial),
334                     func.sum(self.versions.c.size)])
335         where_clause = and_(self.versions.c.node == node,
336                          self.versions.c.cluster == cluster)
337         s = s.where(where_clause)
338         if before != inf:
339             s = s.where(self.versions.c.mtime <= before)
340         r = self.conn.execute(s)
341         row = r.fetchone()
342         nr, size = row[0], row[1]
343         r.close()
344         if not nr:
345             return ()
346         mtime = time()
347         self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
348         
349         s = select([self.versions.c.hash])
350         s = s.where(where_clause)
351         r = self.conn.execute(s)
352         hashes = [r[0] for r in r.fetchall()]
353         r.close()
354         
355         #delete versions
356         s = self.versions.delete().where(where_clause)
357         r = self.conn.execute(s)
358         r.close()
359         
360         #delete nodes
361         s = select([self.nodes.c.node],
362             and_(self.nodes.c.node == node,
363                  select([func.count(self.versions.c.serial)],
364                     self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
365         r = self.conn.execute(s)
366         nodes = r.fetchall()
367         r.close()
368         s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
369         self.conn.execute(s).close()
370         
371         return hashes
372     
373     def node_remove(self, node):
374         """Remove the node specified.
375            Return false if the node has children or is not found.
376         """
377         
378         if self.node_count_children(node):
379             return False
380         
381         mtime = time()
382         s = select([func.count(self.versions.c.serial),
383                     func.sum(self.versions.c.size),
384                     self.versions.c.cluster])
385         s = s.where(self.versions.c.node == node)
386         s = s.group_by(self.versions.c.cluster)
387         r = self.conn.execute(s)
388         for population, size, cluster in r.fetchall():
389             self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
390         r.close()
391         
392         s = self.nodes.delete().where(self.nodes.c.node == node)
393         self.conn.execute(s).close()
394         return True
395     
396     def policy_get(self, node):
397         s = select([self.policies.c.key, self.policies.c.value],
398             self.policies.c.node==node)
399         r = self.conn.execute(s)
400         d = dict(r.fetchall())
401         r.close()
402         return d
403     
404     def policy_set(self, node, policy):
405         #insert or replace
406         for k, v in policy.iteritems():
407             s = self.policies.update().where(and_(self.policies.c.node == node,
408                                                   self.policies.c.key == k))
409             s = s.values(value = v)
410             rp = self.conn.execute(s)
411             rp.close()
412             if rp.rowcount == 0:
413                 s = self.policies.insert()
414                 values = {'node':node, 'key':k, 'value':v}
415                 r = self.conn.execute(s, values)
416                 r.close()
417     
418     def statistics_get(self, node, cluster=0):
419         """Return population, total size and last mtime
420            for all versions under node that belong to the cluster.
421         """
422         
423         s = select([self.statistics.c.population,
424                     self.statistics.c.size,
425                     self.statistics.c.mtime])
426         s = s.where(and_(self.statistics.c.node == node,
427                          self.statistics.c.cluster == cluster))
428         r = self.conn.execute(s)
429         row = r.fetchone()
430         r.close()
431         return row
432     
433     def statistics_update(self, node, population, size, mtime, cluster=0):
434         """Update the statistics of the given node.
435            Statistics keep track the population, total
436            size of objects and mtime in the node's namespace.
437            May be zero or positive or negative numbers.
438         """
439         s = select([self.statistics.c.population, self.statistics.c.size],
440             and_(self.statistics.c.node == node,
441                  self.statistics.c.cluster == cluster))
442         rp = self.conn.execute(s)
443         r = rp.fetchone()
444         rp.close()
445         if not r:
446             prepopulation, presize = (0, 0)
447         else:
448             prepopulation, presize = r
449         population += prepopulation
450         size += presize
451         
452         #insert or replace
453         #TODO better upsert
454         u = self.statistics.update().where(and_(self.statistics.c.node==node,
455                                            self.statistics.c.cluster==cluster))
456         u = u.values(population=population, size=size, mtime=mtime)
457         rp = self.conn.execute(u)
458         rp.close()
459         if rp.rowcount == 0:
460             ins = self.statistics.insert()
461             ins = ins.values(node=node, population=population, size=size,
462                              mtime=mtime, cluster=cluster)
463             self.conn.execute(ins).close()
464     
465     def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
466         """Update the statistics of the given node's parent.
467            Then recursively update all parents up to the root.
468            Population is not recursive.
469         """
470         
471         while True:
472             if node == ROOTNODE:
473                 break
474             props = self.node_get_properties(node)
475             if props is None:
476                 break
477             parent, path = props
478             self.statistics_update(parent, population, size, mtime, cluster)
479             node = parent
480             population = 0 # Population isn't recursive
481     
482     def statistics_latest(self, node, before=inf, except_cluster=0):
483         """Return population, total size and last mtime
484            for all latest versions under node that
485            do not belong to the cluster.
486         """
487         
488         # The node.
489         props = self.node_get_properties(node)
490         if props is None:
491             return None
492         parent, path = props
493         
494         # The latest version.
495         s = select([self.versions.c.serial,
496                     self.versions.c.node,
497                     self.versions.c.hash,
498                     self.versions.c.size,
499                     self.versions.c.source,
500                     self.versions.c.mtime,
501                     self.versions.c.muser,
502                     self.versions.c.uuid,
503                     self.versions.c.cluster])
504         filtered = select([func.max(self.versions.c.serial)],
505                             self.versions.c.node == node)
506         if before != inf:
507             filtered = filtered.where(self.versions.c.mtime < before)
508         s = s.where(and_(self.versions.c.cluster != except_cluster,
509                          self.versions.c.serial == filtered))
510         r = self.conn.execute(s)
511         props = r.fetchone()
512         r.close()
513         if not props:
514             return None
515         mtime = props[MTIME]
516         
517         # First level, just under node (get population).
518         v = self.versions.alias('v')
519         s = select([func.count(v.c.serial),
520                     func.sum(v.c.size),
521                     func.max(v.c.mtime)])
522         c1 = select([func.max(self.versions.c.serial)])
523         if before != inf:
524             c1 = c1.where(self.versions.c.mtime < before)
525         c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
526         s = s.where(and_(v.c.serial == c1.where(self.versions.c.node == v.c.node),
527                          v.c.cluster != except_cluster,
528                          v.c.node.in_(c2)))
529         rp = self.conn.execute(s)
530         r = rp.fetchone()
531         rp.close()
532         if not r:
533             return None
534         count = r[0]
535         mtime = max(mtime, r[2])
536         if count == 0:
537             return (0, 0, mtime)
538         
539         # All children (get size and mtime).
540         # XXX: This is why the full path is stored.
541         s = select([func.count(v.c.serial),
542                     func.sum(v.c.size),
543                     func.max(v.c.mtime)])
544         c1 = select([func.max(self.versions.c.serial)],
545             self.versions.c.node == v.c.node)
546         if before != inf:
547             c1 = c1.where(self.versions.c.mtime < before)
548         c2 = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path) + '%', escape='\\'))
549         s = s.where(and_(v.c.serial == c1,
550                          v.c.cluster != except_cluster,
551                          v.c.node.in_(c2)))
552         rp = self.conn.execute(s)
553         r = rp.fetchone()
554         rp.close()
555         if not r:
556             return None
557         size = r[1] - props[SIZE]
558         mtime = max(mtime, r[2])
559         return (count, size, mtime)
560     
561     def version_create(self, node, hash, size, source, muser, uuid, cluster=0):
562         """Create a new version from the given properties.
563            Return the (serial, mtime) of the new version.
564         """
565         
566         mtime = time()
567         s = self.versions.insert().values(node=node, hash=hash, size=size, source=source,
568                                           mtime=mtime, muser=muser, uuid=uuid, cluster=cluster)
569         serial = self.conn.execute(s).inserted_primary_key[0]
570         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
571         return serial, mtime
572     
573     def version_lookup(self, node, before=inf, cluster=0):
574         """Lookup the current version of the given node.
575            Return a list with its properties:
576            (serial, node, hash, size, source, mtime, muser, uuid, cluster)
577            or None if the current version is not found in the given cluster.
578         """
579         
580         v = self.versions.alias('v')
581         s = select([v.c.serial, v.c.node, v.c.hash,
582                     v.c.size, v.c.source, v.c.mtime,
583                     v.c.muser, v.c.uuid, v.c.cluster])
584         c = select([func.max(self.versions.c.serial)],
585             self.versions.c.node == node)
586         if before != inf:
587             c = c.where(self.versions.c.mtime < before)
588         s = s.where(and_(v.c.serial == c,
589                          v.c.cluster == cluster))
590         r = self.conn.execute(s)
591         props = r.fetchone()
592         r.close()
593         if props:
594             return props
595         return None
596     
597     def version_get_properties(self, serial, keys=(), propnames=_propnames):
598         """Return a sequence of values for the properties of
599            the version specified by serial and the keys, in the order given.
600            If keys is empty, return all properties in the order
601            (serial, node, hash, size, source, mtime, muser, uuid, cluster).
602         """
603         
604         v = self.versions.alias()
605         s = select([v.c.serial, v.c.node, v.c.hash,
606                     v.c.size, v.c.source, v.c.mtime,
607                     v.c.muser, v.c.uuid, v.c.cluster], v.c.serial == serial)
608         rp = self.conn.execute(s)
609         r = rp.fetchone()
610         rp.close()
611         if r is None:
612             return r
613         
614         if not keys:
615             return r
616         return [r[propnames[k]] for k in keys if k in propnames]
617     
618     def version_recluster(self, serial, cluster):
619         """Move the version into another cluster."""
620         
621         props = self.version_get_properties(serial)
622         if not props:
623             return
624         node = props[NODE]
625         size = props[SIZE]
626         oldcluster = props[CLUSTER]
627         if cluster == oldcluster:
628             return
629         
630         mtime = time()
631         self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
632         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
633         
634         s = self.versions.update()
635         s = s.where(self.versions.c.serial == serial)
636         s = s.values(cluster = cluster)
637         self.conn.execute(s).close()
638     
639     def version_remove(self, serial):
640         """Remove the serial specified."""
641         
642         props = self.version_get_properties(serial)
643         if not props:
644             return
645         node = props[NODE]
646         hash = props[HASH]
647         size = props[SIZE]
648         cluster = props[CLUSTER]
649         
650         mtime = time()
651         self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
652         
653         s = self.versions.delete().where(self.versions.c.serial == serial)
654         self.conn.execute(s).close()
655         return hash
656     
657     def attribute_get(self, serial, domain, keys=()):
658         """Return a list of (key, value) pairs of the version specified by serial.
659            If keys is empty, return all attributes.
660            Othwerise, return only those specified.
661         """
662         
663         if keys:
664             attrs = self.attributes.alias()
665             s = select([attrs.c.key, attrs.c.value])
666             s = s.where(and_(attrs.c.key.in_(keys),
667                              attrs.c.serial == serial,
668                              attrs.c.domain == domain))
669         else:
670             attrs = self.attributes.alias()
671             s = select([attrs.c.key, attrs.c.value])
672             s = s.where(and_(attrs.c.serial == serial,
673                              attrs.c.domain == domain))
674         r = self.conn.execute(s)
675         l = r.fetchall()
676         r.close()
677         return l
678     
679     def attribute_set(self, serial, domain, items):
680         """Set the attributes of the version specified by serial.
681            Receive attributes as an iterable of (key, value) pairs.
682         """
683         #insert or replace
684         #TODO better upsert
685         for k, v in items:
686             s = self.attributes.update()
687             s = s.where(and_(self.attributes.c.serial == serial,
688                              self.attributes.c.domain == domain,
689                              self.attributes.c.key == k))
690             s = s.values(value = v)
691             rp = self.conn.execute(s)
692             rp.close()
693             if rp.rowcount == 0:
694                 s = self.attributes.insert()
695                 s = s.values(serial=serial, domain=domain, key=k, value=v)
696                 self.conn.execute(s).close()
697     
698     def attribute_del(self, serial, domain, keys=()):
699         """Delete attributes of the version specified by serial.
700            If keys is empty, delete all attributes.
701            Otherwise delete those specified.
702         """
703         
704         if keys:
705             #TODO more efficient way to do this?
706             for key in keys:
707                 s = self.attributes.delete()
708                 s = s.where(and_(self.attributes.c.serial == serial,
709                                  self.attributes.c.domain == domain,
710                                  self.attributes.c.key == key))
711                 self.conn.execute(s).close()
712         else:
713             s = self.attributes.delete()
714             s = s.where(and_(self.attributes.c.serial == serial,
715                              self.attributes.c.domain == domain))
716             self.conn.execute(s).close()
717     
718     def attribute_copy(self, source, dest):
719         s = select([dest, self.attributes.c.domain, self.attributes.c.key, self.attributes.c.value],
720             self.attributes.c.serial == source)
721         rp = self.conn.execute(s)
722         attributes = rp.fetchall()
723         rp.close()
724         for dest, domain, k, v in attributes:
725             #insert or replace
726             s = self.attributes.update().where(and_(
727                 self.attributes.c.serial == dest,
728                 self.attributes.c.domain == domain,
729                 self.attributes.c.key == k))
730             rp = self.conn.execute(s, value=v)
731             rp.close()
732             if rp.rowcount == 0:
733                 s = self.attributes.insert()
734                 values = {'serial':dest, 'domain':domain, 'key':k, 'value':v}
735                 self.conn.execute(s, values).close()
736     
737     def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=[]):
738         """Return a list with all keys pairs defined
739            for all latest versions under parent that
740            do not belong to the cluster.
741         """
742         
743         # TODO: Use another table to store before=inf results.
744         a = self.attributes.alias('a')
745         v = self.versions.alias('v')
746         n = self.nodes.alias('n')
747         s = select([a.c.key]).distinct()
748         filtered = select([func.max(self.versions.c.serial)])
749         if before != inf:
750             filtered = filtered.where(self.versions.c.mtime < before)
751         s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
752         s = s.where(v.c.cluster != except_cluster)
753         s = s.where(v.c.node.in_(select([self.nodes.c.node],
754             self.nodes.c.parent == parent)))
755         s = s.where(a.c.serial == v.c.serial)
756         s = s.where(a.c.domain == domain)
757         s = s.where(n.c.node == v.c.node)
758         conj = []
759         for x in pathq:
760             conj.append(n.c.path.like(self.escape_like(x) + '%', escape='\\'))
761         if conj:
762             s = s.where(or_(*conj))
763         rp = self.conn.execute(s)
764         rows = rp.fetchall()
765         rp.close()
766         return [r[0] for r in rows]
767     
768     def latest_version_list(self, parent, prefix='', delimiter=None,
769                             start='', limit=10000, before=inf,
770                             except_cluster=0, pathq=[], domain=None, filterq=[]):
771         """Return a (list of (path, serial) tuples, list of common prefixes)
772            for the current versions of the paths with the given parent,
773            matching the following criteria.
774            
775            The property tuple for a version is returned if all
776            of these conditions are true:
777                 
778                 a. parent matches
779                 
780                 b. path > start
781                 
782                 c. path starts with prefix (and paths in pathq)
783                 
784                 d. version is the max up to before
785                 
786                 e. version is not in cluster
787                 
788                 f. the path does not have the delimiter occuring
789                    after the prefix, or ends with the delimiter
790                 
791                 g. serial matches the attribute filter query.
792                    
793                    A filter query is a comma-separated list of
794                    terms in one of these three forms:
795                    
796                    key
797                        an attribute with this key must exist
798                    
799                    !key
800                        an attribute with this key must not exist
801                    
802                    key ?op value
803                        the attribute with this key satisfies the value
804                        where ?op is one of ==, != <=, >=, <, >.
805            
806            The list of common prefixes includes the prefixes
807            matching up to the first delimiter after prefix,
808            and are reported only once, as "virtual directories".
809            The delimiter is included in the prefixes.
810            
811            If arguments are None, then the corresponding matching rule
812            will always match.
813            
814            Limit applies to the first list of tuples returned.
815         """
816         
817         if not start or start < prefix:
818             start = strprevling(prefix)
819         nextling = strnextling(prefix)
820         
821         a = self.attributes.alias('a')
822         v = self.versions.alias('v')
823         n = self.nodes.alias('n')
824         s = select([n.c.path, v.c.serial]).distinct()
825         filtered = select([func.max(self.versions.c.serial)])
826         if before != inf:
827             filtered = filtered.where(self.versions.c.mtime < before)
828         s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
829         s = s.where(v.c.cluster != except_cluster)
830         s = s.where(v.c.node.in_(select([self.nodes.c.node],
831             self.nodes.c.parent == parent)))
832         
833         s = s.where(n.c.node == v.c.node)
834         s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
835         conj = []
836         for x in pathq:
837             conj.append(n.c.path.like(self.escape_like(x) + '%', escape='\\'))
838         
839         if conj:
840             s = s.where(or_(*conj))
841         
842         s = s.order_by(n.c.path)
843         
844         def filterout(r):
845             if not filterq:
846                 return False
847             path, serial = r
848             included, excluded, opers = parse_filters(filterq)
849             
850             #retrieve metadata
851             s = select([a.c.key, a.c.value])
852             s = s.where(a.c.domain == domain)
853             s = s.where(a.c.serial == serial)
854             rp = self.conn.execute(s)
855             meta = dict(rp.fetchall())
856             keyset= set([k.encode('utf8') for k in meta.keys()])
857             
858             if included:
859                 if not set(included) & keyset:
860                     return True
861             if excluded:
862                 if set(excluded) & keyset:
863                     return True
864             for k, op, v in opers:
865                 k = k.decode('utf8')
866                 v = v.decode('utf8')
867                 if k not in meta:
868                     return True
869                 operation = OPERATORS[op]
870                 if not operation(meta[k], v):
871                     return True
872             return False
873     
874         if not delimiter:
875             s = s.limit(limit)
876             rp = self.conn.execute(s, start=start)
877             filter_ = lambda r : [t for t in r if not filterout(t)]
878             r = filter_(rp.fetchall())
879             rp.close()
880             return r, ()
881         
882         pfz = len(prefix)
883         dz = len(delimiter)
884         count = 0
885         prefixes = []
886         pappend = prefixes.append
887         matches = []
888         mappend = matches.append
889         
890         rp = self.conn.execute(s, start=start)
891         while True:
892             props = rp.fetchone()
893             if props is None:
894                 break
895             if filterout(props):
896                 continue
897             path, serial = props
898             idx = path.find(delimiter, pfz)
899             
900             if idx < 0:
901                 mappend(props)
902                 count += 1
903                 if count >= limit:
904                     break
905                 continue
906             
907             if idx + dz == len(path):
908                 mappend(props)
909                 count += 1
910                 continue # Get one more, in case there is a path.
911             pf = path[:idx + dz]
912             pappend(pf)
913             if count >= limit: 
914                 break
915             
916             rp = self.conn.execute(s, start=strnextling(pf)) # New start.
917         rp.close()
918         
919         return matches, prefixes
920     
921     def latest_uuid(self, uuid):
922         """Return a (path, serial) tuple, for the latest version of the given uuid."""
923         
924         v = self.versions.alias('v')
925         n = self.nodes.alias('n')
926         s = select([n.c.path, v.c.serial])
927         filtered = select([func.max(self.versions.c.serial)])
928         s = s.where(v.c.serial == filtered.where(self.versions.c.uuid == uuid))
929         s = s.where(n.c.node == v.c.node)
930         
931         r = self.conn.execute(s)
932         l = r.fetchone()
933         r.close()
934         return l