Add size queries in backend object lists.
[pithos] / pithos / backends / lib / sqlalchemy / node.py
1 # Copyright 2011 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from time import time
35 from sqlalchemy import Table, Integer, BigInteger, DECIMAL, Column, String, MetaData, ForeignKey
36 from sqlalchemy.types import Text
37 from sqlalchemy.schema import Index, Sequence
38 from sqlalchemy.sql import func, and_, or_, not_, null, select, bindparam, text, exists
39 from sqlalchemy.ext.compiler import compiles
40 from sqlalchemy.engine.reflection import Inspector
41
42 from dbworker import DBWorker
43
44 from pithos.lib.filter import parse_filters
45
46
47 ROOTNODE  = 0
48
49 ( SERIAL, NODE, HASH, SIZE, SOURCE, MTIME, MUSER, UUID, CLUSTER ) = range(9)
50
51 inf = float('inf')
52
53
54 def strnextling(prefix):
55     """Return the first unicode string
56        greater than but not starting with given prefix.
57        strnextling('hello') -> 'hellp'
58     """
59     if not prefix:
60         ## all strings start with the null string,
61         ## therefore we have to approximate strnextling('')
62         ## with the last unicode character supported by python
63         ## 0x10ffff for wide (32-bit unicode) python builds
64         ## 0x00ffff for narrow (16-bit unicode) python builds
65         ## We will not autodetect. 0xffff is safe enough.
66         return unichr(0xffff)
67     s = prefix[:-1]
68     c = ord(prefix[-1])
69     if c >= 0xffff:
70         raise RuntimeError
71     s += unichr(c+1)
72     return s
73
74 def strprevling(prefix):
75     """Return an approximation of the last unicode string
76        less than but not starting with given prefix.
77        strprevling(u'hello') -> u'helln\\xffff'
78     """
79     if not prefix:
80         ## There is no prevling for the null string
81         return prefix
82     s = prefix[:-1]
83     c = ord(prefix[-1])
84     if c > 0:
85         s += unichr(c-1) + unichr(0xffff)
86     return s
87
88 _propnames = {
89     'serial'    : 0,
90     'node'      : 1,
91     'hash'      : 2,
92     'size'      : 3,
93     'source'    : 4,
94     'mtime'     : 5,
95     'muser'     : 6,
96     'uuid'      : 7,
97     'cluster'   : 8
98 }
99
100
101 class Node(DBWorker):
102     """Nodes store path organization and have multiple versions.
103        Versions store object history and have multiple attributes.
104        Attributes store metadata.
105     """
106     
107     # TODO: Provide an interface for included and excluded clusters.
108     
109     def __init__(self, **params):
110         DBWorker.__init__(self, **params)
111         metadata = MetaData()
112         
113         #create nodes table
114         columns=[]
115         columns.append(Column('node', Integer, primary_key=True))
116         columns.append(Column('parent', Integer,
117                               ForeignKey('nodes.node',
118                                          ondelete='CASCADE',
119                                          onupdate='CASCADE'),
120                               autoincrement=False))
121         path_length = 2048
122         columns.append(Column('path', String(path_length), default='', nullable=False))
123         self.nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
124         
125         #create policy table
126         columns=[]
127         columns.append(Column('node', Integer,
128                               ForeignKey('nodes.node',
129                                          ondelete='CASCADE',
130                                          onupdate='CASCADE'),
131                               primary_key=True))
132         columns.append(Column('key', String(255), primary_key=True))
133         columns.append(Column('value', String(255)))
134         self.policies = Table('policy', metadata, *columns, mysql_engine='InnoDB')
135         
136         #create statistics table
137         columns=[]
138         columns.append(Column('node', Integer,
139                               ForeignKey('nodes.node',
140                                          ondelete='CASCADE',
141                                          onupdate='CASCADE'),
142                               primary_key=True))
143         columns.append(Column('population', Integer, nullable=False, default=0))
144         columns.append(Column('size', BigInteger, nullable=False, default=0))
145         columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
146         columns.append(Column('cluster', Integer, nullable=False, default=0,
147                               primary_key=True, autoincrement=False))
148         self.statistics = Table('statistics', metadata, *columns, mysql_engine='InnoDB')
149         
150         #create versions table
151         columns=[]
152         columns.append(Column('serial', Integer, primary_key=True))
153         columns.append(Column('node', Integer,
154                               ForeignKey('nodes.node',
155                                          ondelete='CASCADE',
156                                          onupdate='CASCADE')))
157         columns.append(Column('hash', String(255)))
158         columns.append(Column('size', BigInteger, nullable=False, default=0))
159         columns.append(Column('source', Integer))
160         columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
161         columns.append(Column('muser', String(255), nullable=False, default=''))
162         columns.append(Column('uuid', String(64), nullable=False, default=''))
163         columns.append(Column('cluster', Integer, nullable=False, default=0))
164         self.versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
165         Index('idx_versions_node_mtime', self.versions.c.node, self.versions.c.mtime)
166         Index('idx_versions_node_uuid', self.versions.c.uuid)
167         
168         #create attributes table
169         columns = []
170         columns.append(Column('serial', Integer,
171                               ForeignKey('versions.serial',
172                                          ondelete='CASCADE',
173                                          onupdate='CASCADE'),
174                               primary_key=True))
175         columns.append(Column('domain', String(255), primary_key=True))
176         columns.append(Column('key', String(255), primary_key=True))
177         columns.append(Column('value', String(255)))
178         self.attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')
179         
180         metadata.create_all(self.engine)
181         
182         # the following code creates an index of specific length
183         # this can be accompliced in sqlalchemy >= 0.7.3
184         # providing mysql_length option during index creation
185         insp = Inspector.from_engine(self.engine)
186         indexes = [elem['name'] for elem in insp.get_indexes('nodes')]
187         if 'idx_nodes_path' not in indexes:
188             explicit_length = '(%s)' %path_length if self.engine.name == 'mysql' else ''
189             s = text('CREATE UNIQUE INDEX idx_nodes_path ON nodes (path%s)' %explicit_length)
190             self.conn.execute(s).close()
191         
192         s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
193                                            self.nodes.c.parent == ROOTNODE))
194         rp = self.conn.execute(s)
195         r = rp.fetchone()
196         rp.close()
197         if not r:
198             s = self.nodes.insert().values(node=ROOTNODE, parent=ROOTNODE)
199             self.conn.execute(s)
200     
201     def node_create(self, parent, path):
202         """Create a new node from the given properties.
203            Return the node identifier of the new node.
204         """
205         #TODO catch IntegrityError?
206         s = self.nodes.insert().values(parent=parent, path=path)
207         r = self.conn.execute(s)
208         inserted_primary_key = r.inserted_primary_key[0]
209         r.close()
210         return inserted_primary_key
211     
212     def node_lookup(self, path):
213         """Lookup the current node of the given path.
214            Return None if the path is not found.
215         """
216         
217         # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
218         s = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path), escape='\\'))
219         r = self.conn.execute(s)
220         row = r.fetchone()
221         r.close()
222         if row:
223             return row[0]
224         return None
225     
226     def node_get_properties(self, node):
227         """Return the node's (parent, path).
228            Return None if the node is not found.
229         """
230         
231         s = select([self.nodes.c.parent, self.nodes.c.path])
232         s = s.where(self.nodes.c.node == node)
233         r = self.conn.execute(s)
234         l = r.fetchone()
235         r.close()
236         return l
237     
238     def node_get_versions(self, node, keys=(), propnames=_propnames):
239         """Return the properties of all versions at node.
240            If keys is empty, return all properties in the order
241            (serial, node, hash, size, source, mtime, muser, uuid, cluster).
242         """
243         
244         s = select([self.versions.c.serial,
245                     self.versions.c.node,
246                     self.versions.c.hash,
247                     self.versions.c.size,
248                     self.versions.c.source,
249                     self.versions.c.mtime,
250                     self.versions.c.muser,
251                     self.versions.c.uuid,
252                     self.versions.c.cluster], self.versions.c.node == node)
253         s = s.order_by(self.versions.c.serial)
254         r = self.conn.execute(s)
255         rows = r.fetchall()
256         r.close()
257         if not rows:
258             return rows
259         
260         if not keys:
261             return rows
262         
263         return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]
264     
265     def node_count_children(self, node):
266         """Return node's child count."""
267         
268         s = select([func.count(self.nodes.c.node)])
269         s = s.where(and_(self.nodes.c.parent == node,
270                          self.nodes.c.node != ROOTNODE))
271         r = self.conn.execute(s)
272         row = r.fetchone()
273         r.close()
274         return row[0]
275     
276     def node_purge_children(self, parent, before=inf, cluster=0):
277         """Delete all versions with the specified
278            parent and cluster, and return
279            the hashes of versions deleted.
280            Clears out nodes with no remaining versions.
281         """
282         #update statistics
283         c1 = select([self.nodes.c.node],
284             self.nodes.c.parent == parent)
285         where_clause = and_(self.versions.c.node.in_(c1),
286                             self.versions.c.cluster == cluster)
287         s = select([func.count(self.versions.c.serial),
288                     func.sum(self.versions.c.size)])
289         s = s.where(where_clause)
290         if before != inf:
291             s = s.where(self.versions.c.mtime <= before)
292         r = self.conn.execute(s)
293         row = r.fetchone()
294         r.close()
295         if not row:
296             return ()
297         nr, size = row[0], -row[1] if row[1] else 0
298         mtime = time()
299         self.statistics_update(parent, -nr, size, mtime, cluster)
300         self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)
301         
302         s = select([self.versions.c.hash])
303         s = s.where(where_clause)
304         r = self.conn.execute(s)
305         hashes = [row[0] for row in r.fetchall()]
306         r.close()
307         
308         #delete versions
309         s = self.versions.delete().where(where_clause)
310         r = self.conn.execute(s)
311         r.close()
312         
313         #delete nodes
314         s = select([self.nodes.c.node],
315             and_(self.nodes.c.parent == parent,
316                  select([func.count(self.versions.c.serial)],
317                     self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
318         rp = self.conn.execute(s)
319         nodes = [r[0] for r in rp.fetchall()]
320         rp.close()
321         s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
322         self.conn.execute(s).close()
323         
324         return hashes
325     
326     def node_purge(self, node, before=inf, cluster=0):
327         """Delete all versions with the specified
328            node and cluster, and return
329            the hashes of versions deleted.
330            Clears out the node if it has no remaining versions.
331         """
332         
333         #update statistics
334         s = select([func.count(self.versions.c.serial),
335                     func.sum(self.versions.c.size)])
336         where_clause = and_(self.versions.c.node == node,
337                          self.versions.c.cluster == cluster)
338         s = s.where(where_clause)
339         if before != inf:
340             s = s.where(self.versions.c.mtime <= before)
341         r = self.conn.execute(s)
342         row = r.fetchone()
343         nr, size = row[0], row[1]
344         r.close()
345         if not nr:
346             return ()
347         mtime = time()
348         self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
349         
350         s = select([self.versions.c.hash])
351         s = s.where(where_clause)
352         r = self.conn.execute(s)
353         hashes = [r[0] for r in r.fetchall()]
354         r.close()
355         
356         #delete versions
357         s = self.versions.delete().where(where_clause)
358         r = self.conn.execute(s)
359         r.close()
360         
361         #delete nodes
362         s = select([self.nodes.c.node],
363             and_(self.nodes.c.node == node,
364                  select([func.count(self.versions.c.serial)],
365                     self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
366         r = self.conn.execute(s)
367         nodes = r.fetchall()
368         r.close()
369         s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
370         self.conn.execute(s).close()
371         
372         return hashes
373     
374     def node_remove(self, node):
375         """Remove the node specified.
376            Return false if the node has children or is not found.
377         """
378         
379         if self.node_count_children(node):
380             return False
381         
382         mtime = time()
383         s = select([func.count(self.versions.c.serial),
384                     func.sum(self.versions.c.size),
385                     self.versions.c.cluster])
386         s = s.where(self.versions.c.node == node)
387         s = s.group_by(self.versions.c.cluster)
388         r = self.conn.execute(s)
389         for population, size, cluster in r.fetchall():
390             self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
391         r.close()
392         
393         s = self.nodes.delete().where(self.nodes.c.node == node)
394         self.conn.execute(s).close()
395         return True
396     
397     def policy_get(self, node):
398         s = select([self.policies.c.key, self.policies.c.value],
399             self.policies.c.node==node)
400         r = self.conn.execute(s)
401         d = dict(r.fetchall())
402         r.close()
403         return d
404     
405     def policy_set(self, node, policy):
406         #insert or replace
407         for k, v in policy.iteritems():
408             s = self.policies.update().where(and_(self.policies.c.node == node,
409                                                   self.policies.c.key == k))
410             s = s.values(value = v)
411             rp = self.conn.execute(s)
412             rp.close()
413             if rp.rowcount == 0:
414                 s = self.policies.insert()
415                 values = {'node':node, 'key':k, 'value':v}
416                 r = self.conn.execute(s, values)
417                 r.close()
418     
419     def statistics_get(self, node, cluster=0):
420         """Return population, total size and last mtime
421            for all versions under node that belong to the cluster.
422         """
423         
424         s = select([self.statistics.c.population,
425                     self.statistics.c.size,
426                     self.statistics.c.mtime])
427         s = s.where(and_(self.statistics.c.node == node,
428                          self.statistics.c.cluster == cluster))
429         r = self.conn.execute(s)
430         row = r.fetchone()
431         r.close()
432         return row
433     
434     def statistics_update(self, node, population, size, mtime, cluster=0):
435         """Update the statistics of the given node.
436            Statistics keep track the population, total
437            size of objects and mtime in the node's namespace.
438            May be zero or positive or negative numbers.
439         """
440         s = select([self.statistics.c.population, self.statistics.c.size],
441             and_(self.statistics.c.node == node,
442                  self.statistics.c.cluster == cluster))
443         rp = self.conn.execute(s)
444         r = rp.fetchone()
445         rp.close()
446         if not r:
447             prepopulation, presize = (0, 0)
448         else:
449             prepopulation, presize = r
450         population += prepopulation
451         size += presize
452         
453         #insert or replace
454         #TODO better upsert
455         u = self.statistics.update().where(and_(self.statistics.c.node==node,
456                                            self.statistics.c.cluster==cluster))
457         u = u.values(population=population, size=size, mtime=mtime)
458         rp = self.conn.execute(u)
459         rp.close()
460         if rp.rowcount == 0:
461             ins = self.statistics.insert()
462             ins = ins.values(node=node, population=population, size=size,
463                              mtime=mtime, cluster=cluster)
464             self.conn.execute(ins).close()
465     
466     def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
467         """Update the statistics of the given node's parent.
468            Then recursively update all parents up to the root.
469            Population is not recursive.
470         """
471         
472         while True:
473             if node == ROOTNODE:
474                 break
475             props = self.node_get_properties(node)
476             if props is None:
477                 break
478             parent, path = props
479             self.statistics_update(parent, population, size, mtime, cluster)
480             node = parent
481             population = 0 # Population isn't recursive
482     
483     def statistics_latest(self, node, before=inf, except_cluster=0):
484         """Return population, total size and last mtime
485            for all latest versions under node that
486            do not belong to the cluster.
487         """
488         
489         # The node.
490         props = self.node_get_properties(node)
491         if props is None:
492             return None
493         parent, path = props
494         
495         # The latest version.
496         s = select([self.versions.c.serial,
497                     self.versions.c.node,
498                     self.versions.c.hash,
499                     self.versions.c.size,
500                     self.versions.c.source,
501                     self.versions.c.mtime,
502                     self.versions.c.muser,
503                     self.versions.c.uuid,
504                     self.versions.c.cluster])
505         filtered = select([func.max(self.versions.c.serial)],
506                             self.versions.c.node == node)
507         if before != inf:
508             filtered = filtered.where(self.versions.c.mtime < before)
509         s = s.where(and_(self.versions.c.cluster != except_cluster,
510                          self.versions.c.serial == filtered))
511         r = self.conn.execute(s)
512         props = r.fetchone()
513         r.close()
514         if not props:
515             return None
516         mtime = props[MTIME]
517         
518         # First level, just under node (get population).
519         v = self.versions.alias('v')
520         s = select([func.count(v.c.serial),
521                     func.sum(v.c.size),
522                     func.max(v.c.mtime)])
523         c1 = select([func.max(self.versions.c.serial)])
524         if before != inf:
525             c1 = c1.where(self.versions.c.mtime < before)
526         c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
527         s = s.where(and_(v.c.serial == c1.where(self.versions.c.node == v.c.node),
528                          v.c.cluster != except_cluster,
529                          v.c.node.in_(c2)))
530         rp = self.conn.execute(s)
531         r = rp.fetchone()
532         rp.close()
533         if not r:
534             return None
535         count = r[0]
536         mtime = max(mtime, r[2])
537         if count == 0:
538             return (0, 0, mtime)
539         
540         # All children (get size and mtime).
541         # XXX: This is why the full path is stored.
542         s = select([func.count(v.c.serial),
543                     func.sum(v.c.size),
544                     func.max(v.c.mtime)])
545         c1 = select([func.max(self.versions.c.serial)],
546             self.versions.c.node == v.c.node)
547         if before != inf:
548             c1 = c1.where(self.versions.c.mtime < before)
549         c2 = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path) + '%', escape='\\'))
550         s = s.where(and_(v.c.serial == c1,
551                          v.c.cluster != except_cluster,
552                          v.c.node.in_(c2)))
553         rp = self.conn.execute(s)
554         r = rp.fetchone()
555         rp.close()
556         if not r:
557             return None
558         size = r[1] - props[SIZE]
559         mtime = max(mtime, r[2])
560         return (count, size, mtime)
561     
562     def version_create(self, node, hash, size, source, muser, uuid, cluster=0):
563         """Create a new version from the given properties.
564            Return the (serial, mtime) of the new version.
565         """
566         
567         mtime = time()
568         s = self.versions.insert().values(node=node, hash=hash, size=size, source=source,
569                                           mtime=mtime, muser=muser, uuid=uuid, cluster=cluster)
570         serial = self.conn.execute(s).inserted_primary_key[0]
571         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
572         return serial, mtime
573     
574     def version_lookup(self, node, before=inf, cluster=0):
575         """Lookup the current version of the given node.
576            Return a list with its properties:
577            (serial, node, hash, size, source, mtime, muser, uuid, cluster)
578            or None if the current version is not found in the given cluster.
579         """
580         
581         v = self.versions.alias('v')
582         s = select([v.c.serial, v.c.node, v.c.hash,
583                     v.c.size, v.c.source, v.c.mtime,
584                     v.c.muser, v.c.uuid, v.c.cluster])
585         c = select([func.max(self.versions.c.serial)],
586             self.versions.c.node == node)
587         if before != inf:
588             c = c.where(self.versions.c.mtime < before)
589         s = s.where(and_(v.c.serial == c,
590                          v.c.cluster == cluster))
591         r = self.conn.execute(s)
592         props = r.fetchone()
593         r.close()
594         if props:
595             return props
596         return None
597     
598     def version_get_properties(self, serial, keys=(), propnames=_propnames):
599         """Return a sequence of values for the properties of
600            the version specified by serial and the keys, in the order given.
601            If keys is empty, return all properties in the order
602            (serial, node, hash, size, source, mtime, muser, uuid, cluster).
603         """
604         
605         v = self.versions.alias()
606         s = select([v.c.serial, v.c.node, v.c.hash,
607                     v.c.size, v.c.source, v.c.mtime,
608                     v.c.muser, v.c.uuid, v.c.cluster], v.c.serial == serial)
609         rp = self.conn.execute(s)
610         r = rp.fetchone()
611         rp.close()
612         if r is None:
613             return r
614         
615         if not keys:
616             return r
617         return [r[propnames[k]] for k in keys if k in propnames]
618     
619     def version_recluster(self, serial, cluster):
620         """Move the version into another cluster."""
621         
622         props = self.version_get_properties(serial)
623         if not props:
624             return
625         node = props[NODE]
626         size = props[SIZE]
627         oldcluster = props[CLUSTER]
628         if cluster == oldcluster:
629             return
630         
631         mtime = time()
632         self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
633         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
634         
635         s = self.versions.update()
636         s = s.where(self.versions.c.serial == serial)
637         s = s.values(cluster = cluster)
638         self.conn.execute(s).close()
639     
640     def version_remove(self, serial):
641         """Remove the serial specified."""
642         
643         props = self.version_get_properties(serial)
644         if not props:
645             return
646         node = props[NODE]
647         hash = props[HASH]
648         size = props[SIZE]
649         cluster = props[CLUSTER]
650         
651         mtime = time()
652         self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
653         
654         s = self.versions.delete().where(self.versions.c.serial == serial)
655         self.conn.execute(s).close()
656         return hash
657     
658     def attribute_get(self, serial, domain, keys=()):
659         """Return a list of (key, value) pairs of the version specified by serial.
660            If keys is empty, return all attributes.
661            Othwerise, return only those specified.
662         """
663         
664         if keys:
665             attrs = self.attributes.alias()
666             s = select([attrs.c.key, attrs.c.value])
667             s = s.where(and_(attrs.c.key.in_(keys),
668                              attrs.c.serial == serial,
669                              attrs.c.domain == domain))
670         else:
671             attrs = self.attributes.alias()
672             s = select([attrs.c.key, attrs.c.value])
673             s = s.where(and_(attrs.c.serial == serial,
674                              attrs.c.domain == domain))
675         r = self.conn.execute(s)
676         l = r.fetchall()
677         r.close()
678         return l
679     
680     def attribute_set(self, serial, domain, items):
681         """Set the attributes of the version specified by serial.
682            Receive attributes as an iterable of (key, value) pairs.
683         """
684         #insert or replace
685         #TODO better upsert
686         for k, v in items:
687             s = self.attributes.update()
688             s = s.where(and_(self.attributes.c.serial == serial,
689                              self.attributes.c.domain == domain,
690                              self.attributes.c.key == k))
691             s = s.values(value = v)
692             rp = self.conn.execute(s)
693             rp.close()
694             if rp.rowcount == 0:
695                 s = self.attributes.insert()
696                 s = s.values(serial=serial, domain=domain, key=k, value=v)
697                 self.conn.execute(s).close()
698     
699     def attribute_del(self, serial, domain, keys=()):
700         """Delete attributes of the version specified by serial.
701            If keys is empty, delete all attributes.
702            Otherwise delete those specified.
703         """
704         
705         if keys:
706             #TODO more efficient way to do this?
707             for key in keys:
708                 s = self.attributes.delete()
709                 s = s.where(and_(self.attributes.c.serial == serial,
710                                  self.attributes.c.domain == domain,
711                                  self.attributes.c.key == key))
712                 self.conn.execute(s).close()
713         else:
714             s = self.attributes.delete()
715             s = s.where(and_(self.attributes.c.serial == serial,
716                              self.attributes.c.domain == domain))
717             self.conn.execute(s).close()
718     
719     def attribute_copy(self, source, dest):
720         s = select([dest, self.attributes.c.domain, self.attributes.c.key, self.attributes.c.value],
721             self.attributes.c.serial == source)
722         rp = self.conn.execute(s)
723         attributes = rp.fetchall()
724         rp.close()
725         for dest, domain, k, v in attributes:
726             #insert or replace
727             s = self.attributes.update().where(and_(
728                 self.attributes.c.serial == dest,
729                 self.attributes.c.domain == domain,
730                 self.attributes.c.key == k))
731             rp = self.conn.execute(s, value=v)
732             rp.close()
733             if rp.rowcount == 0:
734                 s = self.attributes.insert()
735                 values = {'serial':dest, 'domain':domain, 'key':k, 'value':v}
736                 self.conn.execute(s, values).close()
737     
738     def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=[]):
739         """Return a list with all keys pairs defined
740            for all latest versions under parent that
741            do not belong to the cluster.
742         """
743         
744         # TODO: Use another table to store before=inf results.
745         a = self.attributes.alias('a')
746         v = self.versions.alias('v')
747         n = self.nodes.alias('n')
748         s = select([a.c.key]).distinct()
749         filtered = select([func.max(self.versions.c.serial)])
750         if before != inf:
751             filtered = filtered.where(self.versions.c.mtime < before)
752         s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
753         s = s.where(v.c.cluster != except_cluster)
754         s = s.where(v.c.node.in_(select([self.nodes.c.node],
755             self.nodes.c.parent == parent)))
756         s = s.where(a.c.serial == v.c.serial)
757         s = s.where(a.c.domain == domain)
758         s = s.where(n.c.node == v.c.node)
759         conj = []
760         for x in pathq:
761             conj.append(n.c.path.like(self.escape_like(x) + '%', escape='\\'))
762         if conj:
763             s = s.where(or_(*conj))
764         rp = self.conn.execute(s)
765         rows = rp.fetchall()
766         rp.close()
767         return [r[0] for r in rows]
768     
769     def latest_version_list(self, parent, prefix='', delimiter=None,
770                             start='', limit=10000, before=inf,
771                             except_cluster=0, pathq=[], domain=None, filterq=[], sizeq=None):
772         """Return a (list of (path, serial) tuples, list of common prefixes)
773            for the current versions of the paths with the given parent,
774            matching the following criteria.
775            
776            The property tuple for a version is returned if all
777            of these conditions are true:
778                 
779                 a. parent matches
780                 
781                 b. path > start
782                 
783                 c. path starts with prefix (and paths in pathq)
784                 
785                 d. version is the max up to before
786                 
787                 e. version is not in cluster
788                 
789                 f. the path does not have the delimiter occuring
790                    after the prefix, or ends with the delimiter
791                 
792                 g. serial matches the attribute filter query.
793                    
794                    A filter query is a comma-separated list of
795                    terms in one of these three forms:
796                    
797                    key
798                        an attribute with this key must exist
799                    
800                    !key
801                        an attribute with this key must not exist
802                    
803                    key ?op value
804                        the attribute with this key satisfies the value
805                        where ?op is one of ==, != <=, >=, <, >.
806                 
807                 h. the size is in the range set by sizeq
808            
809            The list of common prefixes includes the prefixes
810            matching up to the first delimiter after prefix,
811            and are reported only once, as "virtual directories".
812            The delimiter is included in the prefixes.
813            
814            If arguments are None, then the corresponding matching rule
815            will always match.
816            
817            Limit applies to the first list of tuples returned.
818         """
819         
820         if not start or start < prefix:
821             start = strprevling(prefix)
822         nextling = strnextling(prefix)
823         
824         v = self.versions.alias('v')
825         n = self.nodes.alias('n')
826         s = select([n.c.path, v.c.serial]).distinct()
827         filtered = select([func.max(self.versions.c.serial)])
828         if before != inf:
829             filtered = filtered.where(self.versions.c.mtime < before)
830         s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
831         s = s.where(v.c.cluster != except_cluster)
832         s = s.where(v.c.node.in_(select([self.nodes.c.node],
833             self.nodes.c.parent == parent)))
834         
835         s = s.where(n.c.node == v.c.node)
836         s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
837         conj = []
838         for x in pathq:
839             conj.append(n.c.path.like(self.escape_like(x) + '%', escape='\\'))
840         if conj:
841             s = s.where(or_(*conj))
842         
843         if sizeq and len(sizeq) == 2:
844             if sizeq[0]:
845                 s = s.where(v.c.size >= sizeq[0])
846             if sizeq[1]:
847                 s = s.where(v.c.size < sizeq[1])
848         
849         if domain and filterq:
850             a = self.attributes.alias('a')
851             included, excluded, opers = parse_filters(filterq)
852             if included:
853                 subs = select([1])
854                 subs = subs.where(a.c.serial == v.c.serial).correlate(v)
855                 subs = subs.where(a.c.domain == domain)
856                 subs = subs.where(or_(*[a.c.key.op('=')(x) for x in included]))
857                 s = s.where(exists(subs))
858             if excluded:
859                 subs = select([1])
860                 subs = subs.where(a.c.serial == v.c.serial).correlate(v)
861                 subs = subs.where(a.c.domain == domain)
862                 subs = subs.where(or_(*[a.c.key.op('=')(x) for x in excluded]))
863                 s = s.where(not_(exists(subs)))
864             if opers:
865                 for k, o, val in opers:
866                     subs = select([1])
867                     subs = subs.where(a.c.serial == v.c.serial).correlate(v)
868                     subs = subs.where(a.c.domain == domain)
869                     subs = subs.where(and_(a.c.key.op('=')(k), a.c.value.op(o)(val)))
870                     s = s.where(exists(subs))
871         
872         s = s.order_by(n.c.path)
873         
874         if not delimiter:
875             s = s.limit(limit)
876             rp = self.conn.execute(s, start=start)
877             r = rp.fetchall()
878             rp.close()
879             return r, ()
880         
881         pfz = len(prefix)
882         dz = len(delimiter)
883         count = 0
884         prefixes = []
885         pappend = prefixes.append
886         matches = []
887         mappend = matches.append
888         
889         rp = self.conn.execute(s, start=start)
890         while True:
891             props = rp.fetchone()
892             if props is None:
893                 break
894             path, serial = props
895             idx = path.find(delimiter, pfz)
896             
897             if idx < 0:
898                 mappend(props)
899                 count += 1
900                 if count >= limit:
901                     break
902                 continue
903             
904             if idx + dz == len(path):
905                 mappend(props)
906                 count += 1
907                 continue # Get one more, in case there is a path.
908             pf = path[:idx + dz]
909             pappend(pf)
910             if count >= limit: 
911                 break
912             
913             rp = self.conn.execute(s, start=strnextling(pf)) # New start.
914         rp.close()
915         
916         return matches, prefixes
917     
918     def latest_uuid(self, uuid):
919         """Return a (path, serial) tuple, for the latest version of the given uuid."""
920         
921         v = self.versions.alias('v')
922         n = self.nodes.alias('n')
923         s = select([n.c.path, v.c.serial])
924         filtered = select([func.max(self.versions.c.serial)])
925         s = s.where(v.c.serial == filtered.where(self.versions.c.uuid == uuid))
926         s = s.where(n.c.node == v.c.node)
927         
928         r = self.conn.execute(s)
929         l = r.fetchone()
930         r.close()
931         return l