1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
35 from sqlalchemy import Table, Integer, BigInteger, DECIMAL, Column, String, MetaData, ForeignKey
36 from sqlalchemy.types import Text
37 from sqlalchemy.schema import Index, Sequence
38 from sqlalchemy.sql import func, and_, or_, not_, null, select, bindparam, text, exists
39 from sqlalchemy.ext.compiler import compiles
40 from sqlalchemy.engine.reflection import Inspector
41 from sqlalchemy.exc import NoSuchTableError
43 from dbworker import DBWorker
45 from pithos.backends.filter import parse_filters
# Positional indices into a version property row, in the order the version
# queries in this module select their columns (serial, node, hash, size,
# type, source, mtime, muser, uuid, checksum, cluster).
50 (SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM,
# Matching modes for the (path, match) pairs accepted as `pathq`:
# MATCH_PREFIX compares with LIKE 'path%', MATCH_EXACT with equality.
53 (MATCH_PREFIX, MATCH_EXACT) = range(2)
# latest_version_list uses this as the exclusive upper bound of a prefix scan
# (n.c.path < strnextling(prefix)) and to restart a scan just past a
# collapsed common prefix.
58 def strnextling(prefix):
59 """Return the first unicode string
60 greater than but not starting with given prefix.
61 strnextling('hello') -> 'hellp'
64 ## All strings start with the null string,
65 ## therefore we have to approximate strnextling('')
66 ## with the last unicode character supported by python:
67 ## 0x10ffff for wide (32-bit unicode) python builds,
68 ## 0x00ffff for narrow (16-bit unicode) python builds.
69 ## We will not autodetect. 0xffff is safe enough.
# latest_version_list uses this as the exclusive lower bound of a scan
# (n.c.path > start) when no usable `start` marker was supplied.
79 def strprevling(prefix):
80 """Return an approximation of the last unicode string
81 less than but not starting with given prefix.
82 strprevling(u'hello') -> u'helln\\xffff'
85 ## There is no prevling for the null string
# Step the last code point down by one and append U+FFFF, so the result sorts
# directly below `prefix` (see the docstring example).
90 s += unichr(c - 1) + unichr(0xffff)
# Declare the five tables (nodes, policy, statistics, versions, attributes)
# and their indexes on a fresh MetaData, create them on `engine`, and return
# the Table objects in dependency order (metadata.sorted_tables).
# Every table is declared with mysql_engine='InnoDB'.
108 def create_tables(engine):
109 metadata = MetaData()
# nodes table: `parent` is a self-referencing foreign key to nodes.node.
113 columns.append(Column('node', Integer, primary_key=True))
114 columns.append(Column('parent', Integer,
115 ForeignKey('nodes.node',
118 autoincrement=False))
119 columns.append(Column('latest_version', Integer))
120 columns.append(Column('path', String(2048), default='', nullable=False))
121 nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
# Paths are unique across all nodes.
122 Index('idx_nodes_path', nodes.c.path, unique=True)
123 Index('idx_nodes_parent', nodes.c.parent)
# policy table: key/value pairs attached to a node.
127 columns.append(Column('node', Integer,
128 ForeignKey('nodes.node',
132 columns.append(Column('key', String(128), primary_key=True))
133 columns.append(Column('value', String(256)))
134 policy = Table('policy', metadata, *columns, mysql_engine='InnoDB')
136 # create statistics table: per-node, per-cluster population/size/mtime
138 columns.append(Column('node', Integer,
139 ForeignKey('nodes.node',
143 columns.append(Column('population', Integer, nullable=False, default=0))
144 columns.append(Column('size', BigInteger, nullable=False, default=0))
145 columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
146 columns.append(Column('cluster', Integer, nullable=False, default=0,
147 primary_key=True, autoincrement=False))
148 statistics = Table('statistics', metadata, *columns, mysql_engine='InnoDB')
150 # create versions table: one row per stored version of a node
152 columns.append(Column('serial', Integer, primary_key=True))
153 columns.append(Column('node', Integer,
154 ForeignKey('nodes.node',
156 onupdate='CASCADE')))
157 columns.append(Column('hash', String(256)))
158 columns.append(Column('size', BigInteger, nullable=False, default=0))
159 columns.append(Column('type', String(256), nullable=False, default=''))
160 columns.append(Column('source', Integer))
161 columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
162 columns.append(Column('muser', String(256), nullable=False, default=''))
163 columns.append(Column('uuid', String(64), nullable=False, default=''))
164 columns.append(Column('checksum', String(256), nullable=False, default=''))
165 columns.append(Column('cluster', Integer, nullable=False, default=0))
166 versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
167 Index('idx_versions_node_mtime', versions.c.node, versions.c.mtime)
# NOTE(review): despite its name, this index covers only the uuid column.
168 Index('idx_versions_node_uuid', versions.c.uuid)
170 # create attributes table: (serial, domain, key) -> value
172 columns.append(Column('serial', Integer,
173 ForeignKey('versions.serial',
177 columns.append(Column('domain', String(256), primary_key=True))
178 columns.append(Column('key', String(128), primary_key=True))
179 columns.append(Column('value', String(256)))
180 attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')
# Emit CREATE statements for everything declared above.
182 metadata.create_all(engine)
183 return metadata.sorted_tables
186 class Node(DBWorker):
187 """Nodes store path organization and have multiple versions.
188 Versions store object history and have multiple attributes.
189 Attributes store metadata.
192 # TODO: Provide an interface for included and excluded clusters.
# Bind the five tables to this worker, creating the schema when it does not
# exist yet, and work with the root node row (node == parent == ROOTNODE).
194 def __init__(self, **params):
195 DBWorker.__init__(self, **params)
# Reflect the existing tables from the database behind self.engine.
197 metadata = MetaData(self.engine)
198 self.nodes = Table('nodes', metadata, autoload=True)
199 self.policy = Table('policy', metadata, autoload=True)
200 self.statistics = Table('statistics', metadata, autoload=True)
201 self.versions = Table('versions', metadata, autoload=True)
202 self.attributes = Table('attributes', metadata, autoload=True)
# A missing table means the schema was never created: build it and bind
# each returned Table to self under its own table name.
203 except NoSuchTableError:
204 tables = create_tables(self.engine)
# NOTE(review): map() is used only for its side effects. This relies on the
# eager Python 2 map(); a lazy map() would never run the assignments.
205 map(lambda t: self.__setattr__(t.name, t), tables)
# Look up the root row, which is its own parent.
207 s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
208 self.nodes.c.parent == ROOTNODE))
209 rp = self.conn.execute(s)
# Insert statement for the root row, with an empty path; presumably executed
# only when the lookup above found nothing -- TODO confirm.
213 s = self.nodes.insert(
214 ).values(node=ROOTNODE, parent=ROOTNODE, path='')
217 def node_create(self, parent, path):
218 """Create a new node from the given properties.
219 Return the node identifier of the new node.
221 # TODO: catch IntegrityError? (idx_nodes_path makes `path` unique.)
222 s = self.nodes.insert().values(parent=parent, path=path)
223 r = self.conn.execute(s)
# First (and only) primary-key column of the inserted row: the node id.
224 inserted_primary_key = r.inserted_primary_key[0]
226 return inserted_primary_key
228 def node_lookup(self, path):
229 """Lookup the current node of the given path.
230 Return None if the path is not found.
233 # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
# The path is passed through self.escape_like() with '\\' as the escape
# character; presumably this neutralizes LIKE wildcards so the pattern
# matches the path literally -- TODO confirm against escape_like().
234 s = select([self.nodes.c.node], self.nodes.c.path.like(
235 self.escape_like(path), escape='\\'))
236 r = self.conn.execute(s)
243 def node_lookup_bulk(self, paths):
244 """Lookup the current nodes for the given paths.
245 Return () if the path is not found.
248 # NOTE(review): unlike node_lookup(), this compares with IN rather than
# LIKE, so the MySQL trailing-space workaround used there is not applied.
249 s = select([self.nodes.c.node], self.nodes.c.path.in_(paths))
250 r = self.conn.execute(s)
# Flatten the single-column rows into a list of node ids.
253 return [row[0] for row in rows]
255 def node_get_properties(self, node):
256 """Return the node's (parent, path).
257 Return None if the node is not found.
# Single-row lookup by primary key.
260 s = select([self.nodes.c.parent, self.nodes.c.path])
261 s = s.where(self.nodes.c.node == node)
262 r = self.conn.execute(s)
267 def node_get_versions(self, node, keys=(), propnames=_propnames):
268 """Return the properties of all versions at node.
269 If keys is empty, return all properties in the order
270 (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
# All version columns of the node, oldest serial first.
273 s = select([self.versions.c.serial,
274 self.versions.c.node,
275 self.versions.c.hash,
276 self.versions.c.size,
277 self.versions.c.type,
278 self.versions.c.source,
279 self.versions.c.mtime,
280 self.versions.c.muser,
281 self.versions.c.uuid,
282 self.versions.c.checksum,
283 self.versions.c.cluster], self.versions.c.node == node)
284 s = s.order_by(self.versions.c.serial)
285 r = self.conn.execute(s)
# propnames maps a property name to its column position in the row.
# Keys that are not in propnames are silently skipped.
294 return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]
296 def node_count_children(self, node):
297 """Return node's child count."""
# The root row is its own parent (see __init__), so it is excluded here to
# avoid counting the root as a child of itself.
299 s = select([func.count(self.nodes.c.node)])
300 s = s.where(and_(self.nodes.c.parent == node,
301 self.nodes.c.node != ROOTNODE))
302 r = self.conn.execute(s)
307 def node_purge_children(self, parent, before=inf, cluster=0):
308 """Delete all versions with the specified
309 parent and cluster, and return
310 the hashes and size of versions deleted.
311 Clears out nodes with no remaining versions.
# c1: ids of the direct children of `parent`.
314 c1 = select([self.nodes.c.node],
315 self.nodes.c.parent == parent)
316 where_clause = and_(self.versions.c.node.in_(c1),
317 self.versions.c.cluster == cluster)
# Count and total size of the versions about to be removed.
318 s = select([func.count(self.versions.c.serial),
319 func.sum(self.versions.c.size)])
320 s = s.where(where_clause)
# NOTE(review): the `before` restriction is added to this count query only.
# `where_clause`, which drives the hash SELECT and the DELETE below, has no
# mtime condition, so versions newer than `before` are deleted as well while
# the statistics are adjusted only for the older ones.
322 s = s.where(self.versions.c.mtime <= before)
323 r = self.conn.execute(s)
# size becomes the negated sum, or 0 when SUM() is NULL or zero.
328 nr, size = row[0], -row[1] if row[1] else 0
# Subtract the removed versions from the parent's and the ancestors' counters.
330 self.statistics_update(parent, -nr, size, mtime, cluster)
331 self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)
# Collect the hashes of the affected versions before deleting them.
333 s = select([self.versions.c.hash])
334 s = s.where(where_clause)
335 r = self.conn.execute(s)
336 hashes = [row[0] for row in r.fetchall()]
340 s = self.versions.delete().where(where_clause)
341 r = self.conn.execute(s)
# Children of `parent` that are left with zero versions get deleted too.
345 s = select([self.nodes.c.node],
346 and_(self.nodes.c.parent == parent,
347 select([func.count(self.versions.c.serial)],
348 self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
349 rp = self.conn.execute(s)
350 nodes = [r[0] for r in rp.fetchall()]
352 s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
353 self.conn.execute(s).close()
357 def node_purge(self, node, before=inf, cluster=0):
358 """Delete all versions with the specified
359 node and cluster, and return
360 the hashes and size of versions deleted.
361 Clears out the node if it has no remaining versions.
# Count and total size of the node's versions in `cluster`.
365 s = select([func.count(self.versions.c.serial),
366 func.sum(self.versions.c.size)])
367 where_clause = and_(self.versions.c.node == node,
368 self.versions.c.cluster == cluster)
369 s = s.where(where_clause)
# NOTE(review): as in node_purge_children(), the `before` restriction is
# applied to this count query only; the hash SELECT and the DELETE below use
# `where_clause`, which has no mtime condition.
371 s = s.where(self.versions.c.mtime <= before)
372 r = self.conn.execute(s)
374 nr, size = row[0], row[1]
# Subtract the removed versions from the ancestors' counters.
379 self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
381 s = select([self.versions.c.hash])
382 s = s.where(where_clause)
383 r = self.conn.execute(s)
# NOTE(review): the comprehension reuses the name `r`. Under Python 2 the
# loop variable leaks, so after this line `r` is the last fetched row rather
# than the result proxy whenever any row was returned.
384 hashes = [r[0] for r in r.fetchall()]
388 s = self.versions.delete().where(where_clause)
389 r = self.conn.execute(s)
# Select the node itself if it is left with zero versions, then delete it.
393 s = select([self.nodes.c.node],
394 and_(self.nodes.c.node == node,
395 select([func.count(self.versions.c.serial)],
396 self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
397 r = self.conn.execute(s)
400 s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
401 self.conn.execute(s).close()
405 def node_remove(self, node):
406 """Remove the node specified.
407 Return false if the node has children or is not found.
# Nodes that still have children are not removed.
410 if self.node_count_children(node):
# Per-cluster population and total size of the node's versions.
414 s = select([func.count(self.versions.c.serial),
415 func.sum(self.versions.c.size),
416 self.versions.c.cluster])
417 s = s.where(self.versions.c.node == node)
418 s = s.group_by(self.versions.c.cluster)
419 r = self.conn.execute(s)
# Subtract each cluster's totals from the ancestors' statistics.
420 for population, size, cluster in r.fetchall():
421 self.statistics_update_ancestors(
422 node, -population, -size, mtime, cluster)
425 s = self.nodes.delete().where(self.nodes.c.node == node)
426 self.conn.execute(s).close()
# Fetch the policy of `node` as a {key: value} dict.
429 def policy_get(self, node):
430 s = select([self.policy.c.key, self.policy.c.value],
431 self.policy.c.node == node)
432 r = self.conn.execute(s)
# Each row is a (key, value) pair, so dict() can consume the rows directly.
433 d = dict(r.fetchall())
# Store every (key, value) item of the `policy` mapping for `node`.
437 def policy_set(self, node, policy):
# iteritems() is the Python 2 dict API.
439 for k, v in policy.iteritems():
# Try to UPDATE the existing (node, key) row first.
440 s = self.policy.update().where(and_(self.policy.c.node == node,
441 self.policy.c.key == k))
442 s = s.values(value=v)
443 rp = self.conn.execute(s)
# INSERT of a new (node, key, value) row; presumably used only when the
# UPDATE above matched no row -- TODO confirm.
446 s = self.policy.insert()
447 values = {'node': node, 'key': k, 'value': v}
448 r = self.conn.execute(s, values)
451 def statistics_get(self, node, cluster=0):
452 """Return population, total size and last mtime
453 for all versions under node that belong to the cluster.
# Reads the precomputed counters from the statistics table for the
# (node, cluster) pair.
456 s = select([self.statistics.c.population,
457 self.statistics.c.size,
458 self.statistics.c.mtime])
459 s = s.where(and_(self.statistics.c.node == node,
460 self.statistics.c.cluster == cluster))
461 r = self.conn.execute(s)
466 def statistics_update(self, node, population, size, mtime, cluster=0):
467 """Update the statistics of the given node.
468 Statistics keep track of the population, total
469 size of objects and mtime in the node's namespace.
470 The population and size deltas may be zero, positive or negative.
# Read the current counters of the (node, cluster) row.
472 s = select([self.statistics.c.population, self.statistics.c.size],
473 and_(self.statistics.c.node == node,
474 self.statistics.c.cluster == cluster))
475 rp = self.conn.execute(s)
# Start from zero counters; presumably the case where no row exists yet.
479 prepopulation, presize = (0, 0)
481 prepopulation, presize = r
# `population` is a delta: add it to the stored value.
482 population += prepopulation
# Write the new totals back with an UPDATE ...
487 u = self.statistics.update().where(and_(self.statistics.c.node == node,
488 self.statistics.c.cluster == cluster))
489 u = u.values(population=population, size=size, mtime=mtime)
490 rp = self.conn.execute(u)
# ... or INSERT a fresh row; presumably only when the UPDATE matched nothing
# -- TODO confirm.
493 ins = self.statistics.insert()
494 ins = ins.values(node=node, population=population, size=size,
495 mtime=mtime, cluster=cluster)
496 self.conn.execute(ins).close()
498 def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
499 """Update the statistics of the given node's parent.
500 Then recursively update all parents up to the root.
501 Population is not recursive.
# Resolve the node's (parent, path) to find the next ancestor to update.
507 props = self.node_get_properties(node)
511 self.statistics_update(parent, population, size, mtime, cluster)
# After the immediate parent, only size and mtime keep propagating upwards.
513 population = 0 # Population isn't recursive
def statistics_latest(self, node, before=inf, except_cluster=0):
    """Return population, total size and last mtime
       for all latest versions under node that
       do not belong to the cluster.

       The result is a (count, size, mtime) tuple:
         count -- latest versions of the direct children of node,
         size  -- total size of the latest versions of every node whose
                  path starts with node's path, minus node's own size,
         mtime -- the newest mtime seen among node and those versions.
       Return None if the node is not found or if its own latest version
       is missing or belongs to except_cluster.
    """

    def latest_serial(node_expr):
        # Scalar subquery giving the serial of the latest version of
        # node_expr: the newest one older than `before`, or the serial
        # cached in nodes.latest_version when no time limit is given.
        if before != inf:
            sub = select([func.max(self.versions.c.serial)],
                         self.versions.c.node == node_expr)
            sub = sub.where(self.versions.c.mtime < before)
        else:
            # Fix: latest_version has to be picked by nodes.node. The old
            # code filtered on versions.node in one place, selected the
            # non-existent nodes.serial column in another, and in a third
            # dropped the filter by discarding the result of where().
            sub = select([self.nodes.c.latest_version],
                         self.nodes.c.node == node_expr)
        return sub.as_scalar()

    # The node.
    props = self.node_get_properties(node)
    if props is None:
        return None
    parent, path = props

    # The outer queries read from an alias, as version_lookup() does, so
    # the subqueries on the plain versions table keep their own FROM.
    v = self.versions.alias('v')

    # The latest version.
    s = select([v.c.serial, v.c.node, v.c.hash,
                v.c.size, v.c.type, v.c.source,
                v.c.mtime, v.c.muser, v.c.uuid,
                v.c.checksum, v.c.cluster])
    s = s.where(and_(v.c.cluster != except_cluster,
                     v.c.serial == latest_serial(node)))
    r = self.conn.execute(s)
    props = r.fetchone()
    r.close()
    if not props:
        return None
    mtime = props[MTIME]

    totals = [func.count(v.c.serial),
              func.sum(v.c.size),
              func.max(v.c.mtime)]

    # First level, just under node (get population).
    children = select([self.nodes.c.node], self.nodes.c.parent == node)
    s = select(totals)
    s = s.where(and_(v.c.serial == latest_serial(v.c.node),
                     v.c.cluster != except_cluster,
                     v.c.node.in_(children)))
    rp = self.conn.execute(s)
    r = rp.fetchone()
    rp.close()
    if not r:
        return None
    count = r[0]
    # MAX() is NULL when nothing matched; keep the node's own mtime then.
    if r[2] is not None:
        mtime = max(mtime, r[2])
    if count == 0:
        return (0, 0, mtime)

    # All children (get size and mtime).
    # This is why the full path is stored.
    descendants = select([self.nodes.c.node], self.nodes.c.path.like(
        self.escape_like(path) + '%', escape='\\'))
    s = select(totals)
    s = s.where(and_(v.c.serial == latest_serial(v.c.node),
                     v.c.cluster != except_cluster,
                     v.c.node.in_(descendants)))
    rp = self.conn.execute(s)
    r = rp.fetchone()
    rp.close()
    if not r:
        return None
    # The prefix pattern also matches the node's own path, so the node's
    # own size is taken back out of the total.
    size = r[1] - props[SIZE]
    if r[2] is not None:
        mtime = max(mtime, r[2])
    return (count, size, mtime)
def nodes_set_latest_version(self, node, serial):
    """Record serial as the latest version of node.

    Runs one UPDATE on the nodes table and closes the result.
    """
    nodes = self.nodes
    stmt = nodes.update()
    stmt = stmt.where(nodes.c.node == node).values(latest_version=serial)
    result = self.conn.execute(stmt)
    result.close()
612 def version_create(self, node, hash, size, type, source, muser, uuid, checksum, cluster=0):
613 """Create a new version from the given properties.
614 Return the (serial, mtime) of the new version.
# Insert the version row; `mtime` is computed by the method itself rather
# than passed in by the caller.
618 s = self.versions.insert(
619 ).values(node=node, hash=hash, size=size, type=type, source=source,
620 mtime=mtime, muser=muser, uuid=uuid, checksum=checksum, cluster=cluster)
# First primary-key column of the inserted row: the new serial.
621 serial = self.conn.execute(s).inserted_primary_key[0]
# One more version of `size` bytes under every ancestor of the node.
622 self.statistics_update_ancestors(node, 1, size, mtime, cluster)
# The new version becomes the node's cached latest version.
624 self.nodes_set_latest_version(node, serial)
628 def version_lookup(self, node, before=inf, cluster=0, all_props=True):
629 """Lookup the current version of the given node.
630 Return a list with its properties:
631 (serial, node, hash, size, type, source, mtime,
632 muser, uuid, checksum, cluster)
633 or None if the current version is not found in the given cluster.
# The outer query reads from an alias so the subquery on the plain versions
# table stays independent of it.
636 v = self.versions.alias('v')
# Two projections: the serial alone, or every version column; presumably
# selected by `all_props` -- TODO confirm.
638 s = select([v.c.serial])
640 s = select([v.c.serial, v.c.node, v.c.hash,
641 v.c.size, v.c.type, v.c.source,
642 v.c.mtime, v.c.muser, v.c.uuid,
643 v.c.checksum, v.c.cluster])
# Two definitions of "current": the highest serial older than `before`, or
# the serial cached in nodes.latest_version; presumably selected by whether
# `before` is inf -- TODO confirm.
645 c = select([func.max(self.versions.c.serial)],
646 self.versions.c.node == node)
647 c = c.where(self.versions.c.mtime < before)
649 c = select([self.nodes.c.latest_version],
650 self.nodes.c.node == node)
# The current version only counts if it lives in the requested cluster.
651 s = s.where(and_(v.c.serial == c,
652 v.c.cluster == cluster))
653 r = self.conn.execute(s)
660 def version_lookup_bulk(self, nodes, before=inf, cluster=0, all_props=True):
661 """Lookup the current versions of the given nodes.
662 Return a list with their properties:
663 (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
667 v = self.versions.alias('v')
# Two projections: the serial alone, or every version column; presumably
# selected by `all_props` -- TODO confirm.
669 s = select([v.c.serial])
671 s = select([v.c.serial, v.c.node, v.c.hash,
672 v.c.size, v.c.type, v.c.source,
673 v.c.mtime, v.c.muser, v.c.uuid,
674 v.c.checksum, v.c.cluster])
# Grouping by node yields one maximum serial per requested node.
676 c = select([func.max(self.versions.c.serial)],
677 self.versions.c.node.in_(nodes))
678 c = c.where(self.versions.c.mtime < before)
679 c = c.group_by(self.versions.c.node)
# Alternative: the serials cached in nodes.latest_version.
681 c = select([self.nodes.c.latest_version],
682 self.nodes.c.node.in_(nodes))
683 s = s.where(and_(v.c.serial.in_(c),
684 v.c.cluster == cluster))
685 s = s.order_by(v.c.node)
686 r = self.conn.execute(s)
687 rproxy = r.fetchall()
# NOTE(review): this is a generator of tuples, not the list the docstring
# promises, so it can be iterated only once.
689 return (tuple(row.values()) for row in rproxy)
691 def version_get_properties(self, serial, keys=(), propnames=_propnames):
692 """Return a sequence of values for the properties of
693 the version specified by serial and the keys, in the order given.
694 If keys is empty, return all properties in the order
695 (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
# Fetch every column of the version row with this serial.
698 v = self.versions.alias()
699 s = select([v.c.serial, v.c.node, v.c.hash,
700 v.c.size, v.c.type, v.c.source,
701 v.c.mtime, v.c.muser, v.c.uuid,
702 v.c.checksum, v.c.cluster], v.c.serial == serial)
703 rp = self.conn.execute(s)
# propnames maps a property name to its column position in the row.
# Keys that are not in propnames are silently skipped.
711 return [r[propnames[k]] for k in keys if k in propnames]
713 def version_put_property(self, serial, key, value):
714 """Set value for the property of version specified by key."""
# Only names listed in _propnames are checked against here, which keeps
# arbitrary column names out of the UPDATE below.
716 if key not in _propnames:
718 s = self.versions.update()
719 s = s.where(self.versions.c.serial == serial)
# `key` is used directly as the name of the column to assign.
720 s = s.values(**{key: value})
721 self.conn.execute(s).close()
723 def version_recluster(self, serial, cluster):
724 """Move the version into another cluster."""
726 props = self.version_get_properties(serial)
731 oldcluster = props[CLUSTER]
# Compared so that a move into the same cluster can be treated specially;
# presumably as a no-op -- TODO confirm.
732 if cluster == oldcluster:
# Move one version of `size` bytes from the old cluster's ancestor
# statistics to the new cluster's.
736 self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
737 self.statistics_update_ancestors(node, 1, size, mtime, cluster)
# Finally re-tag the version row itself.
739 s = self.versions.update()
740 s = s.where(self.versions.c.serial == serial)
741 s = s.values(cluster=cluster)
742 self.conn.execute(s).close()
def version_remove(self, serial):
    """Remove the serial specified.

       The version is subtracted from the statistics of its node's
       ancestors, its row is deleted, and the node's latest_version
       pointer is refreshed from version_lookup().
    """
    # Function-scope import, so this method does not depend on a
    # module-level `time` name.
    from time import time

    props = self.version_get_properties(serial)
    # NOTE(review): a missing serial is treated as a no-op -- confirm that
    # callers expect None here.
    if not props:
        return None
    node = props[NODE]
    removed_hash = props[HASH]
    size = props[SIZE]
    cluster = props[CLUSTER]

    # One version of `size` bytes less under every ancestor.
    mtime = time()
    self.statistics_update_ancestors(node, -1, -size, mtime, cluster)

    s = self.versions.delete().where(self.versions.c.serial == serial)
    self.conn.execute(s).close()

    # Fix: this used to call nodes_set_latest_version(v.node, serial).
    # `v` is not defined in this method (NameError), and `serial` is the
    # row deleted just above. Use the node at hand and the serial that
    # version_lookup() reports; with all_props=False that serial is the
    # only column of the returned row.
    current = self.version_lookup(node, cluster=cluster, all_props=False)
    if current:
        self.nodes_set_latest_version(node, current[SERIAL])

    # NOTE(review): returning (hash, size) of the removed version mirrors
    # the purge methods' "hashes and size" contract -- confirm with callers.
    return removed_hash, size
767 def attribute_get(self, serial, domain, keys=()):
768 """Return a list of (key, value) pairs of the version specified by serial.
769 If keys is empty, return all attributes.
770 Otherwise, return only those specified.
# Query restricted to the requested keys.
774 attrs = self.attributes.alias()
775 s = select([attrs.c.key, attrs.c.value])
776 s = s.where(and_(attrs.c.key.in_(keys),
777 attrs.c.serial == serial,
778 attrs.c.domain == domain))
# Query for every attribute of the (serial, domain) pair.
780 attrs = self.attributes.alias()
781 s = select([attrs.c.key, attrs.c.value])
782 s = s.where(and_(attrs.c.serial == serial,
783 attrs.c.domain == domain))
784 r = self.conn.execute(s)
789 def attribute_set(self, serial, domain, items):
790 """Set the attributes of the version specified by serial.
791 Receive attributes as an iterable of (key, value) pairs.
# Try to UPDATE the existing (serial, domain, key) row first.
796 s = self.attributes.update()
797 s = s.where(and_(self.attributes.c.serial == serial,
798 self.attributes.c.domain == domain,
799 self.attributes.c.key == k))
800 s = s.values(value=v)
801 rp = self.conn.execute(s)
# INSERT of a new attribute row; presumably used only when the UPDATE above
# matched no row -- TODO confirm.
804 s = self.attributes.insert()
805 s = s.values(serial=serial, domain=domain, key=k, value=v)
806 self.conn.execute(s).close()
808 def attribute_del(self, serial, domain, keys=()):
809 """Delete attributes of the version specified by serial.
810 If keys is empty, delete all attributes.
811 Otherwise delete those specified.
815 # TODO: more efficient way to do this? (one DELETE is built per key)
817 s = self.attributes.delete()
818 s = s.where(and_(self.attributes.c.serial == serial,
819 self.attributes.c.domain == domain,
820 self.attributes.c.key == key))
821 self.conn.execute(s).close()
# Delete every attribute of the (serial, domain) pair.
823 s = self.attributes.delete()
824 s = s.where(and_(self.attributes.c.serial == serial,
825 self.attributes.c.domain == domain))
826 self.conn.execute(s).close()
# Copy every attribute row of version `source` onto version `dest`.
828 def attribute_copy(self, source, dest):
# `dest` is the first item of the column list, so each fetched row already
# has the shape (dest, domain, key, value).
830 [dest, self.attributes.c.domain,
831 self.attributes.c.key, self.attributes.c.value],
832 self.attributes.c.serial == source)
833 rp = self.conn.execute(s)
834 attributes = rp.fetchall()
# The loop variable `dest` rebinds the parameter of the same name, with the
# value selected above.
836 for dest, domain, k, v in attributes:
# Try to UPDATE an existing (dest, domain, key) row; the new value is
# supplied as an execution-time parameter.
838 s = self.attributes.update().where(and_(
839 self.attributes.c.serial == dest,
840 self.attributes.c.domain == domain,
841 self.attributes.c.key == k))
842 rp = self.conn.execute(s, value=v)
# INSERT of a new row; presumably used only when the UPDATE above matched
# no row -- TODO confirm.
845 s = self.attributes.insert()
846 values = {'serial': dest, 'domain': domain,
847 'key': k, 'value': v}
848 self.conn.execute(s, values).close()
# NOTE(review): `pathq=[]` is a mutable default argument. The visible code
# only iterates over it, but a None default would be safer.
850 def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=[]):
851 """Return a list with all keys pairs defined
852 for all latest versions under parent that
853 do not belong to the cluster.
856 # TODO: Use another table to store before=inf results.
857 a = self.attributes.alias('a')
858 v = self.versions.alias('v')
859 n = self.nodes.alias('n')
860 s = select([a.c.key]).distinct()
# Latest-version subquery, correlated on v.c.node: either the highest serial
# older than `before`, or the serial cached in nodes.latest_version.
862 filtered = select([func.max(self.versions.c.serial)])
863 filtered = filtered.where(self.versions.c.mtime < before)
864 filtered = filtered.where(self.versions.c.node == v.c.node)
866 filtered = select([self.nodes.c.latest_version])
867 filtered = filtered.where(self.nodes.c.node == v.c.node)
868 s = s.where(v.c.serial == filtered)
869 s = s.where(v.c.cluster != except_cluster)
# Only versions of the direct children of `parent`.
870 s = s.where(v.c.node.in_(select([self.nodes.c.node],
871 self.nodes.c.parent == parent)))
# Join attributes to versions, and versions to nodes.
872 s = s.where(a.c.serial == v.c.serial)
873 s = s.where(a.c.domain == domain)
874 s = s.where(n.c.node == v.c.node)
# Each (path, match) pair adds one alternative; the alternatives are OR-ed.
876 for path, match in pathq:
877 if match == MATCH_PREFIX:
879 n.c.path.like(self.escape_like(path) + '%', escape='\\'))
880 elif match == MATCH_EXACT:
881 conj.append(n.c.path == path)
883 s = s.where(or_(*conj))
884 rp = self.conn.execute(s)
# Flatten the single-column rows into a list of keys.
887 return [r[0] for r in rows]
# NOTE(review): `pathq=[]` and `filterq=[]` are mutable default arguments.
# The visible code only reads them, but None defaults would be safer.
889 def latest_version_list(self, parent, prefix='', delimiter=None,
890 start='', limit=10000, before=inf,
891 except_cluster=0, pathq=[], domain=None,
892 filterq=[], sizeq=None, all_props=False):
893 """Return a (list of (path, serial) tuples, list of common prefixes)
894 for the current versions of the paths with the given parent,
895 matching the following criteria.
897 The property tuple for a version is returned if all
898 of these conditions are true:
904 c. path starts with prefix (and paths in pathq)
906 d. version is the max up to before
908 e. version is not in cluster
910 f. the path does not have the delimiter occurring
911 after the prefix, or ends with the delimiter
913 g. serial matches the attribute filter query.
915 A filter query is a comma-separated list of
916 terms in one of these three forms:
919 an attribute with this key must exist
922 an attribute with this key must not exist
925 the attribute with this key satisfies the value
926 where ?op is one of ==, !=, <=, >=, <, >.
928 h. the size is in the range set by sizeq
930 The list of common prefixes includes the prefixes
931 matching up to the first delimiter after prefix,
932 and are reported only once, as "virtual directories".
933 The delimiter is included in the prefixes.
935 If arguments are None, then the corresponding matching rule
938 Limit applies to the first list of tuples returned.
940 If all_props is True, return all properties after path, not just serial.
# With no start marker, or one that sorts before the prefix, begin the scan
# just below the prefix. `nextling` is the exclusive upper bound.
943 if not start or start < prefix:
944 start = strprevling(prefix)
945 nextling = strnextling(prefix)
947 v = self.versions.alias('v')
948 n = self.nodes.alias('n')
# Two projections: (path, serial), or path followed by every version column;
# presumably selected by `all_props` -- TODO confirm.
950 s = select([n.c.path, v.c.serial]).distinct()
952 s = select([n.c.path,
953 v.c.serial, v.c.node, v.c.hash,
954 v.c.size, v.c.type, v.c.source,
955 v.c.mtime, v.c.muser, v.c.uuid,
956 v.c.checksum, v.c.cluster]).distinct()
# Latest-version subquery: the highest serial older than `before`, or the
# serial cached in nodes.latest_version; correlated on v.c.node below.
958 filtered = select([func.max(self.versions.c.serial)])
959 filtered = filtered.where(self.versions.c.mtime < before)
961 filtered = select([self.nodes.c.latest_version])
963 v.c.serial == filtered.where(self.nodes.c.node == v.c.node))
964 s = s.where(v.c.cluster != except_cluster)
# Only versions of the direct children of `parent`.
965 s = s.where(v.c.node.in_(select([self.nodes.c.node],
966 self.nodes.c.parent == parent)))
968 s = s.where(n.c.node == v.c.node)
# Path window: strictly after the bound `start` parameter and strictly before
# strnextling(prefix). `start` is a bind parameter so the same statement can
# be re-executed with a new start value.
969 s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
# Each (path, match) pair adds one alternative; the alternatives are OR-ed.
971 for path, match in pathq:
972 if match == MATCH_PREFIX:
974 n.c.path.like(self.escape_like(path) + '%', escape='\\'))
975 elif match == MATCH_EXACT:
976 conj.append(n.c.path == path)
978 s = s.where(or_(*conj))
# sizeq is a (lower, upper) pair: the lower bound is inclusive (>=) and the
# upper bound is exclusive (<).
980 if sizeq and len(sizeq) == 2:
982 s = s.where(v.c.size >= sizeq[0])
984 s = s.where(v.c.size < sizeq[1])
# Attribute filters apply only when both a domain and a filter query exist.
986 if domain and filterq:
987 a = self.attributes.alias('a')
988 included, excluded, opers = parse_filters(filterq)
# included: at least one of these keys must EXIST for the version.
991 subs = subs.where(a.c.serial == v.c.serial).correlate(v)
992 subs = subs.where(a.c.domain == domain)
993 subs = subs.where(or_(*[a.c.key.op('=')(x) for x in included]))
994 s = s.where(exists(subs))
# excluded: none of these keys may exist (NOT EXISTS).
997 subs = subs.where(a.c.serial == v.c.serial).correlate(v)
998 subs = subs.where(a.c.domain == domain)
999 subs = subs.where(or_(*[a.c.key.op('=')(x) for x in excluded]))
1000 s = s.where(not_(exists(subs)))
# opers: one EXISTS per (key, operator, value) term.
1002 for k, o, val in opers:
1004 subs = subs.where(a.c.serial == v.c.serial).correlate(v)
1005 subs = subs.where(a.c.domain == domain)
# NOTE(review): `o` is handed to .op() and therefore rendered as a raw SQL
# operator; this assumes parse_filters() only yields the operators listed in
# the docstring -- TODO confirm.
1007 and_(a.c.key.op('=')(k), a.c.value.op(o)(val)))
1008 s = s.where(exists(subs))
1010 s = s.order_by(n.c.path)
# Execution with the initial start marker.
1014 rp = self.conn.execute(s, start=start)
# Bound-method aliases for the two result lists.
1023 pappend = prefixes.append
1025 mappend = matches.append
1027 rp = self.conn.execute(s, start=start)
1029 props = rp.fetchone()
# Look for the delimiter after the prefix part of the path; presumably `pfz`
# is len(prefix) and `dz` is len(delimiter) -- TODO confirm.
1034 idx = path.find(delimiter, pfz)
# True when the path ends with the delimiter.
1043 if idx + dz == len(path):
1046 continue # Get one more, in case there is a path.
# Common prefix: the path up to and including the delimiter.
1047 pf = path[:idx + dz]
# Re-run the query from just past the common prefix, skipping everything
# underneath it.
1052 rp = self.conn.execute(s, start=strnextling(pf)) # New start.
1055 return matches, prefixes
1057 def latest_uuid(self, uuid):
1058 """Return a (path, serial) tuple, for the latest version of the given uuid."""
1060 v = self.versions.alias('v')
1061 n = self.nodes.alias('n')
1062 s = select([n.c.path, v.c.serial])
1063 filtered = select([func.max(self.versions.c.serial)])
1064 s = s.where(v.c.serial == filtered.where(self.versions.c.uuid == uuid))
1065 s = s.where(n.c.node == v.c.node)
1067 r = self.conn.execute(s)