Change QuotaholderSync to QuotaholderSerial
[pithos] / snf-pithos-backend / pithos / backends / lib / sqlalchemy / node.py
1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from time import time
35 from sqlalchemy import Table, Integer, BigInteger, DECIMAL, Column, String, MetaData, ForeignKey
36 from sqlalchemy.types import Text
37 from sqlalchemy.schema import Index, Sequence
38 from sqlalchemy.sql import func, and_, or_, not_, null, select, bindparam, text, exists
39 from sqlalchemy.ext.compiler import compiles
40 from sqlalchemy.engine.reflection import Inspector
41 from sqlalchemy.exc import NoSuchTableError
42
43 from dbworker import DBWorker
44
45 from pithos.backends.filter import parse_filters
46
47
48 ROOTNODE = 0
49
50 (SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM,
51  CLUSTER) = range(11)
52
53 (MATCH_PREFIX, MATCH_EXACT) = range(2)
54
55 inf = float('inf')
56
57
58 def strnextling(prefix):
59     """Return the first unicode string
60        greater than but not starting with given prefix.
61        strnextling('hello') -> 'hellp'
62     """
63     if not prefix:
64         ## all strings start with the null string,
65         ## therefore we have to approximate strnextling('')
66         ## with the last unicode character supported by python
67         ## 0x10ffff for wide (32-bit unicode) python builds
68         ## 0x00ffff for narrow (16-bit unicode) python builds
69         ## We will not autodetect. 0xffff is safe enough.
70         return unichr(0xffff)
71     s = prefix[:-1]
72     c = ord(prefix[-1])
73     if c >= 0xffff:
74         raise RuntimeError
75     s += unichr(c + 1)
76     return s
77
78
79 def strprevling(prefix):
80     """Return an approximation of the last unicode string
81        less than but not starting with given prefix.
82        strprevling(u'hello') -> u'helln\\xffff'
83     """
84     if not prefix:
85         ## There is no prevling for the null string
86         return prefix
87     s = prefix[:-1]
88     c = ord(prefix[-1])
89     if c > 0:
90         s += unichr(c - 1) + unichr(0xffff)
91     return s
92
93 _propnames = {
94     'serial': 0,
95     'node': 1,
96     'hash': 2,
97     'size': 3,
98     'type': 4,
99     'source': 5,
100     'mtime': 6,
101     'muser': 7,
102     'uuid': 8,
103     'checksum': 9,
104     'cluster': 10
105 }
106
107
108 def create_tables(engine):
109     metadata = MetaData()
110
111     #create nodes table
112     columns = []
113     columns.append(Column('node', Integer, primary_key=True))
114     columns.append(Column('parent', Integer,
115                           ForeignKey('nodes.node',
116                                      ondelete='CASCADE',
117                                      onupdate='CASCADE'),
118                           autoincrement=False))
119     columns.append(Column('latest_version', Integer))
120     columns.append(Column('path', String(2048), default='', nullable=False))
121     nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
122     Index('idx_nodes_path', nodes.c.path, unique=True)
123     Index('idx_nodes_parent', nodes.c.parent)
124
125     #create policy table
126     columns = []
127     columns.append(Column('node', Integer,
128                           ForeignKey('nodes.node',
129                                      ondelete='CASCADE',
130                                      onupdate='CASCADE'),
131                           primary_key=True))
132     columns.append(Column('key', String(128), primary_key=True))
133     columns.append(Column('value', String(256)))
134     policy = Table('policy', metadata, *columns, mysql_engine='InnoDB')
135
136     #create statistics table
137     columns = []
138     columns.append(Column('node', Integer,
139                           ForeignKey('nodes.node',
140                                      ondelete='CASCADE',
141                                      onupdate='CASCADE'),
142                           primary_key=True))
143     columns.append(Column('population', Integer, nullable=False, default=0))
144     columns.append(Column('size', BigInteger, nullable=False, default=0))
145     columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
146     columns.append(Column('cluster', Integer, nullable=False, default=0,
147                           primary_key=True, autoincrement=False))
148     statistics = Table('statistics', metadata, *columns, mysql_engine='InnoDB')
149
150     #create versions table
151     columns = []
152     columns.append(Column('serial', Integer, primary_key=True))
153     columns.append(Column('node', Integer,
154                           ForeignKey('nodes.node',
155                                      ondelete='CASCADE',
156                                      onupdate='CASCADE')))
157     columns.append(Column('hash', String(256)))
158     columns.append(Column('size', BigInteger, nullable=False, default=0))
159     columns.append(Column('type', String(256), nullable=False, default=''))
160     columns.append(Column('source', Integer))
161     columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
162     columns.append(Column('muser', String(256), nullable=False, default=''))
163     columns.append(Column('uuid', String(64), nullable=False, default=''))
164     columns.append(Column('checksum', String(256), nullable=False, default=''))
165     columns.append(Column('cluster', Integer, nullable=False, default=0))
166     versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
167     Index('idx_versions_node_mtime', versions.c.node, versions.c.mtime)
168     Index('idx_versions_node_uuid', versions.c.uuid)
169
170     #create attributes table
171     columns = []
172     columns.append(Column('serial', Integer,
173                           ForeignKey('versions.serial',
174                                      ondelete='CASCADE',
175                                      onupdate='CASCADE'),
176                           primary_key=True))
177     columns.append(Column('domain', String(256), primary_key=True))
178     columns.append(Column('key', String(128), primary_key=True))
179     columns.append(Column('value', String(256)))
180     attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')
181
182     metadata.create_all(engine)
183     return metadata.sorted_tables
184
185
186 class Node(DBWorker):
187     """Nodes store path organization and have multiple versions.
188        Versions store object history and have multiple attributes.
189        Attributes store metadata.
190     """
191
192     # TODO: Provide an interface for included and excluded clusters.
193
194     def __init__(self, **params):
195         DBWorker.__init__(self, **params)
196         try:
197             metadata = MetaData(self.engine)
198             self.nodes = Table('nodes', metadata, autoload=True)
199             self.policy = Table('policy', metadata, autoload=True)
200             self.statistics = Table('statistics', metadata, autoload=True)
201             self.versions = Table('versions', metadata, autoload=True)
202             self.attributes = Table('attributes', metadata, autoload=True)
203         except NoSuchTableError:
204             tables = create_tables(self.engine)
205             map(lambda t: self.__setattr__(t.name, t), tables)
206
207         s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
208                                            self.nodes.c.parent == ROOTNODE))
209         rp = self.conn.execute(s)
210         r = rp.fetchone()
211         rp.close()
212         if not r:
213             s = self.nodes.insert(
214             ).values(node=ROOTNODE, parent=ROOTNODE, path='')
215             self.conn.execute(s)
216
217     def node_create(self, parent, path):
218         """Create a new node from the given properties.
219            Return the node identifier of the new node.
220         """
221         #TODO catch IntegrityError?
222         s = self.nodes.insert().values(parent=parent, path=path)
223         r = self.conn.execute(s)
224         inserted_primary_key = r.inserted_primary_key[0]
225         r.close()
226         return inserted_primary_key
227
228     def node_lookup(self, path):
229         """Lookup the current node of the given path.
230            Return None if the path is not found.
231         """
232
233         # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
234         s = select([self.nodes.c.node], self.nodes.c.path.like(
235             self.escape_like(path), escape='\\'))
236         r = self.conn.execute(s)
237         row = r.fetchone()
238         r.close()
239         if row:
240             return row[0]
241         return None
242
243     def node_lookup_bulk(self, paths):
244         """Lookup the current nodes for the given paths.
245            Return () if the path is not found.
246         """
247
248         # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
249         s = select([self.nodes.c.node], self.nodes.c.path.in_(paths))
250         r = self.conn.execute(s)
251         rows = r.fetchall()
252         r.close()
253         return [row[0] for row in rows]
254
255     def node_get_properties(self, node):
256         """Return the node's (parent, path).
257            Return None if the node is not found.
258         """
259
260         s = select([self.nodes.c.parent, self.nodes.c.path])
261         s = s.where(self.nodes.c.node == node)
262         r = self.conn.execute(s)
263         l = r.fetchone()
264         r.close()
265         return l
266
267     def node_get_versions(self, node, keys=(), propnames=_propnames):
268         """Return the properties of all versions at node.
269            If keys is empty, return all properties in the order
270            (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
271         """
272
273         s = select([self.versions.c.serial,
274                     self.versions.c.node,
275                     self.versions.c.hash,
276                     self.versions.c.size,
277                     self.versions.c.type,
278                     self.versions.c.source,
279                     self.versions.c.mtime,
280                     self.versions.c.muser,
281                     self.versions.c.uuid,
282                     self.versions.c.checksum,
283                     self.versions.c.cluster], self.versions.c.node == node)
284         s = s.order_by(self.versions.c.serial)
285         r = self.conn.execute(s)
286         rows = r.fetchall()
287         r.close()
288         if not rows:
289             return rows
290
291         if not keys:
292             return rows
293
294         return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]
295
296     def node_count_children(self, node):
297         """Return node's child count."""
298
299         s = select([func.count(self.nodes.c.node)])
300         s = s.where(and_(self.nodes.c.parent == node,
301                          self.nodes.c.node != ROOTNODE))
302         r = self.conn.execute(s)
303         row = r.fetchone()
304         r.close()
305         return row[0]
306
307     def node_purge_children(self, parent, before=inf, cluster=0):
308         """Delete all versions with the specified
309            parent and cluster, and return
310            the hashes and size of versions deleted.
311            Clears out nodes with no remaining versions.
312         """
313         #update statistics
314         c1 = select([self.nodes.c.node],
315                     self.nodes.c.parent == parent)
316         where_clause = and_(self.versions.c.node.in_(c1),
317                             self.versions.c.cluster == cluster)
318         s = select([func.count(self.versions.c.serial),
319                     func.sum(self.versions.c.size)])
320         s = s.where(where_clause)
321         if before != inf:
322             s = s.where(self.versions.c.mtime <= before)
323         r = self.conn.execute(s)
324         row = r.fetchone()
325         r.close()
326         if not row:
327             return (), 0
328         nr, size = row[0], -row[1] if row[1] else 0
329         mtime = time()
330         self.statistics_update(parent, -nr, size, mtime, cluster)
331         self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)
332
333         s = select([self.versions.c.hash])
334         s = s.where(where_clause)
335         r = self.conn.execute(s)
336         hashes = [row[0] for row in r.fetchall()]
337         r.close()
338
339         #delete versions
340         s = self.versions.delete().where(where_clause)
341         r = self.conn.execute(s)
342         r.close()
343
344         #delete nodes
345         s = select([self.nodes.c.node],
346                    and_(self.nodes.c.parent == parent,
347                         select([func.count(self.versions.c.serial)],
348                                self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
349         rp = self.conn.execute(s)
350         nodes = [r[0] for r in rp.fetchall()]
351         rp.close()
352         s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
353         self.conn.execute(s).close()
354
355         return hashes, size
356
357     def node_purge(self, node, before=inf, cluster=0):
358         """Delete all versions with the specified
359            node and cluster, and return
360            the hashes and size of versions deleted.
361            Clears out the node if it has no remaining versions.
362         """
363
364         #update statistics
365         s = select([func.count(self.versions.c.serial),
366                     func.sum(self.versions.c.size)])
367         where_clause = and_(self.versions.c.node == node,
368                             self.versions.c.cluster == cluster)
369         s = s.where(where_clause)
370         if before != inf:
371             s = s.where(self.versions.c.mtime <= before)
372         r = self.conn.execute(s)
373         row = r.fetchone()
374         nr, size = row[0], row[1]
375         r.close()
376         if not nr:
377             return (), 0
378         mtime = time()
379         self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
380
381         s = select([self.versions.c.hash])
382         s = s.where(where_clause)
383         r = self.conn.execute(s)
384         hashes = [r[0] for r in r.fetchall()]
385         r.close()
386
387         #delete versions
388         s = self.versions.delete().where(where_clause)
389         r = self.conn.execute(s)
390         r.close()
391
392         #delete nodes
393         s = select([self.nodes.c.node],
394                    and_(self.nodes.c.node == node,
395                         select([func.count(self.versions.c.serial)],
396                                self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
397         r = self.conn.execute(s)
398         nodes = r.fetchall()
399         r.close()
400         s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
401         self.conn.execute(s).close()
402
403         return hashes, size
404
405     def node_remove(self, node):
406         """Remove the node specified.
407            Return false if the node has children or is not found.
408         """
409
410         if self.node_count_children(node):
411             return False
412
413         mtime = time()
414         s = select([func.count(self.versions.c.serial),
415                     func.sum(self.versions.c.size),
416                     self.versions.c.cluster])
417         s = s.where(self.versions.c.node == node)
418         s = s.group_by(self.versions.c.cluster)
419         r = self.conn.execute(s)
420         for population, size, cluster in r.fetchall():
421             self.statistics_update_ancestors(
422                 node, -population, -size, mtime, cluster)
423         r.close()
424
425         s = self.nodes.delete().where(self.nodes.c.node == node)
426         self.conn.execute(s).close()
427         return True
428
429     def policy_get(self, node):
430         s = select([self.policy.c.key, self.policy.c.value],
431                    self.policy.c.node == node)
432         r = self.conn.execute(s)
433         d = dict(r.fetchall())
434         r.close()
435         return d
436
437     def policy_set(self, node, policy):
438         #insert or replace
439         for k, v in policy.iteritems():
440             s = self.policy.update().where(and_(self.policy.c.node == node,
441                                                 self.policy.c.key == k))
442             s = s.values(value=v)
443             rp = self.conn.execute(s)
444             rp.close()
445             if rp.rowcount == 0:
446                 s = self.policy.insert()
447                 values = {'node': node, 'key': k, 'value': v}
448                 r = self.conn.execute(s, values)
449                 r.close()
450
451     def statistics_get(self, node, cluster=0):
452         """Return population, total size and last mtime
453            for all versions under node that belong to the cluster.
454         """
455
456         s = select([self.statistics.c.population,
457                     self.statistics.c.size,
458                     self.statistics.c.mtime])
459         s = s.where(and_(self.statistics.c.node == node,
460                          self.statistics.c.cluster == cluster))
461         r = self.conn.execute(s)
462         row = r.fetchone()
463         r.close()
464         return row
465
466     def statistics_update(self, node, population, size, mtime, cluster=0):
467         """Update the statistics of the given node.
468            Statistics keep track the population, total
469            size of objects and mtime in the node's namespace.
470            May be zero or positive or negative numbers.
471         """
472         s = select([self.statistics.c.population, self.statistics.c.size],
473                    and_(self.statistics.c.node == node,
474                         self.statistics.c.cluster == cluster))
475         rp = self.conn.execute(s)
476         r = rp.fetchone()
477         rp.close()
478         if not r:
479             prepopulation, presize = (0, 0)
480         else:
481             prepopulation, presize = r
482         population += prepopulation
483         size += presize
484
485         #insert or replace
486         #TODO better upsert
487         u = self.statistics.update().where(and_(self.statistics.c.node == node,
488                                            self.statistics.c.cluster == cluster))
489         u = u.values(population=population, size=size, mtime=mtime)
490         rp = self.conn.execute(u)
491         rp.close()
492         if rp.rowcount == 0:
493             ins = self.statistics.insert()
494             ins = ins.values(node=node, population=population, size=size,
495                              mtime=mtime, cluster=cluster)
496             self.conn.execute(ins).close()
497
498     def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
499         """Update the statistics of the given node's parent.
500            Then recursively update all parents up to the root.
501            Population is not recursive.
502         """
503
504         while True:
505             if node == ROOTNODE:
506                 break
507             props = self.node_get_properties(node)
508             if props is None:
509                 break
510             parent, path = props
511             self.statistics_update(parent, population, size, mtime, cluster)
512             node = parent
513             population = 0  # Population isn't recursive
514
515     def statistics_latest(self, node, before=inf, except_cluster=0):
516         """Return population, total size and last mtime
517            for all latest versions under node that
518            do not belong to the cluster.
519         """
520
521         # The node.
522         props = self.node_get_properties(node)
523         if props is None:
524             return None
525         parent, path = props
526
527         # The latest version.
528         s = select([self.versions.c.serial,
529                     self.versions.c.node,
530                     self.versions.c.hash,
531                     self.versions.c.size,
532                     self.versions.c.type,
533                     self.versions.c.source,
534                     self.versions.c.mtime,
535                     self.versions.c.muser,
536                     self.versions.c.uuid,
537                     self.versions.c.checksum,
538                     self.versions.c.cluster])
539         if before != inf:
540             filtered = select([func.max(self.versions.c.serial)],
541                               self.versions.c.node == node)
542             filtered = filtered.where(self.versions.c.mtime < before)
543         else:
544             filtered = select([self.nodes.c.latest_version],
545                               self.versions.c.node == node)
546         s = s.where(and_(self.versions.c.cluster != except_cluster,
547                          self.versions.c.serial == filtered))
548         r = self.conn.execute(s)
549         props = r.fetchone()
550         r.close()
551         if not props:
552             return None
553         mtime = props[MTIME]
554
555         # First level, just under node (get population).
556         v = self.versions.alias('v')
557         s = select([func.count(v.c.serial),
558                     func.sum(v.c.size),
559                     func.max(v.c.mtime)])
560         if before != inf:
561             c1 = select([func.max(self.versions.c.serial)])
562             c1 = c1.where(self.versions.c.mtime < before)
563             c1.where(self.versions.c.node == v.c.node)
564         else:
565             c1 = select([self.nodes.c.latest_version])
566             c1.where(self.nodes.c.node == v.c.node)
567         c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
568         s = s.where(and_(v.c.serial == c1,
569                          v.c.cluster != except_cluster,
570                          v.c.node.in_(c2)))
571         rp = self.conn.execute(s)
572         r = rp.fetchone()
573         rp.close()
574         if not r:
575             return None
576         count = r[0]
577         mtime = max(mtime, r[2])
578         if count == 0:
579             return (0, 0, mtime)
580
581         # All children (get size and mtime).
582         # This is why the full path is stored.
583         s = select([func.count(v.c.serial),
584                     func.sum(v.c.size),
585                     func.max(v.c.mtime)])
586         if before != inf:
587             c1 = select([func.max(self.versions.c.serial)],
588                         self.versions.c.node == v.c.node)
589             c1 = c1.where(self.versions.c.mtime < before)
590         else:
591             c1 = select([self.nodes.c.serial],
592                         self.nodes.c.node == v.c.node)
593         c2 = select([self.nodes.c.node], self.nodes.c.path.like(
594             self.escape_like(path) + '%', escape='\\'))
595         s = s.where(and_(v.c.serial == c1,
596                          v.c.cluster != except_cluster,
597                          v.c.node.in_(c2)))
598         rp = self.conn.execute(s)
599         r = rp.fetchone()
600         rp.close()
601         if not r:
602             return None
603         size = r[1] - props[SIZE]
604         mtime = max(mtime, r[2])
605         return (count, size, mtime)
606
607     def nodes_set_latest_version(self, node, serial):
608         s = self.nodes.update().where(self.nodes.c.node == node)
609         s = s.values(latest_version=serial)
610         self.conn.execute(s).close()
611
612     def version_create(self, node, hash, size, type, source, muser, uuid, checksum, cluster=0):
613         """Create a new version from the given properties.
614            Return the (serial, mtime) of the new version.
615         """
616
617         mtime = time()
618         s = self.versions.insert(
619         ).values(node=node, hash=hash, size=size, type=type, source=source,
620                  mtime=mtime, muser=muser, uuid=uuid, checksum=checksum, cluster=cluster)
621         serial = self.conn.execute(s).inserted_primary_key[0]
622         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
623
624         self.nodes_set_latest_version(node, serial)
625
626         return serial, mtime
627
628     def version_lookup(self, node, before=inf, cluster=0, all_props=True):
629         """Lookup the current version of the given node.
630            Return a list with its properties:
631            (serial, node, hash, size, type, source, mtime,
632             muser, uuid, checksum, cluster)
633            or None if the current version is not found in the given cluster.
634         """
635
636         v = self.versions.alias('v')
637         if not all_props:
638             s = select([v.c.serial])
639         else:
640             s = select([v.c.serial, v.c.node, v.c.hash,
641                         v.c.size, v.c.type, v.c.source,
642                         v.c.mtime, v.c.muser, v.c.uuid,
643                         v.c.checksum, v.c.cluster])
644         if before != inf:
645             c = select([func.max(self.versions.c.serial)],
646                        self.versions.c.node == node)
647             c = c.where(self.versions.c.mtime < before)
648         else:
649             c = select([self.nodes.c.latest_version],
650                        self.nodes.c.node == node)
651         s = s.where(and_(v.c.serial == c,
652                          v.c.cluster == cluster))
653         r = self.conn.execute(s)
654         props = r.fetchone()
655         r.close()
656         if props:
657             return props
658         return None
659
660     def version_lookup_bulk(self, nodes, before=inf, cluster=0, all_props=True):
661         """Lookup the current versions of the given nodes.
662            Return a list with their properties:
663            (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
664         """
665         if not nodes:
666             return ()
667         v = self.versions.alias('v')
668         if not all_props:
669             s = select([v.c.serial])
670         else:
671             s = select([v.c.serial, v.c.node, v.c.hash,
672                         v.c.size, v.c.type, v.c.source,
673                         v.c.mtime, v.c.muser, v.c.uuid,
674                         v.c.checksum, v.c.cluster])
675         if before != inf:
676             c = select([func.max(self.versions.c.serial)],
677                        self.versions.c.node.in_(nodes))
678             c = c.where(self.versions.c.mtime < before)
679             c = c.group_by(self.versions.c.node)
680         else:
681             c = select([self.nodes.c.latest_version],
682                        self.nodes.c.node.in_(nodes))
683         s = s.where(and_(v.c.serial.in_(c),
684                          v.c.cluster == cluster))
685         s = s.order_by(v.c.node)
686         r = self.conn.execute(s)
687         rproxy = r.fetchall()
688         r.close()
689         return (tuple(row.values()) for row in rproxy)
690
691     def version_get_properties(self, serial, keys=(), propnames=_propnames):
692         """Return a sequence of values for the properties of
693            the version specified by serial and the keys, in the order given.
694            If keys is empty, return all properties in the order
695            (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
696         """
697
698         v = self.versions.alias()
699         s = select([v.c.serial, v.c.node, v.c.hash,
700                     v.c.size, v.c.type, v.c.source,
701                     v.c.mtime, v.c.muser, v.c.uuid,
702                     v.c.checksum, v.c.cluster], v.c.serial == serial)
703         rp = self.conn.execute(s)
704         r = rp.fetchone()
705         rp.close()
706         if r is None:
707             return r
708
709         if not keys:
710             return r
711         return [r[propnames[k]] for k in keys if k in propnames]
712
713     def version_put_property(self, serial, key, value):
714         """Set value for the property of version specified by key."""
715
716         if key not in _propnames:
717             return
718         s = self.versions.update()
719         s = s.where(self.versions.c.serial == serial)
720         s = s.values(**{key: value})
721         self.conn.execute(s).close()
722
723     def version_recluster(self, serial, cluster):
724         """Move the version into another cluster."""
725
726         props = self.version_get_properties(serial)
727         if not props:
728             return
729         node = props[NODE]
730         size = props[SIZE]
731         oldcluster = props[CLUSTER]
732         if cluster == oldcluster:
733             return
734
735         mtime = time()
736         self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
737         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
738
739         s = self.versions.update()
740         s = s.where(self.versions.c.serial == serial)
741         s = s.values(cluster=cluster)
742         self.conn.execute(s).close()
743
744     def version_remove(self, serial):
745         """Remove the serial specified."""
746
747         props = self.version_get_properties(serial)
748         if not props:
749             return
750         node = props[NODE]
751         hash = props[HASH]
752         size = props[SIZE]
753         cluster = props[CLUSTER]
754
755         mtime = time()
756         self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
757
758         s = self.versions.delete().where(self.versions.c.serial == serial)
759         self.conn.execute(s).close()
760
761         props = self.version_lookup(node, cluster=cluster, all_props=False)
762         if props:
763             self.nodes_set_latest_version(v.node, serial)
764
765         return hash, size
766
767     def attribute_get(self, serial, domain, keys=()):
768         """Return a list of (key, value) pairs of the version specified by serial.
769            If keys is empty, return all attributes.
770            Othwerise, return only those specified.
771         """
772
773         if keys:
774             attrs = self.attributes.alias()
775             s = select([attrs.c.key, attrs.c.value])
776             s = s.where(and_(attrs.c.key.in_(keys),
777                              attrs.c.serial == serial,
778                              attrs.c.domain == domain))
779         else:
780             attrs = self.attributes.alias()
781             s = select([attrs.c.key, attrs.c.value])
782             s = s.where(and_(attrs.c.serial == serial,
783                              attrs.c.domain == domain))
784         r = self.conn.execute(s)
785         l = r.fetchall()
786         r.close()
787         return l
788
789     def attribute_set(self, serial, domain, items):
790         """Set the attributes of the version specified by serial.
791            Receive attributes as an iterable of (key, value) pairs.
792         """
793         #insert or replace
794         #TODO better upsert
795         for k, v in items:
796             s = self.attributes.update()
797             s = s.where(and_(self.attributes.c.serial == serial,
798                              self.attributes.c.domain == domain,
799                              self.attributes.c.key == k))
800             s = s.values(value=v)
801             rp = self.conn.execute(s)
802             rp.close()
803             if rp.rowcount == 0:
804                 s = self.attributes.insert()
805                 s = s.values(serial=serial, domain=domain, key=k, value=v)
806                 self.conn.execute(s).close()
807
808     def attribute_del(self, serial, domain, keys=()):
809         """Delete attributes of the version specified by serial.
810            If keys is empty, delete all attributes.
811            Otherwise delete those specified.
812         """
813
814         if keys:
815             #TODO more efficient way to do this?
816             for key in keys:
817                 s = self.attributes.delete()
818                 s = s.where(and_(self.attributes.c.serial == serial,
819                                  self.attributes.c.domain == domain,
820                                  self.attributes.c.key == key))
821                 self.conn.execute(s).close()
822         else:
823             s = self.attributes.delete()
824             s = s.where(and_(self.attributes.c.serial == serial,
825                              self.attributes.c.domain == domain))
826             self.conn.execute(s).close()
827
828     def attribute_copy(self, source, dest):
829         s = select(
830             [dest, self.attributes.c.domain,
831                 self.attributes.c.key, self.attributes.c.value],
832             self.attributes.c.serial == source)
833         rp = self.conn.execute(s)
834         attributes = rp.fetchall()
835         rp.close()
836         for dest, domain, k, v in attributes:
837             #insert or replace
838             s = self.attributes.update().where(and_(
839                 self.attributes.c.serial == dest,
840                 self.attributes.c.domain == domain,
841                 self.attributes.c.key == k))
842             rp = self.conn.execute(s, value=v)
843             rp.close()
844             if rp.rowcount == 0:
845                 s = self.attributes.insert()
846                 values = {'serial': dest, 'domain': domain,
847                           'key': k, 'value': v}
848                 self.conn.execute(s, values).close()
849
850     def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=[]):
851         """Return a list with all keys pairs defined
852            for all latest versions under parent that
853            do not belong to the cluster.
854         """
855
856         # TODO: Use another table to store before=inf results.
857         a = self.attributes.alias('a')
858         v = self.versions.alias('v')
859         n = self.nodes.alias('n')
860         s = select([a.c.key]).distinct()
861         if before != inf:
862             filtered = select([func.max(self.versions.c.serial)])
863             filtered = filtered.where(self.versions.c.mtime < before)
864             filtered = filtered.where(self.versions.c.node == v.c.node)
865         else:
866             filtered = select([self.nodes.c.latest_version])
867             filtered = filtered.where(self.nodes.c.node == v.c.node)
868         s = s.where(v.c.serial == filtered)
869         s = s.where(v.c.cluster != except_cluster)
870         s = s.where(v.c.node.in_(select([self.nodes.c.node],
871                                         self.nodes.c.parent == parent)))
872         s = s.where(a.c.serial == v.c.serial)
873         s = s.where(a.c.domain == domain)
874         s = s.where(n.c.node == v.c.node)
875         conj = []
876         for path, match in pathq:
877             if match == MATCH_PREFIX:
878                 conj.append(
879                     n.c.path.like(self.escape_like(path) + '%', escape='\\'))
880             elif match == MATCH_EXACT:
881                 conj.append(n.c.path == path)
882         if conj:
883             s = s.where(or_(*conj))
884         rp = self.conn.execute(s)
885         rows = rp.fetchall()
886         rp.close()
887         return [r[0] for r in rows]
888
889     def latest_version_list(self, parent, prefix='', delimiter=None,
890                             start='', limit=10000, before=inf,
891                             except_cluster=0, pathq=[], domain=None,
892                             filterq=[], sizeq=None, all_props=False):
893         """Return a (list of (path, serial) tuples, list of common prefixes)
894            for the current versions of the paths with the given parent,
895            matching the following criteria.
896
897            The property tuple for a version is returned if all
898            of these conditions are true:
899
900                 a. parent matches
901
902                 b. path > start
903
904                 c. path starts with prefix (and paths in pathq)
905
906                 d. version is the max up to before
907
908                 e. version is not in cluster
909
910                 f. the path does not have the delimiter occuring
911                    after the prefix, or ends with the delimiter
912
913                 g. serial matches the attribute filter query.
914
915                    A filter query is a comma-separated list of
916                    terms in one of these three forms:
917
918                    key
919                        an attribute with this key must exist
920
921                    !key
922                        an attribute with this key must not exist
923
924                    key ?op value
925                        the attribute with this key satisfies the value
926                        where ?op is one of ==, != <=, >=, <, >.
927
928                 h. the size is in the range set by sizeq
929
930            The list of common prefixes includes the prefixes
931            matching up to the first delimiter after prefix,
932            and are reported only once, as "virtual directories".
933            The delimiter is included in the prefixes.
934
935            If arguments are None, then the corresponding matching rule
936            will always match.
937
938            Limit applies to the first list of tuples returned.
939
940            If all_props is True, return all properties after path, not just serial.
941         """
942
943         if not start or start < prefix:
944             start = strprevling(prefix)
945         nextling = strnextling(prefix)
946
947         v = self.versions.alias('v')
948         n = self.nodes.alias('n')
949         if not all_props:
950             s = select([n.c.path, v.c.serial]).distinct()
951         else:
952             s = select([n.c.path,
953                         v.c.serial, v.c.node, v.c.hash,
954                         v.c.size, v.c.type, v.c.source,
955                         v.c.mtime, v.c.muser, v.c.uuid,
956                         v.c.checksum, v.c.cluster]).distinct()
957         if before != inf:
958             filtered = select([func.max(self.versions.c.serial)])
959             filtered = filtered.where(self.versions.c.mtime < before)
960         else:
961             filtered = select([self.nodes.c.latest_version])
962         s = s.where(
963             v.c.serial == filtered.where(self.nodes.c.node == v.c.node))
964         s = s.where(v.c.cluster != except_cluster)
965         s = s.where(v.c.node.in_(select([self.nodes.c.node],
966                                         self.nodes.c.parent == parent)))
967
968         s = s.where(n.c.node == v.c.node)
969         s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
970         conj = []
971         for path, match in pathq:
972             if match == MATCH_PREFIX:
973                 conj.append(
974                     n.c.path.like(self.escape_like(path) + '%', escape='\\'))
975             elif match == MATCH_EXACT:
976                 conj.append(n.c.path == path)
977         if conj:
978             s = s.where(or_(*conj))
979
980         if sizeq and len(sizeq) == 2:
981             if sizeq[0]:
982                 s = s.where(v.c.size >= sizeq[0])
983             if sizeq[1]:
984                 s = s.where(v.c.size < sizeq[1])
985
986         if domain and filterq:
987             a = self.attributes.alias('a')
988             included, excluded, opers = parse_filters(filterq)
989             if included:
990                 subs = select([1])
991                 subs = subs.where(a.c.serial == v.c.serial).correlate(v)
992                 subs = subs.where(a.c.domain == domain)
993                 subs = subs.where(or_(*[a.c.key.op('=')(x) for x in included]))
994                 s = s.where(exists(subs))
995             if excluded:
996                 subs = select([1])
997                 subs = subs.where(a.c.serial == v.c.serial).correlate(v)
998                 subs = subs.where(a.c.domain == domain)
999                 subs = subs.where(or_(*[a.c.key.op('=')(x) for x in excluded]))
1000                 s = s.where(not_(exists(subs)))
1001             if opers:
1002                 for k, o, val in opers:
1003                     subs = select([1])
1004                     subs = subs.where(a.c.serial == v.c.serial).correlate(v)
1005                     subs = subs.where(a.c.domain == domain)
1006                     subs = subs.where(
1007                         and_(a.c.key.op('=')(k), a.c.value.op(o)(val)))
1008                     s = s.where(exists(subs))
1009
1010         s = s.order_by(n.c.path)
1011
1012         if not delimiter:
1013             s = s.limit(limit)
1014             rp = self.conn.execute(s, start=start)
1015             r = rp.fetchall()
1016             rp.close()
1017             return r, ()
1018
1019         pfz = len(prefix)
1020         dz = len(delimiter)
1021         count = 0
1022         prefixes = []
1023         pappend = prefixes.append
1024         matches = []
1025         mappend = matches.append
1026
1027         rp = self.conn.execute(s, start=start)
1028         while True:
1029             props = rp.fetchone()
1030             if props is None:
1031                 break
1032             path = props[0]
1033             serial = props[1]
1034             idx = path.find(delimiter, pfz)
1035
1036             if idx < 0:
1037                 mappend(props)
1038                 count += 1
1039                 if count >= limit:
1040                     break
1041                 continue
1042
1043             if idx + dz == len(path):
1044                 mappend(props)
1045                 count += 1
1046                 continue  # Get one more, in case there is a path.
1047             pf = path[:idx + dz]
1048             pappend(pf)
1049             if count >= limit:
1050                 break
1051
1052             rp = self.conn.execute(s, start=strnextling(pf))  # New start.
1053         rp.close()
1054
1055         return matches, prefixes
1056
1057     def latest_uuid(self, uuid):
1058         """Return a (path, serial) tuple, for the latest version of the given uuid."""
1059
1060         v = self.versions.alias('v')
1061         n = self.nodes.alias('n')
1062         s = select([n.c.path, v.c.serial])
1063         filtered = select([func.max(self.versions.c.serial)])
1064         s = s.where(v.c.serial == filtered.where(self.versions.c.uuid == uuid))
1065         s = s.where(n.c.node == v.c.node)
1066
1067         r = self.conn.execute(s)
1068         l = r.fetchone()
1069         r.close()
1070         return l