1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
35 from sqlalchemy import Table, Integer, BigInteger, DECIMAL, Column, String, MetaData, ForeignKey
36 from sqlalchemy.types import Text
37 from sqlalchemy.schema import Index, Sequence
38 from sqlalchemy.sql import func, and_, or_, not_, null, select, bindparam, text, exists
39 from sqlalchemy.ext.compiler import compiles
40 from sqlalchemy.engine.reflection import Inspector
42 from dbworker import DBWorker
44 from pithos.backends.filter import parse_filters
# Positional indices into a version property row, in the column order used
# throughout this module:
# (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster)
( SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM, CLUSTER ) = range(11)

# Path-matching modes for pathq filters (see latest_version_list / latest_attribute_keys).
( MATCH_PREFIX, MATCH_EXACT ) = range(2)
def strnextling(prefix):
    """Return the first unicode string
    greater than but not starting with given prefix.
    strnextling('hello') -> 'hellp'
    """
    ## all strings start with the null string,
    ## therefore we have to approximate strnextling('')
    ## with the last unicode character supported by python
    ## 0x10ffff for wide (32-bit unicode) python builds
    ## 0x00ffff for narrow (16-bit unicode) python builds
    ## We will not autodetect. 0xffff is safe enough.
    # NOTE(review): the implementation lines are elided in this excerpt;
    # presumably the last character of prefix is incremented (see docstring).
def strprevling(prefix):
    """Return an approximation of the last unicode string
    less than but not starting with given prefix.
    strprevling(u'hello') -> u'helln\\xffff'
    """
    ## There is no prevling for the null string
    # Decrement the last character and pad with the highest BMP code point
    # (narrow-build safe, matching strnextling's 0xffff choice above).
    # NOTE(review): the bindings of 's' and 'c' are elided in this excerpt.
    s += unichr(c-1) + unichr(0xffff)
