Progress II: update sqlalchemy pithos backend lib
[pithos] / snf-pithos-backend / pithos / backends / lib / sqlalchemy / node.py
1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from time import time
35 from sqlalchemy import Table, Integer, BigInteger, DECIMAL, Column, String, MetaData, ForeignKey
36 from sqlalchemy.types import Text
37 from sqlalchemy.schema import Index, Sequence
38 from sqlalchemy.sql import func, and_, or_, not_, null, select, bindparam, text, exists
39 from sqlalchemy.ext.compiler import compiles
40 from sqlalchemy.engine.reflection import Inspector
41
42 from dbworker import DBWorker
43
44 from pithos.backends.filter import parse_filters
45
46
47 ROOTNODE  = 0
48
49 ( SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM, CLUSTER ) = range(11)
50
51 ( MATCH_PREFIX, MATCH_EXACT ) = range(2)
52
53 inf = float('inf')
54
55
56 def strnextling(prefix):
57     """Return the first unicode string
58        greater than but not starting with given prefix.
59        strnextling('hello') -> 'hellp'
60     """
61     if not prefix:
62         ## all strings start with the null string,
63         ## therefore we have to approximate strnextling('')
64         ## with the last unicode character supported by python
65         ## 0x10ffff for wide (32-bit unicode) python builds
66         ## 0x00ffff for narrow (16-bit unicode) python builds
67         ## We will not autodetect. 0xffff is safe enough.
68         return unichr(0xffff)
69     s = prefix[:-1]
70     c = ord(prefix[-1])
71     if c >= 0xffff:
72         raise RuntimeError
73     s += unichr(c+1)
74     return s
75
76 def strprevling(prefix):
77     """Return an approximation of the last unicode string
78        less than but not starting with given prefix.
79        strprevling(u'hello') -> u'helln\\xffff'
80     """
81     if not prefix:
82         ## There is no prevling for the null string
83         return prefix
84     s = prefix[:-1]
85     c = ord(prefix[-1])
86     if c > 0:
87         s += unichr(c-1) + unichr(0xffff)
88     return s
89
90 _propnames = {
91     'serial'    : 0,
92     'node'      : 1,
93     'hash'      : 2,
94     'size'      : 3,
95     'type'      : 4,
96     'source'    : 5,
97     'mtime'     : 6,
98     'muser'     : 7,
99     'uuid'      : 8,
100     'checksum'  : 9,
101     'cluster'   : 10
102 }
103
104
105 class Node(DBWorker):
106     """Nodes store path organization and have multiple versions.
107        Versions store object history and have multiple attributes.
108        Attributes store metadata.
109     """
110     
111     # TODO: Provide an interface for included and excluded clusters.
112     
113     def __init__(self, **params):
114         DBWorker.__init__(self, **params)
115         metadata = MetaData()
116         
117         #create nodes table
118         columns=[]
119         columns.append(Column('node', Integer, primary_key=True))
120         columns.append(Column('parent', Integer,
121                               ForeignKey('nodes.node',
122                                          ondelete='CASCADE',
123                                          onupdate='CASCADE'),
124                               autoincrement=False))
125         columns.append(Column('path', String(2048), default='', nullable=False))
126         self.nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
127         Index('idx_nodes_path', self.nodes.c.path, unique=True)
128         
129         #create policy table
130         columns=[]
131         columns.append(Column('node', Integer,
132                               ForeignKey('nodes.node',
133                                          ondelete='CASCADE',
134                                          onupdate='CASCADE'),
135                               primary_key=True))
136         columns.append(Column('key', String(128), primary_key=True))
137         columns.append(Column('value', String(256)))
138         self.policies = Table('policy', metadata, *columns, mysql_engine='InnoDB')
139         
140         #create statistics table
141         columns=[]
142         columns.append(Column('node', Integer,
143                               ForeignKey('nodes.node',
144                                          ondelete='CASCADE',
145                                          onupdate='CASCADE'),
146                               primary_key=True))
147         columns.append(Column('population', Integer, nullable=False, default=0))
148         columns.append(Column('size', BigInteger, nullable=False, default=0))
149         columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
150         columns.append(Column('cluster', Integer, nullable=False, default=0,
151                               primary_key=True, autoincrement=False))
152         self.statistics = Table('statistics', metadata, *columns, mysql_engine='InnoDB')
153         
154         #create versions table
155         columns=[]
156         columns.append(Column('serial', Integer, primary_key=True))
157         columns.append(Column('node', Integer,
158                               ForeignKey('nodes.node',
159                                          ondelete='CASCADE',
160                                          onupdate='CASCADE')))
161         columns.append(Column('hash', String(256)))
162         columns.append(Column('size', BigInteger, nullable=False, default=0))
163         columns.append(Column('type', String(256), nullable=False, default=''))
164         columns.append(Column('source', Integer))
165         columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
166         columns.append(Column('muser', String(256), nullable=False, default=''))
167         columns.append(Column('uuid', String(64), nullable=False, default=''))
168         columns.append(Column('checksum', String(256), nullable=False, default=''))
169         columns.append(Column('cluster', Integer, nullable=False, default=0))
170         self.versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
171         Index('idx_versions_node_mtime', self.versions.c.node, self.versions.c.mtime)
172         Index('idx_versions_node_uuid', self.versions.c.uuid)
173         
174         #create attributes table
175         columns = []
176         columns.append(Column('serial', Integer,
177                               ForeignKey('versions.serial',
178                                          ondelete='CASCADE',
179                                          onupdate='CASCADE'),
180                               primary_key=True))
181         columns.append(Column('domain', String(256), primary_key=True))
182         columns.append(Column('key', String(128), primary_key=True))
183         columns.append(Column('value', String(256)))
184         self.attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')
185         
186         metadata.create_all(self.engine)
187         
188         s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
189                                            self.nodes.c.parent == ROOTNODE))
190         rp = self.conn.execute(s)
191         r = rp.fetchone()
192         rp.close()
193         if not r:
194             s = self.nodes.insert().values(node=ROOTNODE, parent=ROOTNODE)
195             self.conn.execute(s)
196     
197     def node_create(self, parent, path):
198         """Create a new node from the given properties.
199            Return the node identifier of the new node.
200         """
201         #TODO catch IntegrityError?
202         s = self.nodes.insert().values(parent=parent, path=path)
203         r = self.conn.execute(s)
204         inserted_primary_key = r.inserted_primary_key[0]
205         r.close()
206         return inserted_primary_key
207     
208     def node_lookup(self, path):
209         """Lookup the current node of the given path.
210            Return None if the path is not found.
211         """
212         
213         # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
214         s = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path), escape='\\'))
215         r = self.conn.execute(s)
216         row = r.fetchone()
217         r.close()
218         if row:
219             return row[0]
220         return None
221     
222     def node_lookup_bulk(self, paths):
223         """Lookup the current nodes for the given paths.
224            Return () if the path is not found.
225         """
226         
227         # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
228         s = select([self.nodes.c.node], self.nodes.c.path.in_(paths))
229         r = self.conn.execute(s)
230         rows = r.fetchall()
231         r.close()
232         return [row[0] for row in rows]
233     
234     def node_get_properties(self, node):
235         """Return the node's (parent, path).
236            Return None if the node is not found.
237         """
238         
239         s = select([self.nodes.c.parent, self.nodes.c.path])
240         s = s.where(self.nodes.c.node == node)
241         r = self.conn.execute(s)
242         l = r.fetchone()
243         r.close()
244         return l
245     
246     def node_get_versions(self, node, keys=(), propnames=_propnames):
247         """Return the properties of all versions at node.
248            If keys is empty, return all properties in the order
249            (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
250         """
251         
252         s = select([self.versions.c.serial,
253                     self.versions.c.node,
254                     self.versions.c.hash,
255                     self.versions.c.size,
256                     self.versions.c.type,
257                     self.versions.c.source,
258                     self.versions.c.mtime,
259                     self.versions.c.muser,
260                     self.versions.c.uuid,
261                     self.versions.c.checksum,
262                     self.versions.c.cluster], self.versions.c.node == node)
263         s = s.order_by(self.versions.c.serial)
264         r = self.conn.execute(s)
265         rows = r.fetchall()
266         r.close()
267         if not rows:
268             return rows
269         
270         if not keys:
271             return rows
272         
273         return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]
274     
275     def node_count_children(self, node):
276         """Return node's child count."""
277         
278         s = select([func.count(self.nodes.c.node)])
279         s = s.where(and_(self.nodes.c.parent == node,
280                          self.nodes.c.node != ROOTNODE))
281         r = self.conn.execute(s)
282         row = r.fetchone()
283         r.close()
284         return row[0]
285     
286     def node_purge_children(self, parent, before=inf, cluster=0):
287         """Delete all versions with the specified
288            parent and cluster, and return
289            the hashes and size of versions deleted.
290            Clears out nodes with no remaining versions.
291         """
292         #update statistics
293         c1 = select([self.nodes.c.node],
294             self.nodes.c.parent == parent)
295         where_clause = and_(self.versions.c.node.in_(c1),
296                             self.versions.c.cluster == cluster)
297         s = select([func.count(self.versions.c.serial),
298                     func.sum(self.versions.c.size)])
299         s = s.where(where_clause)
300         if before != inf:
301             s = s.where(self.versions.c.mtime <= before)
302         r = self.conn.execute(s)
303         row = r.fetchone()
304         r.close()
305         if not row:
306             return (), 0
307         nr, size = row[0], -row[1] if row[1] else 0
308         mtime = time()
309         self.statistics_update(parent, -nr, size, mtime, cluster)
310         self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)
311         
312         s = select([self.versions.c.hash])
313         s = s.where(where_clause)
314         r = self.conn.execute(s)
315         hashes = [row[0] for row in r.fetchall()]
316         r.close()
317         
318         #delete versions
319         s = self.versions.delete().where(where_clause)
320         r = self.conn.execute(s)
321         r.close()
322         
323         #delete nodes
324         s = select([self.nodes.c.node],
325             and_(self.nodes.c.parent == parent,
326                  select([func.count(self.versions.c.serial)],
327                     self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
328         rp = self.conn.execute(s)
329         nodes = [r[0] for r in rp.fetchall()]
330         rp.close()
331         s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
332         self.conn.execute(s).close()
333         
334         return hashes, size
335     
336     def node_purge(self, node, before=inf, cluster=0):
337         """Delete all versions with the specified
338            node and cluster, and return
339            the hashes and size of versions deleted.
340            Clears out the node if it has no remaining versions.
341         """
342         
343         #update statistics
344         s = select([func.count(self.versions.c.serial),
345                     func.sum(self.versions.c.size)])
346         where_clause = and_(self.versions.c.node == node,
347                          self.versions.c.cluster == cluster)
348         s = s.where(where_clause)
349         if before != inf:
350             s = s.where(self.versions.c.mtime <= before)
351         r = self.conn.execute(s)
352         row = r.fetchone()
353         nr, size = row[0], row[1]
354         r.close()
355         if not nr:
356             return (), 0
357         mtime = time()
358         self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
359         
360         s = select([self.versions.c.hash])
361         s = s.where(where_clause)
362         r = self.conn.execute(s)
363         hashes = [r[0] for r in r.fetchall()]
364         r.close()
365         
366         #delete versions
367         s = self.versions.delete().where(where_clause)
368         r = self.conn.execute(s)
369         r.close()
370         
371         #delete nodes
372         s = select([self.nodes.c.node],
373             and_(self.nodes.c.node == node,
374                  select([func.count(self.versions.c.serial)],
375                     self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
376         r = self.conn.execute(s)
377         nodes = r.fetchall()
378         r.close()
379         s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
380         self.conn.execute(s).close()
381         
382         return hashes, size
383     
384     def node_remove(self, node):
385         """Remove the node specified.
386            Return false if the node has children or is not found.
387         """
388         
389         if self.node_count_children(node):
390             return False
391         
392         mtime = time()
393         s = select([func.count(self.versions.c.serial),
394                     func.sum(self.versions.c.size),
395                     self.versions.c.cluster])
396         s = s.where(self.versions.c.node == node)
397         s = s.group_by(self.versions.c.cluster)
398         r = self.conn.execute(s)
399         for population, size, cluster in r.fetchall():
400             self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
401         r.close()
402         
403         s = self.nodes.delete().where(self.nodes.c.node == node)
404         self.conn.execute(s).close()
405         return True
406     
407     def policy_get(self, node):
408         s = select([self.policies.c.key, self.policies.c.value],
409             self.policies.c.node==node)
410         r = self.conn.execute(s)
411         d = dict(r.fetchall())
412         r.close()
413         return d
414     
415     def policy_set(self, node, policy):
416         #insert or replace
417         for k, v in policy.iteritems():
418             s = self.policies.update().where(and_(self.policies.c.node == node,
419                                                   self.policies.c.key == k))
420             s = s.values(value = v)
421             rp = self.conn.execute(s)
422             rp.close()
423             if rp.rowcount == 0:
424                 s = self.policies.insert()
425                 values = {'node':node, 'key':k, 'value':v}
426                 r = self.conn.execute(s, values)
427                 r.close()
428     
429     def statistics_get(self, node, cluster=0):
430         """Return population, total size and last mtime
431            for all versions under node that belong to the cluster.
432         """
433         
434         s = select([self.statistics.c.population,
435                     self.statistics.c.size,
436                     self.statistics.c.mtime])
437         s = s.where(and_(self.statistics.c.node == node,
438                          self.statistics.c.cluster == cluster))
439         r = self.conn.execute(s)
440         row = r.fetchone()
441         r.close()
442         return row
443     
444     def statistics_update(self, node, population, size, mtime, cluster=0):
445         """Update the statistics of the given node.
446            Statistics keep track the population, total
447            size of objects and mtime in the node's namespace.
448            May be zero or positive or negative numbers.
449         """
450         s = select([self.statistics.c.population, self.statistics.c.size],
451             and_(self.statistics.c.node == node,
452                  self.statistics.c.cluster == cluster))
453         rp = self.conn.execute(s)
454         r = rp.fetchone()
455         rp.close()
456         if not r:
457             prepopulation, presize = (0, 0)
458         else:
459             prepopulation, presize = r
460         population += prepopulation
461         size += presize
462         
463         #insert or replace
464         #TODO better upsert
465         u = self.statistics.update().where(and_(self.statistics.c.node==node,
466                                            self.statistics.c.cluster==cluster))
467         u = u.values(population=population, size=size, mtime=mtime)
468         rp = self.conn.execute(u)
469         rp.close()
470         if rp.rowcount == 0:
471             ins = self.statistics.insert()
472             ins = ins.values(node=node, population=population, size=size,
473                              mtime=mtime, cluster=cluster)
474             self.conn.execute(ins).close()
475     
476     def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
477         """Update the statistics of the given node's parent.
478            Then recursively update all parents up to the root.
479            Population is not recursive.
480         """
481         
482         while True:
483             if node == ROOTNODE:
484                 break
485             props = self.node_get_properties(node)
486             if props is None:
487                 break
488             parent, path = props
489             self.statistics_update(parent, population, size, mtime, cluster)
490             node = parent
491             population = 0 # Population isn't recursive
492     
493     def statistics_latest(self, node, before=inf, except_cluster=0):
494         """Return population, total size and last mtime
495            for all latest versions under node that
496            do not belong to the cluster.
497         """
498         
499         # The node.
500         props = self.node_get_properties(node)
501         if props is None:
502             return None
503         parent, path = props
504         
505         # The latest version.
506         s = select([self.versions.c.serial,
507                     self.versions.c.node,
508                     self.versions.c.hash,
509                     self.versions.c.size,
510                     self.versions.c.type,
511                     self.versions.c.source,
512                     self.versions.c.mtime,
513                     self.versions.c.muser,
514                     self.versions.c.uuid,
515                     self.versions.c.checksum,
516                     self.versions.c.cluster])
517         filtered = select([func.max(self.versions.c.serial)],
518                             self.versions.c.node == node)
519         if before != inf:
520             filtered = filtered.where(self.versions.c.mtime < before)
521         s = s.where(and_(self.versions.c.cluster != except_cluster,
522                          self.versions.c.serial == filtered))
523         r = self.conn.execute(s)
524         props = r.fetchone()
525         r.close()
526         if not props:
527             return None
528         mtime = props[MTIME]
529         
530         # First level, just under node (get population).
531         v = self.versions.alias('v')
532         s = select([func.count(v.c.serial),
533                     func.sum(v.c.size),
534                     func.max(v.c.mtime)])
535         c1 = select([func.max(self.versions.c.serial)])
536         if before != inf:
537             c1 = c1.where(self.versions.c.mtime < before)
538         c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
539         s = s.where(and_(v.c.serial == c1.where(self.versions.c.node == v.c.node),
540                          v.c.cluster != except_cluster,
541                          v.c.node.in_(c2)))
542         rp = self.conn.execute(s)
543         r = rp.fetchone()
544         rp.close()
545         if not r:
546             return None
547         count = r[0]
548         mtime = max(mtime, r[2])
549         if count == 0:
550             return (0, 0, mtime)
551         
552         # All children (get size and mtime).
553         # This is why the full path is stored.
554         s = select([func.count(v.c.serial),
555                     func.sum(v.c.size),
556                     func.max(v.c.mtime)])
557         c1 = select([func.max(self.versions.c.serial)],
558             self.versions.c.node == v.c.node)
559         if before != inf:
560             c1 = c1.where(self.versions.c.mtime < before)
561         c2 = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path) + '%', escape='\\'))
562         s = s.where(and_(v.c.serial == c1,
563                          v.c.cluster != except_cluster,
564                          v.c.node.in_(c2)))
565         rp = self.conn.execute(s)
566         r = rp.fetchone()
567         rp.close()
568         if not r:
569             return None
570         size = r[1] - props[SIZE]
571         mtime = max(mtime, r[2])
572         return (count, size, mtime)
573     
574     def version_create(self, node, hash, size, type, source, muser, uuid, checksum, cluster=0):
575         """Create a new version from the given properties.
576            Return the (serial, mtime) of the new version.
577         """
578         
579         mtime = time()
580         s = self.versions.insert().values(node=node, hash=hash, size=size, type=type, source=source,
581                                           mtime=mtime, muser=muser, uuid=uuid, checksum=checksum, cluster=cluster)
582         serial = self.conn.execute(s).inserted_primary_key[0]
583         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
584         return serial, mtime
585     
586     def version_lookup(self, node, before=inf, cluster=0, all_props=True):
587         """Lookup the current version of the given node.
588            Return a list with its properties:
589            (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster)
590            or None if the current version is not found in the given cluster.
591         """
592         
593         v = self.versions.alias('v')
594         if not all_props:
595             s = select([v.c.serial])
596         else:
597             s = select([v.c.serial, v.c.node, v.c.hash,
598                         v.c.size, v.c.type, v.c.source,
599                         v.c.mtime, v.c.muser, v.c.uuid,
600                         v.c.checksum, v.c.cluster])
601         c = select([func.max(self.versions.c.serial)],
602             self.versions.c.node == node)
603         if before != inf:
604             c = c.where(self.versions.c.mtime < before)
605         s = s.where(and_(v.c.serial == c,
606                          v.c.cluster == cluster))
607         r = self.conn.execute(s)
608         props = r.fetchone()
609         r.close()
610         if props:
611             return props
612         return None
613     
614     def version_lookup_bulk(self, nodes, before=inf, cluster=0, all_props=True):
615         """Lookup the current versions of the given nodes.
616            Return a list with their properties:
617            (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
618         """
619         
620         v = self.versions.alias('v')
621         if not all_props:
622             s = select([v.c.serial])
623         else:
624             s = select([v.c.serial, v.c.node, v.c.hash,
625                         v.c.size, v.c.type, v.c.source,
626                         v.c.mtime, v.c.muser, v.c.uuid,
627                         v.c.checksum, v.c.cluster])
628         c = select([func.max(self.versions.c.serial)],
629             self.versions.c.node.in_(nodes)).group_by(self.versions.c.node)
630         if before != inf:
631             c = c.where(self.versions.c.mtime < before)
632         s = s.where(and_(v.c.serial.in_(c),
633                          v.c.cluster == cluster))
634         r = self.conn.execute(s)
635         rproxy = r.fetchall()
636         r.close()
637         return (tuple(row.values()) for row in rproxy)
638         
639     def version_get_properties(self, serial, keys=(), propnames=_propnames):
640         """Return a sequence of values for the properties of
641            the version specified by serial and the keys, in the order given.
642            If keys is empty, return all properties in the order
643            (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
644         """
645         
646         v = self.versions.alias()
647         s = select([v.c.serial, v.c.node, v.c.hash,
648                     v.c.size, v.c.type, v.c.source,
649                     v.c.mtime, v.c.muser, v.c.uuid,
650                     v.c.checksum, v.c.cluster], v.c.serial == serial)
651         rp = self.conn.execute(s)
652         r = rp.fetchone()
653         rp.close()
654         if r is None:
655             return r
656         
657         if not keys:
658             return r
659         return [r[propnames[k]] for k in keys if k in propnames]
660     
661     def version_put_property(self, serial, key, value):
662         """Set value for the property of version specified by key."""
663         
664         if key not in _propnames:
665             return
666         s = self.versions.update()
667         s = s.where(self.versions.c.serial == serial)
668         s = s.values(**{key: value})
669         self.conn.execute(s).close()
670     
671     def version_recluster(self, serial, cluster):
672         """Move the version into another cluster."""
673         
674         props = self.version_get_properties(serial)
675         if not props:
676             return
677         node = props[NODE]
678         size = props[SIZE]
679         oldcluster = props[CLUSTER]
680         if cluster == oldcluster:
681             return
682         
683         mtime = time()
684         self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
685         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
686         
687         s = self.versions.update()
688         s = s.where(self.versions.c.serial == serial)
689         s = s.values(cluster = cluster)
690         self.conn.execute(s).close()
691     
692     def version_remove(self, serial):
693         """Remove the serial specified."""
694         
695         props = self.version_get_properties(serial)
696         if not props:
697             return
698         node = props[NODE]
699         hash = props[HASH]
700         size = props[SIZE]
701         cluster = props[CLUSTER]
702         
703         mtime = time()
704         self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
705         
706         s = self.versions.delete().where(self.versions.c.serial == serial)
707         self.conn.execute(s).close()
708         return hash, size
709     
710     def attribute_get(self, serial, domain, keys=()):
711         """Return a list of (key, value) pairs of the version specified by serial.
712            If keys is empty, return all attributes.
713            Othwerise, return only those specified.
714         """
715         
716         if keys:
717             attrs = self.attributes.alias()
718             s = select([attrs.c.key, attrs.c.value])
719             s = s.where(and_(attrs.c.key.in_(keys),
720                              attrs.c.serial == serial,
721                              attrs.c.domain == domain))
722         else:
723             attrs = self.attributes.alias()
724             s = select([attrs.c.key, attrs.c.value])
725             s = s.where(and_(attrs.c.serial == serial,
726                              attrs.c.domain == domain))
727         r = self.conn.execute(s)
728         l = r.fetchall()
729         r.close()
730         return l
731     
732     def attribute_set(self, serial, domain, items):
733         """Set the attributes of the version specified by serial.
734            Receive attributes as an iterable of (key, value) pairs.
735         """
736         #insert or replace
737         #TODO better upsert
738         for k, v in items:
739             s = self.attributes.update()
740             s = s.where(and_(self.attributes.c.serial == serial,
741                              self.attributes.c.domain == domain,
742                              self.attributes.c.key == k))
743             s = s.values(value = v)
744             rp = self.conn.execute(s)
745             rp.close()
746             if rp.rowcount == 0:
747                 s = self.attributes.insert()
748                 s = s.values(serial=serial, domain=domain, key=k, value=v)
749                 self.conn.execute(s).close()
750     
751     def attribute_del(self, serial, domain, keys=()):
752         """Delete attributes of the version specified by serial.
753            If keys is empty, delete all attributes.
754            Otherwise delete those specified.
755         """
756         
757         if keys:
758             #TODO more efficient way to do this?
759             for key in keys:
760                 s = self.attributes.delete()
761                 s = s.where(and_(self.attributes.c.serial == serial,
762                                  self.attributes.c.domain == domain,
763                                  self.attributes.c.key == key))
764                 self.conn.execute(s).close()
765         else:
766             s = self.attributes.delete()
767             s = s.where(and_(self.attributes.c.serial == serial,
768                              self.attributes.c.domain == domain))
769             self.conn.execute(s).close()
770     
771     def attribute_copy(self, source, dest):
772         s = select([dest, self.attributes.c.domain, self.attributes.c.key, self.attributes.c.value],
773             self.attributes.c.serial == source)
774         rp = self.conn.execute(s)
775         attributes = rp.fetchall()
776         rp.close()
777         for dest, domain, k, v in attributes:
778             #insert or replace
779             s = self.attributes.update().where(and_(
780                 self.attributes.c.serial == dest,
781                 self.attributes.c.domain == domain,
782                 self.attributes.c.key == k))
783             rp = self.conn.execute(s, value=v)
784             rp.close()
785             if rp.rowcount == 0:
786                 s = self.attributes.insert()
787                 values = {'serial':dest, 'domain':domain, 'key':k, 'value':v}
788                 self.conn.execute(s, values).close()
789     
790     def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=[]):
791         """Return a list with all keys pairs defined
792            for all latest versions under parent that
793            do not belong to the cluster.
794         """
795         
796         # TODO: Use another table to store before=inf results.
797         a = self.attributes.alias('a')
798         v = self.versions.alias('v')
799         n = self.nodes.alias('n')
800         s = select([a.c.key]).distinct()
801         filtered = select([func.max(self.versions.c.serial)])
802         if before != inf:
803             filtered = filtered.where(self.versions.c.mtime < before)
804         s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
805         s = s.where(v.c.cluster != except_cluster)
806         s = s.where(v.c.node.in_(select([self.nodes.c.node],
807             self.nodes.c.parent == parent)))
808         s = s.where(a.c.serial == v.c.serial)
809         s = s.where(a.c.domain == domain)
810         s = s.where(n.c.node == v.c.node)
811         conj = []
812         for path, match in pathq:
813             if match == MATCH_PREFIX:
814                 conj.append(n.c.path.like(self.escape_like(path) + '%', escape='\\'))
815             elif match == MATCH_EXACT:
816                 conj.append(n.c.path == path)
817         if conj:
818             s = s.where(or_(*conj))
819         rp = self.conn.execute(s)
820         rows = rp.fetchall()
821         rp.close()
822         return [r[0] for r in rows]
823     
824     def latest_version_list(self, parent, prefix='', delimiter=None,
825                             start='', limit=10000, before=inf,
826                             except_cluster=0, pathq=[], domain=None,
827                             filterq=[], sizeq=None, all_props=False):
828         """Return a (list of (path, serial) tuples, list of common prefixes)
829            for the current versions of the paths with the given parent,
830            matching the following criteria.
831            
832            The property tuple for a version is returned if all
833            of these conditions are true:
834                 
835                 a. parent matches
836                 
837                 b. path > start
838                 
839                 c. path starts with prefix (and paths in pathq)
840                 
841                 d. version is the max up to before
842                 
843                 e. version is not in cluster
844                 
845                 f. the path does not have the delimiter occuring
846                    after the prefix, or ends with the delimiter
847                 
848                 g. serial matches the attribute filter query.
849                    
850                    A filter query is a comma-separated list of
851                    terms in one of these three forms:
852                    
853                    key
854                        an attribute with this key must exist
855                    
856                    !key
857                        an attribute with this key must not exist
858                    
859                    key ?op value
860                        the attribute with this key satisfies the value
861                        where ?op is one of ==, != <=, >=, <, >.
862                 
863                 h. the size is in the range set by sizeq
864            
865            The list of common prefixes includes the prefixes
866            matching up to the first delimiter after prefix,
867            and are reported only once, as "virtual directories".
868            The delimiter is included in the prefixes.
869            
870            If arguments are None, then the corresponding matching rule
871            will always match.
872            
873            Limit applies to the first list of tuples returned.
874            
875            If all_props is True, return all properties after path, not just serial.
876         """
877         
878         if not start or start < prefix:
879             start = strprevling(prefix)
880         nextling = strnextling(prefix)
881         
882         v = self.versions.alias('v')
883         n = self.nodes.alias('n')
884         if not all_props:
885             s = select([n.c.path, v.c.serial]).distinct()
886         else:
887             s = select([n.c.path,
888                         v.c.serial, v.c.node, v.c.hash,
889                         v.c.size, v.c.type, v.c.source,
890                         v.c.mtime, v.c.muser, v.c.uuid,
891                         v.c.checksum, v.c.cluster]).distinct()
892         filtered = select([func.max(self.versions.c.serial)])
893         if before != inf:
894             filtered = filtered.where(self.versions.c.mtime < before)
895         s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
896         s = s.where(v.c.cluster != except_cluster)
897         s = s.where(v.c.node.in_(select([self.nodes.c.node],
898             self.nodes.c.parent == parent)))
899         
900         s = s.where(n.c.node == v.c.node)
901         s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
902         conj = []
903         for path, match in pathq:
904             if match == MATCH_PREFIX:
905                 conj.append(n.c.path.like(self.escape_like(path) + '%', escape='\\'))
906             elif match == MATCH_EXACT:
907                 conj.append(n.c.path == path)
908         if conj:
909             s = s.where(or_(*conj))
910         
911         if sizeq and len(sizeq) == 2:
912             if sizeq[0]:
913                 s = s.where(v.c.size >= sizeq[0])
914             if sizeq[1]:
915                 s = s.where(v.c.size < sizeq[1])
916         
917         if domain and filterq:
918             a = self.attributes.alias('a')
919             included, excluded, opers = parse_filters(filterq)
920             if included:
921                 subs = select([1])
922                 subs = subs.where(a.c.serial == v.c.serial).correlate(v)
923                 subs = subs.where(a.c.domain == domain)
924                 subs = subs.where(or_(*[a.c.key.op('=')(x) for x in included]))
925                 s = s.where(exists(subs))
926             if excluded:
927                 subs = select([1])
928                 subs = subs.where(a.c.serial == v.c.serial).correlate(v)
929                 subs = subs.where(a.c.domain == domain)
930                 subs = subs.where(or_(*[a.c.key.op('=')(x) for x in excluded]))
931                 s = s.where(not_(exists(subs)))
932             if opers:
933                 for k, o, val in opers:
934                     subs = select([1])
935                     subs = subs.where(a.c.serial == v.c.serial).correlate(v)
936                     subs = subs.where(a.c.domain == domain)
937                     subs = subs.where(and_(a.c.key.op('=')(k), a.c.value.op(o)(val)))
938                     s = s.where(exists(subs))
939         
940         s = s.order_by(n.c.path)
941         
942         if not delimiter:
943             s = s.limit(limit)
944             rp = self.conn.execute(s, start=start)
945             r = rp.fetchall()
946             rp.close()
947             return r, ()
948         
949         pfz = len(prefix)
950         dz = len(delimiter)
951         count = 0
952         prefixes = []
953         pappend = prefixes.append
954         matches = []
955         mappend = matches.append
956         
957         rp = self.conn.execute(s, start=start)
958         while True:
959             props = rp.fetchone()
960             if props is None:
961                 break
962             path = props[0]
963             serial = props[1]
964             idx = path.find(delimiter, pfz)
965             
966             if idx < 0:
967                 mappend(props)
968                 count += 1
969                 if count >= limit:
970                     break
971                 continue
972             
973             if idx + dz == len(path):
974                 mappend(props)
975                 count += 1
976                 continue # Get one more, in case there is a path.
977             pf = path[:idx + dz]
978             pappend(pf)
979             if count >= limit: 
980                 break
981             
982             rp = self.conn.execute(s, start=strnextling(pf)) # New start.
983         rp.close()
984         
985         return matches, prefixes
986     
987     def latest_uuid(self, uuid):
988         """Return a (path, serial) tuple, for the latest version of the given uuid."""
989         
990         v = self.versions.alias('v')
991         n = self.nodes.alias('n')
992         s = select([n.c.path, v.c.serial])
993         filtered = select([func.max(self.versions.c.serial)])
994         s = s.where(v.c.serial == filtered.where(self.versions.c.uuid == uuid))
995         s = s.where(n.c.node == v.c.node)
996         
997         r = self.conn.execute(s)
998         l = r.fetchone()
999         r.close()
1000         return l