Update alembic .ini
[pithos] / snf-pithos-backend / pithos / backends / lib / sqlalchemy / node.py
1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from time import time
35 from sqlalchemy import Table, Integer, BigInteger, DECIMAL, Column, String, MetaData, ForeignKey
36 from sqlalchemy.types import Text
37 from sqlalchemy.schema import Index, Sequence
38 from sqlalchemy.sql import func, and_, or_, not_, null, select, bindparam, text, exists
39 from sqlalchemy.ext.compiler import compiles
40 from sqlalchemy.engine.reflection import Inspector
41
42 from dbworker import DBWorker
43
44 from pithos.backends.filter import parse_filters
45
46
47 ROOTNODE  = 0
48
49 ( SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM, CLUSTER ) = range(11)
50
51 ( MATCH_PREFIX, MATCH_EXACT ) = range(2)
52
53 inf = float('inf')
54
55
56 def strnextling(prefix):
57     """Return the first unicode string
58        greater than but not starting with given prefix.
59        strnextling('hello') -> 'hellp'
60     """
61     if not prefix:
62         ## all strings start with the null string,
63         ## therefore we have to approximate strnextling('')
64         ## with the last unicode character supported by python
65         ## 0x10ffff for wide (32-bit unicode) python builds
66         ## 0x00ffff for narrow (16-bit unicode) python builds
67         ## We will not autodetect. 0xffff is safe enough.
68         return unichr(0xffff)
69     s = prefix[:-1]
70     c = ord(prefix[-1])
71     if c >= 0xffff:
72         raise RuntimeError
73     s += unichr(c+1)
74     return s
75
76 def strprevling(prefix):
77     """Return an approximation of the last unicode string
78        less than but not starting with given prefix.
79        strprevling(u'hello') -> u'helln\\xffff'
80     """
81     if not prefix:
82         ## There is no prevling for the null string
83         return prefix
84     s = prefix[:-1]
85     c = ord(prefix[-1])
86     if c > 0:
87         s += unichr(c-1) + unichr(0xffff)
88     return s
89
90 _propnames = {
91     'serial'    : 0,
92     'node'      : 1,
93     'hash'      : 2,
94     'size'      : 3,
95     'type'      : 4,
96     'source'    : 5,
97     'mtime'     : 6,
98     'muser'     : 7,
99     'uuid'      : 8,
100     'checksum'  : 9,
101     'cluster'   : 10
102 }
103
104
105 class Node(DBWorker):
106     """Nodes store path organization and have multiple versions.
107        Versions store object history and have multiple attributes.
108        Attributes store metadata.
109     """
110     
111     # TODO: Provide an interface for included and excluded clusters.
112     
113     def __init__(self, **params):
114         DBWorker.__init__(self, **params)
115         metadata = MetaData()
116         
117         #create nodes table
118         columns=[]
119         columns.append(Column('node', Integer, primary_key=True))
120         columns.append(Column('parent', Integer,
121                               ForeignKey('nodes.node',
122                                          ondelete='CASCADE',
123                                          onupdate='CASCADE'),
124                               autoincrement=False))
125         columns.append(Column('latest_version', Integer))
126         columns.append(Column('path', String(2048), default='', nullable=False))
127         self.nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
128         Index('idx_nodes_path', self.nodes.c.path, unique=True)
129         Index('idx_nodes_parent', self.nodes.c.parent)
130         
131         #create policy table
132         columns=[]
133         columns.append(Column('node', Integer,
134                               ForeignKey('nodes.node',
135                                          ondelete='CASCADE',
136                                          onupdate='CASCADE'),
137                               primary_key=True))
138         columns.append(Column('key', String(128), primary_key=True))
139         columns.append(Column('value', String(256)))
140         self.policies = Table('policy', metadata, *columns, mysql_engine='InnoDB')
141         
142         #create statistics table
143         columns=[]
144         columns.append(Column('node', Integer,
145                               ForeignKey('nodes.node',
146                                          ondelete='CASCADE',
147                                          onupdate='CASCADE'),
148                               primary_key=True))
149         columns.append(Column('population', Integer, nullable=False, default=0))
150         columns.append(Column('size', BigInteger, nullable=False, default=0))
151         columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
152         columns.append(Column('cluster', Integer, nullable=False, default=0,
153                               primary_key=True, autoincrement=False))
154         self.statistics = Table('statistics', metadata, *columns, mysql_engine='InnoDB')
155         
156         #create versions table
157         columns=[]
158         columns.append(Column('serial', Integer, primary_key=True))
159         columns.append(Column('node', Integer,
160                               ForeignKey('nodes.node',
161                                          ondelete='CASCADE',
162                                          onupdate='CASCADE')))
163         columns.append(Column('hash', String(256)))
164         columns.append(Column('size', BigInteger, nullable=False, default=0))
165         columns.append(Column('type', String(256), nullable=False, default=''))
166         columns.append(Column('source', Integer))
167         columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
168         columns.append(Column('muser', String(256), nullable=False, default=''))
169         columns.append(Column('uuid', String(64), nullable=False, default=''))
170         columns.append(Column('checksum', String(256), nullable=False, default=''))
171         columns.append(Column('cluster', Integer, nullable=False, default=0))
172         self.versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
173         Index('idx_versions_node_mtime', self.versions.c.node, self.versions.c.mtime)
174         Index('idx_versions_node_uuid', self.versions.c.uuid)
175         
176         #create attributes table
177         columns = []
178         columns.append(Column('serial', Integer,
179                               ForeignKey('versions.serial',
180                                          ondelete='CASCADE',
181                                          onupdate='CASCADE'),
182                               primary_key=True))
183         columns.append(Column('domain', String(256), primary_key=True))
184         columns.append(Column('key', String(128), primary_key=True))
185         columns.append(Column('value', String(256)))
186         self.attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')
187         
188         metadata.create_all(self.engine)
189         
190         s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
191                                            self.nodes.c.parent == ROOTNODE))
192         rp = self.conn.execute(s)
193         r = rp.fetchone()
194         rp.close()
195         if not r:
196             s = self.nodes.insert().values(node=ROOTNODE, parent=ROOTNODE)
197             self.conn.execute(s)
198     
199     def node_create(self, parent, path):
200         """Create a new node from the given properties.
201            Return the node identifier of the new node.
202         """
203         #TODO catch IntegrityError?
204         s = self.nodes.insert().values(parent=parent, path=path)
205         r = self.conn.execute(s)
206         inserted_primary_key = r.inserted_primary_key[0]
207         r.close()
208         return inserted_primary_key
209     
210     def node_lookup(self, path):
211         """Lookup the current node of the given path.
212            Return None if the path is not found.
213         """
214         
215         # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
216         s = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path), escape='\\'))
217         r = self.conn.execute(s)
218         row = r.fetchone()
219         r.close()
220         if row:
221             return row[0]
222         return None
223     
224     def node_lookup_bulk(self, paths):
225         """Lookup the current nodes for the given paths.
226            Return () if the path is not found.
227         """
228         
229         # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
230         s = select([self.nodes.c.node], self.nodes.c.path.in_(paths))
231         r = self.conn.execute(s)
232         rows = r.fetchall()
233         r.close()
234         return [row[0] for row in rows]
235     
236     def node_get_properties(self, node):
237         """Return the node's (parent, path).
238            Return None if the node is not found.
239         """
240         
241         s = select([self.nodes.c.parent, self.nodes.c.path])
242         s = s.where(self.nodes.c.node == node)
243         r = self.conn.execute(s)
244         l = r.fetchone()
245         r.close()
246         return l
247     
248     def node_get_versions(self, node, keys=(), propnames=_propnames):
249         """Return the properties of all versions at node.
250            If keys is empty, return all properties in the order
251            (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
252         """
253         
254         s = select([self.versions.c.serial,
255                     self.versions.c.node,
256                     self.versions.c.hash,
257                     self.versions.c.size,
258                     self.versions.c.type,
259                     self.versions.c.source,
260                     self.versions.c.mtime,
261                     self.versions.c.muser,
262                     self.versions.c.uuid,
263                     self.versions.c.checksum,
264                     self.versions.c.cluster], self.versions.c.node == node)
265         s = s.order_by(self.versions.c.serial)
266         r = self.conn.execute(s)
267         rows = r.fetchall()
268         r.close()
269         if not rows:
270             return rows
271         
272         if not keys:
273             return rows
274         
275         return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]
276     
277     def node_count_children(self, node):
278         """Return node's child count."""
279         
280         s = select([func.count(self.nodes.c.node)])
281         s = s.where(and_(self.nodes.c.parent == node,
282                          self.nodes.c.node != ROOTNODE))
283         r = self.conn.execute(s)
284         row = r.fetchone()
285         r.close()
286         return row[0]
287     
288     def node_purge_children(self, parent, before=inf, cluster=0):
289         """Delete all versions with the specified
290            parent and cluster, and return
291            the hashes and size of versions deleted.
292            Clears out nodes with no remaining versions.
293         """
294         #update statistics
295         c1 = select([self.nodes.c.node],
296             self.nodes.c.parent == parent)
297         where_clause = and_(self.versions.c.node.in_(c1),
298                             self.versions.c.cluster == cluster)
299         s = select([func.count(self.versions.c.serial),
300                     func.sum(self.versions.c.size)])
301         s = s.where(where_clause)
302         if before != inf:
303             s = s.where(self.versions.c.mtime <= before)
304         r = self.conn.execute(s)
305         row = r.fetchone()
306         r.close()
307         if not row:
308             return (), 0
309         nr, size = row[0], -row[1] if row[1] else 0
310         mtime = time()
311         self.statistics_update(parent, -nr, size, mtime, cluster)
312         self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)
313         
314         s = select([self.versions.c.hash])
315         s = s.where(where_clause)
316         r = self.conn.execute(s)
317         hashes = [row[0] for row in r.fetchall()]
318         r.close()
319         
320         #delete versions
321         s = self.versions.delete().where(where_clause)
322         r = self.conn.execute(s)
323         r.close()
324         
325         #delete nodes
326         s = select([self.nodes.c.node],
327             and_(self.nodes.c.parent == parent,
328                  select([func.count(self.versions.c.serial)],
329                     self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
330         rp = self.conn.execute(s)
331         nodes = [r[0] for r in rp.fetchall()]
332         rp.close()
333         s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
334         self.conn.execute(s).close()
335         
336         return hashes, size
337     
338     def node_purge(self, node, before=inf, cluster=0):
339         """Delete all versions with the specified
340            node and cluster, and return
341            the hashes and size of versions deleted.
342            Clears out the node if it has no remaining versions.
343         """
344         
345         #update statistics
346         s = select([func.count(self.versions.c.serial),
347                     func.sum(self.versions.c.size)])
348         where_clause = and_(self.versions.c.node == node,
349                          self.versions.c.cluster == cluster)
350         s = s.where(where_clause)
351         if before != inf:
352             s = s.where(self.versions.c.mtime <= before)
353         r = self.conn.execute(s)
354         row = r.fetchone()
355         nr, size = row[0], row[1]
356         r.close()
357         if not nr:
358             return (), 0
359         mtime = time()
360         self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
361         
362         s = select([self.versions.c.hash])
363         s = s.where(where_clause)
364         r = self.conn.execute(s)
365         hashes = [r[0] for r in r.fetchall()]
366         r.close()
367         
368         #delete versions
369         s = self.versions.delete().where(where_clause)
370         r = self.conn.execute(s)
371         r.close()
372         
373         #delete nodes
374         s = select([self.nodes.c.node],
375             and_(self.nodes.c.node == node,
376                  select([func.count(self.versions.c.serial)],
377                     self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
378         r = self.conn.execute(s)
379         nodes = r.fetchall()
380         r.close()
381         s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
382         self.conn.execute(s).close()
383         
384         return hashes, size
385     
386     def node_remove(self, node):
387         """Remove the node specified.
388            Return false if the node has children or is not found.
389         """
390         
391         if self.node_count_children(node):
392             return False
393         
394         mtime = time()
395         s = select([func.count(self.versions.c.serial),
396                     func.sum(self.versions.c.size),
397                     self.versions.c.cluster])
398         s = s.where(self.versions.c.node == node)
399         s = s.group_by(self.versions.c.cluster)
400         r = self.conn.execute(s)
401         for population, size, cluster in r.fetchall():
402             self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
403         r.close()
404         
405         s = self.nodes.delete().where(self.nodes.c.node == node)
406         self.conn.execute(s).close()
407         return True
408     
409     def policy_get(self, node):
410         s = select([self.policies.c.key, self.policies.c.value],
411             self.policies.c.node==node)
412         r = self.conn.execute(s)
413         d = dict(r.fetchall())
414         r.close()
415         return d
416     
417     def policy_set(self, node, policy):
418         #insert or replace
419         for k, v in policy.iteritems():
420             s = self.policies.update().where(and_(self.policies.c.node == node,
421                                                   self.policies.c.key == k))
422             s = s.values(value = v)
423             rp = self.conn.execute(s)
424             rp.close()
425             if rp.rowcount == 0:
426                 s = self.policies.insert()
427                 values = {'node':node, 'key':k, 'value':v}
428                 r = self.conn.execute(s, values)
429                 r.close()
430     
431     def statistics_get(self, node, cluster=0):
432         """Return population, total size and last mtime
433            for all versions under node that belong to the cluster.
434         """
435         
436         s = select([self.statistics.c.population,
437                     self.statistics.c.size,
438                     self.statistics.c.mtime])
439         s = s.where(and_(self.statistics.c.node == node,
440                          self.statistics.c.cluster == cluster))
441         r = self.conn.execute(s)
442         row = r.fetchone()
443         r.close()
444         return row
445     
446     def statistics_update(self, node, population, size, mtime, cluster=0):
447         """Update the statistics of the given node.
448            Statistics keep track the population, total
449            size of objects and mtime in the node's namespace.
450            May be zero or positive or negative numbers.
451         """
452         s = select([self.statistics.c.population, self.statistics.c.size],
453             and_(self.statistics.c.node == node,
454                  self.statistics.c.cluster == cluster))
455         rp = self.conn.execute(s)
456         r = rp.fetchone()
457         rp.close()
458         if not r:
459             prepopulation, presize = (0, 0)
460         else:
461             prepopulation, presize = r
462         population += prepopulation
463         size += presize
464         
465         #insert or replace
466         #TODO better upsert
467         u = self.statistics.update().where(and_(self.statistics.c.node==node,
468                                            self.statistics.c.cluster==cluster))
469         u = u.values(population=population, size=size, mtime=mtime)
470         rp = self.conn.execute(u)
471         rp.close()
472         if rp.rowcount == 0:
473             ins = self.statistics.insert()
474             ins = ins.values(node=node, population=population, size=size,
475                              mtime=mtime, cluster=cluster)
476             self.conn.execute(ins).close()
477     
478     def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
479         """Update the statistics of the given node's parent.
480            Then recursively update all parents up to the root.
481            Population is not recursive.
482         """
483         
484         while True:
485             if node == ROOTNODE:
486                 break
487             props = self.node_get_properties(node)
488             if props is None:
489                 break
490             parent, path = props
491             self.statistics_update(parent, population, size, mtime, cluster)
492             node = parent
493             population = 0 # Population isn't recursive
494     
495     def statistics_latest(self, node, before=inf, except_cluster=0):
496         """Return population, total size and last mtime
497            for all latest versions under node that
498            do not belong to the cluster.
499         """
500         
501         # The node.
502         props = self.node_get_properties(node)
503         if props is None:
504             return None
505         parent, path = props
506         
507         # The latest version.
508         s = select([self.versions.c.serial,
509                     self.versions.c.node,
510                     self.versions.c.hash,
511                     self.versions.c.size,
512                     self.versions.c.type,
513                     self.versions.c.source,
514                     self.versions.c.mtime,
515                     self.versions.c.muser,
516                     self.versions.c.uuid,
517                     self.versions.c.checksum,
518                     self.versions.c.cluster])
519         if before != inf:
520             filtered = select([func.max(self.versions.c.serial)],
521                             self.versions.c.node == node)
522             filtered = filtered.where(self.versions.c.mtime < before)
523         else:
524             filtered = select([self.nodes.c.latest_version],
525                             self.versions.c.node == node)
526         s = s.where(and_(self.versions.c.cluster != except_cluster,
527                          self.versions.c.serial == filtered))
528         r = self.conn.execute(s)
529         props = r.fetchone()
530         r.close()
531         if not props:
532             return None
533         mtime = props[MTIME]
534         
535         # First level, just under node (get population).
536         v = self.versions.alias('v')
537         s = select([func.count(v.c.serial),
538                     func.sum(v.c.size),
539                     func.max(v.c.mtime)])
540         if before != inf:
541             c1 = select([func.max(self.versions.c.serial)])
542             c1 = c1.where(self.versions.c.mtime < before)
543             c1.where(self.versions.c.node == v.c.node)
544         else:
545             c1 = select([self.nodes.c.latest_version])
546             c1.where(self.nodes.c.node == v.c.node)
547         c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
548         s = s.where(and_(v.c.serial == c1,
549                          v.c.cluster != except_cluster,
550                          v.c.node.in_(c2)))
551         rp = self.conn.execute(s)
552         r = rp.fetchone()
553         rp.close()
554         if not r:
555             return None
556         count = r[0]
557         mtime = max(mtime, r[2])
558         if count == 0:
559             return (0, 0, mtime)
560         
561         # All children (get size and mtime).
562         # This is why the full path is stored.
563         s = select([func.count(v.c.serial),
564                     func.sum(v.c.size),
565                     func.max(v.c.mtime)])
566         if before != inf:
567             c1 = select([func.max(self.versions.c.serial)],
568                     self.versions.c.node == v.c.node)
569             c1 = c1.where(self.versions.c.mtime < before)
570         else:
571             c1 = select([self.nodes.c.serial],
572                     self.nodes.c.node == v.c.node)
573         c2 = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path) + '%', escape='\\'))
574         s = s.where(and_(v.c.serial == c1,
575                          v.c.cluster != except_cluster,
576                          v.c.node.in_(c2)))
577         rp = self.conn.execute(s)
578         r = rp.fetchone()
579         rp.close()
580         if not r:
581             return None
582         size = r[1] - props[SIZE]
583         mtime = max(mtime, r[2])
584         return (count, size, mtime)
585     
586     def nodes_set_latest_version(self, node, serial):
587         s = self.nodes.update().where(self.nodes.c.node == node)
588         s = s.values(latest_version = serial)
589         self.conn.execute(s).close()
590     
591     def version_create(self, node, hash, size, type, source, muser, uuid, checksum, cluster=0):
592         """Create a new version from the given properties.
593            Return the (serial, mtime) of the new version.
594         """
595         
596         mtime = time()
597         s = self.versions.insert().values(node=node, hash=hash, size=size, type=type, source=source,
598                                           mtime=mtime, muser=muser, uuid=uuid, checksum=checksum, cluster=cluster)
599         serial = self.conn.execute(s).inserted_primary_key[0]
600         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
601         
602         self.nodes_set_latest_version(node, serial)
603         
604         return serial, mtime
605     
606     def version_lookup(self, node, before=inf, cluster=0, all_props=True):
607         """Lookup the current version of the given node.
608            Return a list with its properties:
609            (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster)
610            or None if the current version is not found in the given cluster.
611         """
612         
613         v = self.versions.alias('v')
614         if not all_props:
615             s = select([v.c.serial])
616         else:
617             s = select([v.c.serial, v.c.node, v.c.hash,
618                         v.c.size, v.c.type, v.c.source,
619                         v.c.mtime, v.c.muser, v.c.uuid,
620                         v.c.checksum, v.c.cluster])
621         if before != inf:
622             c = select([func.max(self.versions.c.serial)],
623                 self.versions.c.node == node)
624             c = c.where(self.versions.c.mtime < before)
625         else:
626             c = select([self.nodes.c.latest_version],
627                 self.nodes.c.node == node)
628         s = s.where(and_(v.c.serial == c,
629                          v.c.cluster == cluster))
630         r = self.conn.execute(s)
631         props = r.fetchone()
632         r.close()
633         if props:
634             return props
635         return None
636     
637     def version_lookup_bulk(self, nodes, before=inf, cluster=0, all_props=True):
638         """Lookup the current versions of the given nodes.
639            Return a list with their properties:
640            (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
641         """
642         if not nodes:
643             return ()
644         v = self.versions.alias('v')
645         if not all_props:
646             s = select([v.c.serial])
647         else:
648             s = select([v.c.serial, v.c.node, v.c.hash,
649                         v.c.size, v.c.type, v.c.source,
650                         v.c.mtime, v.c.muser, v.c.uuid,
651                         v.c.checksum, v.c.cluster])
652         if before != inf:
653             c = select([func.max(self.versions.c.serial)],
654                 self.versions.c.node.in_(nodes))
655             c = c.where(self.versions.c.mtime < before)
656             c = c.group_by(self.versions.c.node)
657         else:
658             c = select([self.nodes.c.latest_version],
659                 self.nodes.c.node.in_(nodes))
660         s = s.where(and_(v.c.serial.in_(c),
661                          v.c.cluster == cluster))
662         s = s.order_by(v.c.node)
663         r = self.conn.execute(s)
664         rproxy = r.fetchall()
665         r.close()
666         return (tuple(row.values()) for row in rproxy)
667         
668     def version_get_properties(self, serial, keys=(), propnames=_propnames):
669         """Return a sequence of values for the properties of
670            the version specified by serial and the keys, in the order given.
671            If keys is empty, return all properties in the order
672            (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
673         """
674         
675         v = self.versions.alias()
676         s = select([v.c.serial, v.c.node, v.c.hash,
677                     v.c.size, v.c.type, v.c.source,
678                     v.c.mtime, v.c.muser, v.c.uuid,
679                     v.c.checksum, v.c.cluster], v.c.serial == serial)
680         rp = self.conn.execute(s)
681         r = rp.fetchone()
682         rp.close()
683         if r is None:
684             return r
685         
686         if not keys:
687             return r
688         return [r[propnames[k]] for k in keys if k in propnames]
689     
690     def version_put_property(self, serial, key, value):
691         """Set value for the property of version specified by key."""
692         
693         if key not in _propnames:
694             return
695         s = self.versions.update()
696         s = s.where(self.versions.c.serial == serial)
697         s = s.values(**{key: value})
698         self.conn.execute(s).close()
699     
700     def version_recluster(self, serial, cluster):
701         """Move the version into another cluster."""
702         
703         props = self.version_get_properties(serial)
704         if not props:
705             return
706         node = props[NODE]
707         size = props[SIZE]
708         oldcluster = props[CLUSTER]
709         if cluster == oldcluster:
710             return
711         
712         mtime = time()
713         self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
714         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
715         
716         s = self.versions.update()
717         s = s.where(self.versions.c.serial == serial)
718         s = s.values(cluster = cluster)
719         self.conn.execute(s).close()
720     
721     def version_remove(self, serial):
722         """Remove the serial specified."""
723         
724         props = self.version_get_properties(serial)
725         if not props:
726             return
727         node = props[NODE]
728         hash = props[HASH]
729         size = props[SIZE]
730         cluster = props[CLUSTER]
731         
732         mtime = time()
733         self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
734         
735         s = self.versions.delete().where(self.versions.c.serial == serial)
736         self.conn.execute(s).close()
737         
738         props = self.version_lookup(node, cluster=cluster, all_props=False)
739         if props:
740             self.nodes_set_latest_version(v.node, serial)
741         
742         return hash, size
743     
744     def attribute_get(self, serial, domain, keys=()):
745         """Return a list of (key, value) pairs of the version specified by serial.
746            If keys is empty, return all attributes.
747            Othwerise, return only those specified.
748         """
749         
750         if keys:
751             attrs = self.attributes.alias()
752             s = select([attrs.c.key, attrs.c.value])
753             s = s.where(and_(attrs.c.key.in_(keys),
754                              attrs.c.serial == serial,
755                              attrs.c.domain == domain))
756         else:
757             attrs = self.attributes.alias()
758             s = select([attrs.c.key, attrs.c.value])
759             s = s.where(and_(attrs.c.serial == serial,
760                              attrs.c.domain == domain))
761         r = self.conn.execute(s)
762         l = r.fetchall()
763         r.close()
764         return l
765     
766     def attribute_set(self, serial, domain, items):
767         """Set the attributes of the version specified by serial.
768            Receive attributes as an iterable of (key, value) pairs.
769         """
770         #insert or replace
771         #TODO better upsert
772         for k, v in items:
773             s = self.attributes.update()
774             s = s.where(and_(self.attributes.c.serial == serial,
775                              self.attributes.c.domain == domain,
776                              self.attributes.c.key == k))
777             s = s.values(value = v)
778             rp = self.conn.execute(s)
779             rp.close()
780             if rp.rowcount == 0:
781                 s = self.attributes.insert()
782                 s = s.values(serial=serial, domain=domain, key=k, value=v)
783                 self.conn.execute(s).close()
784     
785     def attribute_del(self, serial, domain, keys=()):
786         """Delete attributes of the version specified by serial.
787            If keys is empty, delete all attributes.
788            Otherwise delete those specified.
789         """
790         
791         if keys:
792             #TODO more efficient way to do this?
793             for key in keys:
794                 s = self.attributes.delete()
795                 s = s.where(and_(self.attributes.c.serial == serial,
796                                  self.attributes.c.domain == domain,
797                                  self.attributes.c.key == key))
798                 self.conn.execute(s).close()
799         else:
800             s = self.attributes.delete()
801             s = s.where(and_(self.attributes.c.serial == serial,
802                              self.attributes.c.domain == domain))
803             self.conn.execute(s).close()
804     
805     def attribute_copy(self, source, dest):
806         s = select([dest, self.attributes.c.domain, self.attributes.c.key, self.attributes.c.value],
807             self.attributes.c.serial == source)
808         rp = self.conn.execute(s)
809         attributes = rp.fetchall()
810         rp.close()
811         for dest, domain, k, v in attributes:
812             #insert or replace
813             s = self.attributes.update().where(and_(
814                 self.attributes.c.serial == dest,
815                 self.attributes.c.domain == domain,
816                 self.attributes.c.key == k))
817             rp = self.conn.execute(s, value=v)
818             rp.close()
819             if rp.rowcount == 0:
820                 s = self.attributes.insert()
821                 values = {'serial':dest, 'domain':domain, 'key':k, 'value':v}
822                 self.conn.execute(s, values).close()
823     
824     def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=[]):
825         """Return a list with all keys pairs defined
826            for all latest versions under parent that
827            do not belong to the cluster.
828         """
829         
830         # TODO: Use another table to store before=inf results.
831         a = self.attributes.alias('a')
832         v = self.versions.alias('v')
833         n = self.nodes.alias('n')
834         s = select([a.c.key]).distinct()
835         if before != inf:
836             filtered = select([func.max(self.versions.c.serial)])
837             filtered = filtered.where(self.versions.c.mtime < before)
838             filtered = filtered.where(self.versions.c.node == v.c.node)
839         else:
840             filtered = select([self.nodes.c.latest_version])
841             filtered = filtered.where(self.nodes.c.node == v.c.node)
842         s = s.where(v.c.serial == filtered)
843         s = s.where(v.c.cluster != except_cluster)
844         s = s.where(v.c.node.in_(select([self.nodes.c.node],
845             self.nodes.c.parent == parent)))
846         s = s.where(a.c.serial == v.c.serial)
847         s = s.where(a.c.domain == domain)
848         s = s.where(n.c.node == v.c.node)
849         conj = []
850         for path, match in pathq:
851             if match == MATCH_PREFIX:
852                 conj.append(n.c.path.like(self.escape_like(path) + '%', escape='\\'))
853             elif match == MATCH_EXACT:
854                 conj.append(n.c.path == path)
855         if conj:
856             s = s.where(or_(*conj))
857         rp = self.conn.execute(s)
858         rows = rp.fetchall()
859         rp.close()
860         return [r[0] for r in rows]
861     
862     def latest_version_list(self, parent, prefix='', delimiter=None,
863                             start='', limit=10000, before=inf,
864                             except_cluster=0, pathq=[], domain=None,
865                             filterq=[], sizeq=None, all_props=False):
866         """Return a (list of (path, serial) tuples, list of common prefixes)
867            for the current versions of the paths with the given parent,
868            matching the following criteria.
869            
870            The property tuple for a version is returned if all
871            of these conditions are true:
872                 
873                 a. parent matches
874                 
875                 b. path > start
876                 
877                 c. path starts with prefix (and paths in pathq)
878                 
879                 d. version is the max up to before
880                 
881                 e. version is not in cluster
882                 
883                 f. the path does not have the delimiter occuring
884                    after the prefix, or ends with the delimiter
885                 
886                 g. serial matches the attribute filter query.
887                    
888                    A filter query is a comma-separated list of
889                    terms in one of these three forms:
890                    
891                    key
892                        an attribute with this key must exist
893                    
894                    !key
895                        an attribute with this key must not exist
896                    
897                    key ?op value
898                        the attribute with this key satisfies the value
899                        where ?op is one of ==, != <=, >=, <, >.
900                 
901                 h. the size is in the range set by sizeq
902            
903            The list of common prefixes includes the prefixes
904            matching up to the first delimiter after prefix,
905            and are reported only once, as "virtual directories".
906            The delimiter is included in the prefixes.
907            
908            If arguments are None, then the corresponding matching rule
909            will always match.
910            
911            Limit applies to the first list of tuples returned.
912            
913            If all_props is True, return all properties after path, not just serial.
914         """
915         
916         if not start or start < prefix:
917             start = strprevling(prefix)
918         nextling = strnextling(prefix)
919         
920         v = self.versions.alias('v')
921         n = self.nodes.alias('n')
922         if not all_props:
923             s = select([n.c.path, v.c.serial]).distinct()
924         else:
925             s = select([n.c.path,
926                         v.c.serial, v.c.node, v.c.hash,
927                         v.c.size, v.c.type, v.c.source,
928                         v.c.mtime, v.c.muser, v.c.uuid,
929                         v.c.checksum, v.c.cluster]).distinct()
930         if before != inf:
931             filtered = select([func.max(self.versions.c.serial)])
932             filtered = filtered.where(self.versions.c.mtime < before)
933         else:
934             filtered = select([self.nodes.c.latest_version])
935         s = s.where(v.c.serial == filtered.where(self.nodes.c.node == v.c.node))
936         s = s.where(v.c.cluster != except_cluster)
937         s = s.where(v.c.node.in_(select([self.nodes.c.node],
938             self.nodes.c.parent == parent)))
939         
940         s = s.where(n.c.node == v.c.node)
941         s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
942         conj = []
943         for path, match in pathq:
944             if match == MATCH_PREFIX:
945                 conj.append(n.c.path.like(self.escape_like(path) + '%', escape='\\'))
946             elif match == MATCH_EXACT:
947                 conj.append(n.c.path == path)
948         if conj:
949             s = s.where(or_(*conj))
950         
951         if sizeq and len(sizeq) == 2:
952             if sizeq[0]:
953                 s = s.where(v.c.size >= sizeq[0])
954             if sizeq[1]:
955                 s = s.where(v.c.size < sizeq[1])
956         
957         if domain and filterq:
958             a = self.attributes.alias('a')
959             included, excluded, opers = parse_filters(filterq)
960             if included:
961                 subs = select([1])
962                 subs = subs.where(a.c.serial == v.c.serial).correlate(v)
963                 subs = subs.where(a.c.domain == domain)
964                 subs = subs.where(or_(*[a.c.key.op('=')(x) for x in included]))
965                 s = s.where(exists(subs))
966             if excluded:
967                 subs = select([1])
968                 subs = subs.where(a.c.serial == v.c.serial).correlate(v)
969                 subs = subs.where(a.c.domain == domain)
970                 subs = subs.where(or_(*[a.c.key.op('=')(x) for x in excluded]))
971                 s = s.where(not_(exists(subs)))
972             if opers:
973                 for k, o, val in opers:
974                     subs = select([1])
975                     subs = subs.where(a.c.serial == v.c.serial).correlate(v)
976                     subs = subs.where(a.c.domain == domain)
977                     subs = subs.where(and_(a.c.key.op('=')(k), a.c.value.op(o)(val)))
978                     s = s.where(exists(subs))
979         
980         s = s.order_by(n.c.path)
981         
982         if not delimiter:
983             s = s.limit(limit)
984             rp = self.conn.execute(s, start=start)
985             r = rp.fetchall()
986             rp.close()
987             return r, ()
988         
989         pfz = len(prefix)
990         dz = len(delimiter)
991         count = 0
992         prefixes = []
993         pappend = prefixes.append
994         matches = []
995         mappend = matches.append
996         
997         rp = self.conn.execute(s, start=start)
998         while True:
999             props = rp.fetchone()
1000             if props is None:
1001                 break
1002             path = props[0]
1003             serial = props[1]
1004             idx = path.find(delimiter, pfz)
1005             
1006             if idx < 0:
1007                 mappend(props)
1008                 count += 1
1009                 if count >= limit:
1010                     break
1011                 continue
1012             
1013             if idx + dz == len(path):
1014                 mappend(props)
1015                 count += 1
1016                 continue # Get one more, in case there is a path.
1017             pf = path[:idx + dz]
1018             pappend(pf)
1019             if count >= limit: 
1020                 break
1021             
1022             rp = self.conn.execute(s, start=strnextling(pf)) # New start.
1023         rp.close()
1024         
1025         return matches, prefixes
1026     
1027     def latest_uuid(self, uuid):
1028         """Return a (path, serial) tuple, for the latest version of the given uuid."""
1029         
1030         v = self.versions.alias('v')
1031         n = self.nodes.alias('n')
1032         s = select([n.c.path, v.c.serial])
1033         filtered = select([func.max(self.versions.c.serial)])
1034         s = s.where(v.c.serial == filtered.where(self.versions.c.uuid == uuid))
1035         s = s.where(n.c.node == v.c.node)
1036         
1037         r = self.conn.execute(s)
1038         l = r.fetchone()
1039         r.close()
1040         return l