class Node(DBWorker):
    """Nodes store path organization and have multiple versions.
    Versions store object history and have multiple attributes.
    Attributes store metadata.
    """

    # TODO: Provide an interface for included and excluded clusters.
    def __init__(self, **params):
        """Define the nodes/policy/statistics/versions/attributes tables,
        create them if missing (metadata.create_all below), and make sure
        the root node row exists.

        NOTE(review): several 'columns = []' resets and ForeignKey keyword
        arguments are elided in this excerpt; the dangling parentheses below
        reflect that elision, not the original source.
        """
        DBWorker.__init__(self, **params)
        metadata = MetaData()

        # -- nodes table: one row per path; 'parent' is self-referential. --
        columns.append(Column('node', Integer, primary_key=True))
        columns.append(Column('parent', Integer,
                       ForeignKey('nodes.node',
                       autoincrement=False))
        columns.append(Column('latest_version', Integer))
        columns.append(Column('path', String(2048), default='', nullable=False))
        self.nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
        Index('idx_nodes_path', self.nodes.c.path, unique=True)
        Index('idx_nodes_parent', self.nodes.c.parent)

        # -- policy table: per-node key/value settings. --
        columns.append(Column('node', Integer,
                       ForeignKey('nodes.node',
        columns.append(Column('key', String(128), primary_key=True))
        columns.append(Column('value', String(256)))
        self.policies = Table('policy', metadata, *columns, mysql_engine='InnoDB')

        #create statistics table
        # Aggregated (population, size, mtime) per (node, cluster).
        columns.append(Column('node', Integer,
                       ForeignKey('nodes.node',
        columns.append(Column('population', Integer, nullable=False, default=0))
        columns.append(Column('size', BigInteger, nullable=False, default=0))
        columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
        columns.append(Column('cluster', Integer, nullable=False, default=0,
                              primary_key=True, autoincrement=False))
        self.statistics = Table('statistics', metadata, *columns, mysql_engine='InnoDB')

        #create versions table
        columns.append(Column('serial', Integer, primary_key=True))
        columns.append(Column('node', Integer,
                       ForeignKey('nodes.node',
                       onupdate='CASCADE')))
        columns.append(Column('hash', String(256)))
        columns.append(Column('size', BigInteger, nullable=False, default=0))
        columns.append(Column('type', String(256), nullable=False, default=''))
        columns.append(Column('source', Integer))
        columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
        columns.append(Column('muser', String(256), nullable=False, default=''))
        columns.append(Column('uuid', String(64), nullable=False, default=''))
        columns.append(Column('checksum', String(256), nullable=False, default=''))
        columns.append(Column('cluster', Integer, nullable=False, default=0))
        self.versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
        Index('idx_versions_node_mtime', self.versions.c.node, self.versions.c.mtime)
        Index('idx_versions_node_uuid', self.versions.c.uuid)

        #create attributes table
        columns.append(Column('serial', Integer,
                       ForeignKey('versions.serial',
        columns.append(Column('domain', String(256), primary_key=True))
        columns.append(Column('key', String(128), primary_key=True))
        columns.append(Column('value', String(256)))
        self.attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')

        metadata.create_all(self.engine)

        # Ensure the root node (node == parent == ROOTNODE) exists;
        # insert it when the select finds nothing.
        s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
                                           self.nodes.c.parent == ROOTNODE))
        rp = self.conn.execute(s)
        s = self.nodes.insert().values(node=ROOTNODE, parent=ROOTNODE)
199 def node_create(self, parent, path):
200 """Create a new node from the given properties.
201 Return the node identifier of the new node.
203 #TODO catch IntegrityError?
204 s = self.nodes.insert().values(parent=parent, path=path)
205 r = self.conn.execute(s)
206 inserted_primary_key = r.inserted_primary_key[0]
208 return inserted_primary_key
    def node_lookup(self, path):
        """Lookup the current node of the given path.
        Return None if the path is not found.
        """
        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
        s = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path), escape='\\'))
        r = self.conn.execute(s)
        # NOTE(review): the fetch/close/return of the matched node id is
        # elided in this excerpt.
    def node_lookup_bulk(self, paths):
        """Lookup the current nodes for the given paths.
        Return () if the path is not found.
        """
        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
        # NOTE(review): despite the comment above, this query uses IN, not
        # LIKE, so the trailing-space caveat may apply here.
        s = select([self.nodes.c.node], self.nodes.c.path.in_(paths))
        r = self.conn.execute(s)
        # NOTE(review): the 'rows' binding (presumably r.fetchall()) is
        # elided in this excerpt.
        return [row[0] for row in rows]
    def node_get_properties(self, node):
        """Return the node's (parent, path).
        Return None if the node is not found.
        """
        s = select([self.nodes.c.parent, self.nodes.c.path])
        s = s.where(self.nodes.c.node == node)
        r = self.conn.execute(s)
        # NOTE(review): the fetchone/close/return tail is elided in this excerpt.
    def node_get_versions(self, node, keys=(), propnames=_propnames):
        """Return the properties of all versions at node.
        If keys is empty, return all properties in the order
        (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
        """
        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.hash,
                    self.versions.c.size,
                    self.versions.c.type,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.uuid,
                    self.versions.c.checksum,
                    self.versions.c.cluster], self.versions.c.node == node)
        s = s.order_by(self.versions.c.serial)
        r = self.conn.execute(s)
        # Project each row onto the requested keys, preserving the callers'
        # key order; unknown keys are silently skipped.
        # NOTE(review): the 'rows' binding and the empty-keys fast path are
        # elided in this excerpt.
        return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]
    def node_count_children(self, node):
        """Return node's child count."""

        s = select([func.count(self.nodes.c.node)])
        # Exclude ROOTNODE: the root is its own parent (see __init__).
        s = s.where(and_(self.nodes.c.parent == node,
                         self.nodes.c.node != ROOTNODE))
        r = self.conn.execute(s)
        # NOTE(review): the fetchone/close/return tail is elided in this excerpt.
    def node_purge_children(self, parent, before=inf, cluster=0):
        """Delete all versions with the specified
        parent and cluster, and return
        the hashes and size of versions deleted.
        Clears out nodes with no remaining versions.
        """
        # Count and total size of the versions about to be deleted.
        c1 = select([self.nodes.c.node],
                    self.nodes.c.parent == parent)
        where_clause = and_(self.versions.c.node.in_(c1),
                            self.versions.c.cluster == cluster)
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        s = s.where(where_clause)
        s = s.where(self.versions.c.mtime <= before)
        r = self.conn.execute(s)
        # Precedence note: this parses as row[0], (-row[1] if row[1] else 0),
        # i.e. 'size' is already negated (SUM() is NULL when nothing matched),
        # which is why it is passed below WITHOUT a minus sign.
        # NOTE(review): the 'row' and 'mtime' bindings are elided in this excerpt.
        nr, size = row[0], -row[1] if row[1] else 0
        self.statistics_update(parent, -nr, size, mtime, cluster)
        self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)

        # Collect the hashes of the doomed versions for the caller.
        s = select([self.versions.c.hash])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        hashes = [row[0] for row in r.fetchall()]

        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)

        # Drop child nodes that are left with no versions at all.
        s = select([self.nodes.c.node],
                   and_(self.nodes.c.parent == parent,
                        select([func.count(self.versions.c.serial)],
                               self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
        rp = self.conn.execute(s)
        nodes = [r[0] for r in rp.fetchall()]
        s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
        self.conn.execute(s).close()
    def node_purge(self, node, before=inf, cluster=0):
        """Delete all versions with the specified
        node and cluster, and return
        the hashes and size of versions deleted.
        Clears out the node if it has no remaining versions.
        """
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        where_clause = and_(self.versions.c.node == node,
                            self.versions.c.cluster == cluster)
        s = s.where(where_clause)
        s = s.where(self.versions.c.mtime <= before)
        r = self.conn.execute(s)
        # NOTE(review): unlike node_purge_children, row[1] (the SUM, NULL when
        # nothing matched) is not guarded here; -size below would raise a
        # TypeError on an empty match -- confirm callers avoid that case.
        # The 'row' and 'mtime' bindings are elided in this excerpt.
        nr, size = row[0], row[1]
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)

        # Collect the hashes of the doomed versions for the caller.
        s = select([self.versions.c.hash])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        # NOTE(review): the comprehension rebinds 'r', shadowing the result proxy.
        hashes = [r[0] for r in r.fetchall()]
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)

        # Remove the node itself if no versions remain.
        s = select([self.nodes.c.node],
                   and_(self.nodes.c.node == node,
                        select([func.count(self.versions.c.serial)],
                               self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
        r = self.conn.execute(s)
        # NOTE(review): the 'nodes' binding is elided in this excerpt.
        s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
        self.conn.execute(s).close()
    def node_remove(self, node):
        """Remove the node specified.
        Return false if the node has children or is not found.
        """
        # Refuse to remove a node that still has children.
        # NOTE(review): the guard's body (presumably 'return False') and the
        # 'mtime' binding are elided in this excerpt.
        if self.node_count_children(node):
        # Fold the node's per-cluster statistics out of its ancestors
        # before deleting the row.
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size),
                    self.versions.c.cluster])
        s = s.where(self.versions.c.node == node)
        s = s.group_by(self.versions.c.cluster)
        r = self.conn.execute(s)
        for population, size, cluster in r.fetchall():
            self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
        s = self.nodes.delete().where(self.nodes.c.node == node)
        self.conn.execute(s).close()
    def policy_get(self, node):
        """Return the node's policy as a {key: value} dict."""
        s = select([self.policies.c.key, self.policies.c.value],
                   self.policies.c.node == node)
        r = self.conn.execute(s)
        d = dict(r.fetchall())
        # NOTE(review): the close/return tail is elided in this excerpt.
    def policy_set(self, node, policy):
        """Upsert the given {key: value} policy pairs for the node."""
        for k, v in policy.iteritems():
            # UPDATE first; INSERT when no row matched.
            # NOTE(review): the rowcount check gating the INSERT is elided
            # in this excerpt.
            s = self.policies.update().where(and_(self.policies.c.node == node,
                                                  self.policies.c.key == k))
            s = s.values(value = v)
            rp = self.conn.execute(s)
            s = self.policies.insert()
            values = {'node':node, 'key':k, 'value':v}
            r = self.conn.execute(s, values)
    def statistics_get(self, node, cluster=0):
        """Return population, total size and last mtime
        for all versions under node that belong to the cluster.
        """
        s = select([self.statistics.c.population,
                    self.statistics.c.size,
                    self.statistics.c.mtime])
        s = s.where(and_(self.statistics.c.node == node,
                         self.statistics.c.cluster == cluster))
        r = self.conn.execute(s)
        # NOTE(review): the fetchone/close/return tail is elided in this excerpt.
    def statistics_update(self, node, population, size, mtime, cluster=0):
        """Update the statistics of the given node.
        Statistics keep track the population, total
        size of objects and mtime in the node's namespace.
        May be zero or positive or negative numbers.
        """
        # Read the existing row (if any) so deltas can be applied.
        s = select([self.statistics.c.population, self.statistics.c.size],
                   and_(self.statistics.c.node == node,
                        self.statistics.c.cluster == cluster))
        rp = self.conn.execute(s)
        # NOTE(review): the 'r' binding (fetchone) and the branch deciding
        # between the defaults and the fetched row are elided in this
        # excerpt, as is the 'size += presize' accumulation.
        prepopulation, presize = (0, 0)
        prepopulation, presize = r
        population += prepopulation
        # UPDATE the existing (node, cluster) row, or INSERT a fresh one
        # (the rowcount check between them is elided in this excerpt).
        u = self.statistics.update().where(and_(self.statistics.c.node==node,
                                                self.statistics.c.cluster==cluster))
        u = u.values(population=population, size=size, mtime=mtime)
        rp = self.conn.execute(u)
        ins = self.statistics.insert()
        ins = ins.values(node=node, population=population, size=size,
                         mtime=mtime, cluster=cluster)
        self.conn.execute(ins).close()
    def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
        """Update the statistics of the given node's parent.
        Then recursively update all parents up to the root.
        Population is not recursive.
        """
        # NOTE(review): the ascent loop header and the extraction of
        # 'parent' from props (see node_get_properties: (parent, path))
        # are elided in this excerpt.
        props = self.node_get_properties(node)
        self.statistics_update(parent, population, size, mtime, cluster)
        population = 0 # Population isn't recursive
    def statistics_latest(self, node, before=inf, except_cluster=0):
        """Return population, total size and last mtime
        for all latest versions under node that
        do not belong to the cluster.
        """
        props = self.node_get_properties(node)

        # The latest version.
        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.hash,
                    self.versions.c.size,
                    self.versions.c.type,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.uuid,
                    self.versions.c.checksum,
                    self.versions.c.cluster])
        # Latest serial: max(mtime < before), or the cached latest_version.
        # NOTE(review): the if/else branching on 'before' between the two
        # 'filtered' assignments is elided in this excerpt.
        filtered = select([func.max(self.versions.c.serial)],
                          self.versions.c.node == node)
        filtered = filtered.where(self.versions.c.mtime < before)
        filtered = select([self.nodes.c.latest_version],
                          self.versions.c.node == node)
        s = s.where(and_(self.versions.c.cluster != except_cluster,
                         self.versions.c.serial == filtered))
        r = self.conn.execute(s)

        # First level, just under node (get population).
        v = self.versions.alias('v')
        s = select([func.count(v.c.serial),
                    func.max(v.c.mtime)])
        c1 = select([func.max(self.versions.c.serial)])
        c1 = c1.where(self.versions.c.mtime < before)
        # NOTE(review): .where() returns a NEW select; the results of the two
        # un-reassigned calls below are discarded -- likely a bug, but the
        # surrounding branches are elided so it is left untouched here.
        c1.where(self.versions.c.node == v.c.node)
        c1 = select([self.nodes.c.latest_version])
        c1.where(self.nodes.c.node == v.c.node)
        c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
        s = s.where(and_(v.c.serial == c1,
                         v.c.cluster != except_cluster,
        rp = self.conn.execute(s)
        mtime = max(mtime, r[2])

        # All children (get size and mtime).
        # This is why the full path is stored.
        s = select([func.count(v.c.serial),
                    func.max(v.c.mtime)])
        c1 = select([func.max(self.versions.c.serial)],
                    self.versions.c.node == v.c.node)
        c1 = c1.where(self.versions.c.mtime < before)
        c1 = select([self.nodes.c.serial],
                    self.nodes.c.node == v.c.node)
        c2 = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path) + '%', escape='\\'))
        s = s.where(and_(v.c.serial == c1,
                         v.c.cluster != except_cluster,
        rp = self.conn.execute(s)
        # Subtract the node's own size: only children should be counted.
        size = r[1] - props[SIZE]
        mtime = max(mtime, r[2])
        return (count, size, mtime)
586 def nodes_set_latest_version(self, node, serial):
587 s = self.nodes.update().where(self.nodes.c.node == node)
588 s = s.values(latest_version = serial)
589 self.conn.execute(s).close()
    def version_create(self, node, hash, size, type, source, muser, uuid, checksum, cluster=0):
        """Create a new version from the given properties.
        Return the (serial, mtime) of the new version.
        """
        # NOTE(review): the 'mtime' binding (presumably time()) and the final
        # 'return serial, mtime' are elided in this excerpt.
        s = self.versions.insert().values(node=node, hash=hash, size=size, type=type, source=source,
                                          mtime=mtime, muser=muser, uuid=uuid, checksum=checksum, cluster=cluster)
        serial = self.conn.execute(s).inserted_primary_key[0]
        # Propagate the new version's size/mtime up the tree, then refresh
        # the node's cached latest version pointer.
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)
        self.nodes_set_latest_version(node, serial)
    def version_lookup(self, node, before=inf, cluster=0, all_props=True):
        """Lookup the current version of the given node.
        Return a list with its properties:
        (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster)
        or None if the current version is not found in the given cluster.
        """
        v = self.versions.alias('v')
        # Select the serial only, or the full property tuple, depending on
        # all_props (the if/else between these is elided in this excerpt).
        s = select([v.c.serial])
        s = select([v.c.serial, v.c.node, v.c.hash,
                    v.c.size, v.c.type, v.c.source,
                    v.c.mtime, v.c.muser, v.c.uuid,
                    v.c.checksum, v.c.cluster])
        # Latest serial: max(mtime < before), or the cached latest_version
        # (the if/else on 'before' between the two 'c' assignments is
        # elided in this excerpt).
        c = select([func.max(self.versions.c.serial)],
                   self.versions.c.node == node)
        c = c.where(self.versions.c.mtime < before)
        c = select([self.nodes.c.latest_version],
                   self.nodes.c.node == node)
        s = s.where(and_(v.c.serial == c,
                         v.c.cluster == cluster))
        r = self.conn.execute(s)
        # NOTE(review): the fetchone/close/return tail is elided in this excerpt.
    def version_lookup_bulk(self, nodes, before=inf, cluster=0, all_props=True):
        """Lookup the current versions of the given nodes.
        Return a list with their properties:
        (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
        """
        v = self.versions.alias('v')
        # serial-only vs full-property select (if/else elided in this excerpt).
        s = select([v.c.serial])
        s = select([v.c.serial, v.c.node, v.c.hash,
                    v.c.size, v.c.type, v.c.source,
                    v.c.mtime, v.c.muser, v.c.uuid,
                    v.c.checksum, v.c.cluster])
        # Latest serial per node: grouped max(mtime < before), or the cached
        # latest_version (branch on 'before' elided in this excerpt).
        c = select([func.max(self.versions.c.serial)],
                   self.versions.c.node.in_(nodes))
        c = c.where(self.versions.c.mtime < before)
        c = c.group_by(self.versions.c.node)
        c = select([self.nodes.c.latest_version],
                   self.nodes.c.node.in_(nodes))
        s = s.where(and_(v.c.serial.in_(c),
                         v.c.cluster == cluster))
        s = s.order_by(v.c.node)
        r = self.conn.execute(s)
        rproxy = r.fetchall()
        # Yield plain tuples, one per row (generator, not a list).
        return (tuple(row.values()) for row in rproxy)
    def version_get_properties(self, serial, keys=(), propnames=_propnames):
        """Return a sequence of values for the properties of
        the version specified by serial and the keys, in the order given.
        If keys is empty, return all properties in the order
        (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
        """
        v = self.versions.alias()
        s = select([v.c.serial, v.c.node, v.c.hash,
                    v.c.size, v.c.type, v.c.source,
                    v.c.mtime, v.c.muser, v.c.uuid,
                    v.c.checksum, v.c.cluster], v.c.serial == serial)
        rp = self.conn.execute(s)
        # NOTE(review): the 'r' binding (fetchone), the not-found and
        # empty-keys branches are elided in this excerpt.
        return [r[propnames[k]] for k in keys if k in propnames]
690 def version_put_property(self, serial, key, value):
691 """Set value for the property of version specified by key."""
693 if key not in _propnames:
695 s = self.versions.update()
696 s = s.where(self.versions.c.serial == serial)
697 s = s.values(**{key: value})
698 self.conn.execute(s).close()
    def version_recluster(self, serial, cluster):
        """Move the version into another cluster."""
        props = self.version_get_properties(serial)
        oldcluster = props[CLUSTER]
        # Nothing to do when the version is already in the target cluster.
        # NOTE(review): the guard's body (presumably 'return'), and the
        # 'node'/'size'/'mtime' bindings, are elided in this excerpt.
        if cluster == oldcluster:
        # Move the version's population/size between the two clusters'
        # ancestor statistics.
        self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)
        s = self.versions.update()
        s = s.where(self.versions.c.serial == serial)
        s = s.values(cluster = cluster)
        self.conn.execute(s).close()
    def version_remove(self, serial):
        """Remove the serial specified."""
        props = self.version_get_properties(serial)
        cluster = props[CLUSTER]
        # NOTE(review): the not-found guard and the 'node'/'size'/'mtime'
        # bindings are elided in this excerpt.
        self.statistics_update_ancestors(node, -1, -size, mtime, cluster)

        s = self.versions.delete().where(self.versions.c.serial == serial)
        self.conn.execute(s).close()

        # Refresh the node's cached latest version pointer.
        # NOTE(review): 'v' and the 'serial' rebinding are not visible in
        # this excerpt -- confirm where they come from before relying on this.
        props = self.version_lookup(node, cluster=cluster, all_props=False)
        self.nodes_set_latest_version(v.node, serial)
    def attribute_get(self, serial, domain, keys=()):
        """Return a list of (key, value) pairs of the version specified by serial.
        If keys is empty, return all attributes.
        Otherwise, return only those specified.
        """
        # Branch on whether specific keys were requested (the if/else
        # between the two query builds is elided in this excerpt).
        attrs = self.attributes.alias()
        s = select([attrs.c.key, attrs.c.value])
        s = s.where(and_(attrs.c.key.in_(keys),
                         attrs.c.serial == serial,
                         attrs.c.domain == domain))
        attrs = self.attributes.alias()
        s = select([attrs.c.key, attrs.c.value])
        s = s.where(and_(attrs.c.serial == serial,
                         attrs.c.domain == domain))
        r = self.conn.execute(s)
        # NOTE(review): the fetchall/close/return tail is elided in this excerpt.
    def attribute_set(self, serial, domain, items):
        """Set the attributes of the version specified by serial.
        Receive attributes as an iterable of (key, value) pairs.
        """
        # Upsert each pair: UPDATE first, INSERT when nothing matched.
        # NOTE(review): the loop header over 'items' (binding k, v) and the
        # rowcount check gating the INSERT are elided in this excerpt.
        s = self.attributes.update()
        s = s.where(and_(self.attributes.c.serial == serial,
                         self.attributes.c.domain == domain,
                         self.attributes.c.key == k))
        s = s.values(value = v)
        rp = self.conn.execute(s)
        s = self.attributes.insert()
        s = s.values(serial=serial, domain=domain, key=k, value=v)
        self.conn.execute(s).close()
    def attribute_del(self, serial, domain, keys=()):
        """Delete attributes of the version specified by serial.
        If keys is empty, delete all attributes.
        Otherwise delete those specified.
        """
        #TODO more efficient way to do this?
        # Per-key deletes (the branch on 'keys' and the loop header binding
        # 'key' are elided in this excerpt).
        s = self.attributes.delete()
        s = s.where(and_(self.attributes.c.serial == serial,
                         self.attributes.c.domain == domain,
                         self.attributes.c.key == key))
        self.conn.execute(s).close()
        # No keys given: delete every attribute in the domain.
        s = self.attributes.delete()
        s = s.where(and_(self.attributes.c.serial == serial,
                         self.attributes.c.domain == domain))
        self.conn.execute(s).close()
    def attribute_copy(self, source, dest):
        """Copy all attributes of version 'source' onto version 'dest'."""
        # 'dest' is selected as a constant column, so each fetched row is
        # already shaped as (dest, domain, key, value).
        s = select([dest, self.attributes.c.domain, self.attributes.c.key, self.attributes.c.value],
                   self.attributes.c.serial == source)
        rp = self.conn.execute(s)
        attributes = rp.fetchall()
        for dest, domain, k, v in attributes:
            # Upsert: UPDATE first, INSERT when no row matched.
            # NOTE(review): the rowcount check gating the INSERT is elided
            # in this excerpt.
            s = self.attributes.update().where(and_(
                self.attributes.c.serial == dest,
                self.attributes.c.domain == domain,
                self.attributes.c.key == k))
            rp = self.conn.execute(s, value=v)
            s = self.attributes.insert()
            values = {'serial':dest, 'domain':domain, 'key':k, 'value':v}
            self.conn.execute(s, values).close()
    def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=[]):
        """Return a list with all keys pairs defined
        for all latest versions under parent that
        do not belong to the cluster.
        """
        # TODO: Use another table to store before=inf results.
        a = self.attributes.alias('a')
        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        s = select([a.c.key]).distinct()
        # Latest serial per node: max(mtime < before), or the cached
        # nodes.latest_version (the if/else on 'before' between the two
        # 'filtered' builds is elided in this excerpt).
        filtered = select([func.max(self.versions.c.serial)])
        filtered = filtered.where(self.versions.c.mtime < before)
        filtered = filtered.where(self.versions.c.node == v.c.node)
        filtered = select([self.nodes.c.latest_version])
        filtered = filtered.where(self.nodes.c.node == v.c.node)
        s = s.where(v.c.serial == filtered)
        s = s.where(v.c.cluster != except_cluster)
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
                                        self.nodes.c.parent == parent)))
        s = s.where(a.c.serial == v.c.serial)
        s = s.where(a.c.domain == domain)
        s = s.where(n.c.node == v.c.node)
        # Optional path constraints, OR-ed together.
        # NOTE(review): the 'conj' list initialization is elided in this excerpt.
        for path, match in pathq:
            if match == MATCH_PREFIX:
                conj.append(n.c.path.like(self.escape_like(path) + '%', escape='\\'))
            elif match == MATCH_EXACT:
                conj.append(n.c.path == path)
        s = s.where(or_(*conj))
        rp = self.conn.execute(s)
        # NOTE(review): the 'rows' binding (fetchall) and close are elided
        # in this excerpt.
        return [r[0] for r in rows]
    def latest_version_list(self, parent, prefix='', delimiter=None,
                            start='', limit=10000, before=inf,
                            except_cluster=0, pathq=[], domain=None,
                            filterq=[], sizeq=None, all_props=False):
        """Return a (list of (path, serial) tuples, list of common prefixes)
        for the current versions of the paths with the given parent,
        matching the following criteria.

        The property tuple for a version is returned if all
        of these conditions are true:

             c. path starts with prefix (and paths in pathq)

             d. version is the max up to before

             e. version is not in cluster

             f. the path does not have the delimiter occurring
                after the prefix, or ends with the delimiter

             g. serial matches the attribute filter query.

                A filter query is a comma-separated list of
                terms in one of these three forms:

                an attribute with this key must exist

                an attribute with this key must not exist

                the attribute with this key satisfies the value
                where ?op is one of ==, != <=, >=, <, >.

             h. the size is in the range set by sizeq

        The list of common prefixes includes the prefixes
        matching up to the first delimiter after prefix,
        and are reported only once, as "virtual directories".
        The delimiter is included in the prefixes.

        If arguments are None, then the corresponding matching rule
        is not applied.

        Limit applies to the first list of tuples returned.

        If all_props is True, return all properties after path, not just serial.
        """
        # Page on path order within [start, nextling(prefix)).
        if not start or start < prefix:
            start = strprevling(prefix)
        nextling = strnextling(prefix)

        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        # Select (path, serial) or the full property tuple, depending on
        # all_props (the if/else between them is elided in this excerpt).
        s = select([n.c.path, v.c.serial]).distinct()
        s = select([n.c.path,
                    v.c.serial, v.c.node, v.c.hash,
                    v.c.size, v.c.type, v.c.source,
                    v.c.mtime, v.c.muser, v.c.uuid,
                    v.c.checksum, v.c.cluster]).distinct()
        # Latest serial per node (the branch on 'before' between the two
        # 'filtered' builds is elided in this excerpt).
        filtered = select([func.max(self.versions.c.serial)])
        filtered = filtered.where(self.versions.c.mtime < before)
        filtered = select([self.nodes.c.latest_version])
        s = s.where(v.c.serial == filtered.where(self.nodes.c.node == v.c.node))
        s = s.where(v.c.cluster != except_cluster)
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
                                        self.nodes.c.parent == parent)))
        s = s.where(n.c.node == v.c.node)
        s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
        # Optional path constraints, OR-ed together ('conj' initialization
        # elided in this excerpt).
        for path, match in pathq:
            if match == MATCH_PREFIX:
                conj.append(n.c.path.like(self.escape_like(path) + '%', escape='\\'))
            elif match == MATCH_EXACT:
                conj.append(n.c.path == path)
        s = s.where(or_(*conj))
        # Size range filter (rule h).
        if sizeq and len(sizeq) == 2:
            s = s.where(v.c.size >= sizeq[0])
            s = s.where(v.c.size < sizeq[1])
        # Attribute filter query (rule g): one correlated EXISTS per group.
        # NOTE(review): each 'subs' base select is elided in this excerpt.
        if domain and filterq:
            a = self.attributes.alias('a')
            included, excluded, opers = parse_filters(filterq)
            # Required keys -> EXISTS.
            subs = subs.where(a.c.serial == v.c.serial).correlate(v)
            subs = subs.where(a.c.domain == domain)
            subs = subs.where(or_(*[a.c.key.op('=')(x) for x in included]))
            s = s.where(exists(subs))
            # Forbidden keys -> NOT EXISTS.
            subs = subs.where(a.c.serial == v.c.serial).correlate(v)
            subs = subs.where(a.c.domain == domain)
            subs = subs.where(or_(*[a.c.key.op('=')(x) for x in excluded]))
            s = s.where(not_(exists(subs)))
            # One EXISTS per (key, op, value) comparison term.
            for k, o, val in opers:
                subs = subs.where(a.c.serial == v.c.serial).correlate(v)
                subs = subs.where(a.c.domain == domain)
                subs = subs.where(and_(a.c.key.op('=')(k), a.c.value.op(o)(val)))
                s = s.where(exists(subs))
        s = s.order_by(n.c.path)

        # No-delimiter fast path: plain listing (branch and fetch loop
        # elided in this excerpt).
        rp = self.conn.execute(s, start=start)

        # Delimiter handling: fold paths sharing a prefix up to the first
        # delimiter into "virtual directory" entries.
        # NOTE(review): the 'prefixes'/'matches' initializations, the main
        # fetch loop, and the 'path'/'pfz'/'dz' bindings are elided in this
        # excerpt; the lines below are fragments of that loop.
        pappend = prefixes.append
        mappend = matches.append
        rp = self.conn.execute(s, start=start)
        props = rp.fetchone()
        idx = path.find(delimiter, pfz)
        if idx + dz == len(path):
        continue # Get one more, in case there is a path.
        pf = path[:idx + dz]
        rp = self.conn.execute(s, start=strnextling(pf)) # New start.
        return matches, prefixes
1027 def latest_uuid(self, uuid):
1028 """Return a (path, serial) tuple, for the latest version of the given uuid."""
1030 v = self.versions.alias('v')
1031 n = self.nodes.alias('n')
1032 s = select([n.c.path, v.c.serial])
1033 filtered = select([func.max(self.versions.c.serial)])
1034 s = s.where(v.c.serial == filtered.where(self.versions.c.uuid == uuid))
1035 s = s.where(n.c.node == v.c.node)
1037 r = self.conn.execute(s)