1 # Copyright 2011 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
35 from sqlalchemy import Table, Integer, BigInteger, DECIMAL, Column, String, MetaData, ForeignKey
36 from sqlalchemy.schema import Index, Sequence
37 from sqlalchemy.sql import func, and_, or_, null, select, bindparam
38 from sqlalchemy.ext.compiler import compiles
40 from dbworker import DBWorker
# Positional indices into a version property row, matching the column order
# of the versions table selects below (serial, node, hash, size, source,
# mtime, muser, cluster).
# NOTE(review): this listing is elided (embedded numbers are original file
# line numbers); names used later such as ROOTNODE, inf and _propnames are
# defined on lines not visible here.
44 ( SERIAL, NODE, HASH, SIZE, SOURCE, MTIME, MUSER, CLUSTER ) = range(8)
# Helper for range scans: the successor string is used as an exclusive upper
# bound when listing paths by prefix (see latest_version_list).
49 def strnextling(prefix):
50 """Return the first unicode string
51 greater than but not starting with given prefix.
52 strnextling('hello') -> 'hellp'
55 ## all strings start with the null string,
56 ## therefore we have to approximate strnextling('')
57 ## with the last unicode character supported by python
58 ## 0x10ffff for wide (32-bit unicode) python builds
59 ## 0x00ffff for narrow (16-bit unicode) python builds
60 ## We will not autodetect. 0xffff is safe enough.
# NOTE(review): the code computing the successor (original lines 61-68)
# is not visible in this elided listing.
# Helper for range scans: the predecessor string is used as an exclusive
# lower bound when listing paths by prefix (see latest_version_list).
69 def strprevling(prefix):
70 """Return an approximation of the last unicode string
71 less than but not starting with given prefix.
72 strprevling(u'hello') -> u'helln\\xffff'
75 ## There is no prevling for the null string
80 #s += unichr(c-1) + unichr(0xffff)
# NOTE(review): most of the body (original lines 73-96) is not visible in
# this elided listing.
98 """Nodes store path organization and have multiple versions.
99 Versions store object history and have multiple attributes.
100 Attributes store metadata.
103 # TODO: Provide an interface for included and excluded clusters.
# Build the SQLAlchemy schema (nodes, policy, statistics, versions,
# attributes tables), create it on the engine, and make sure the root
# node row exists.
# NOTE(review): several lines are elided here (e.g. the `columns = []`
# resets before each table and the ondelete/onupdate ForeignKey keyword
# arguments) — the embedded numbers show the gaps.
105 def __init__(self, **params):
106 DBWorker.__init__(self, **params)
107 metadata = MetaData()
# nodes table: node id, parent id (self-referencing FK) and full path.
111 columns.append(Column('node', Integer, primary_key=True))
112 columns.append(Column('parent', Integer,
113 ForeignKey('nodes.node',
116 autoincrement=False))
117 columns.append(Column('path', String(2048), default='', nullable=False))
118 self.nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
119 # place an index on path
120 Index('idx_nodes_path', self.nodes.c.path)
# policy table: per-node key/value policy pairs.
124 columns.append(Column('node', Integer,
125 ForeignKey('nodes.node',
129 columns.append(Column('key', String(255), primary_key=True))
130 columns.append(Column('value', String(255)))
131 self.policies = Table('policy', metadata, *columns, mysql_engine='InnoDB')
133 #create statistics table
# statistics table: per-(node, cluster) population/size/mtime aggregates.
135 columns.append(Column('node', Integer,
136 ForeignKey('nodes.node',
140 columns.append(Column('population', Integer, nullable=False, default=0))
141 columns.append(Column('size', BigInteger, nullable=False, default=0))
142 columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
143 columns.append(Column('cluster', Integer, nullable=False, default=0,
144 primary_key=True, autoincrement=False))
145 self.statistics = Table('statistics', metadata, *columns, mysql_engine='InnoDB')
147 #create versions table
# versions table: object history rows; column order matches the
# (SERIAL, NODE, HASH, SIZE, SOURCE, MTIME, MUSER, CLUSTER) indices.
149 columns.append(Column('serial', Integer, primary_key=True))
150 columns.append(Column('node', Integer,
151 ForeignKey('nodes.node',
153 onupdate='CASCADE')))
154 columns.append(Column('hash', String(255)))
155 columns.append(Column('size', BigInteger, nullable=False, default=0))
156 columns.append(Column('source', Integer))
157 columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
158 columns.append(Column('muser', String(255), nullable=False, default=''))
159 columns.append(Column('cluster', Integer, nullable=False, default=0))
160 self.versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
161 Index('idx_versions_node_mtime', self.versions.c.node,
162 self.versions.c.mtime)
164 #create attributes table
# attributes table: per-version key/value metadata pairs.
166 columns.append(Column('serial', Integer,
167 ForeignKey('versions.serial',
171 columns.append(Column('key', String(255), primary_key=True))
172 columns.append(Column('value', String(255)))
173 self.attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')
175 metadata.create_all(self.engine)
# Insert the root node if it is not already present.
177 s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
178 self.nodes.c.parent == ROOTNODE))
179 rp = self.conn.execute(s)
183 s = self.nodes.insert().values(node=ROOTNODE, parent=ROOTNODE)
186 def node_create(self, parent, path):
187 """Create a new node from the given properties.
188 Return the node identifier of the new node.
189 """
190 #TODO catch IntegrityError?
191 s = self.nodes.insert().values(parent=parent, path=path)
192 r = self.conn.execute(s)
# Single-column primary key: take the first element of the inserted key.
193 inserted_primary_key = r.inserted_primary_key[0]
# NOTE(review): original line 194 (presumably r.close()) is elided.
195 return inserted_primary_key
197 def node_lookup(self, path):
198 """Lookup the current node of the given path.
199 Return None if the path is not found.
200 """
202 s = select([self.nodes.c.node], self.nodes.c.path == path)
203 r = self.conn.execute(s)
# NOTE(review): the fetch/close/return lines (original 204-208) are elided.
210 def node_get_properties(self, node):
211 """Return the node's (parent, path).
212 Return None if the node is not found.
213 """
215 s = select([self.nodes.c.parent, self.nodes.c.path])
216 s = s.where(self.nodes.c.node == node)
217 r = self.conn.execute(s)
# NOTE(review): the fetch/close/return lines (original 218-220) are elided.
222 def node_get_versions(self, node, keys=(), propnames=_propnames):
223 """Return the properties of all versions at node.
224 If keys is empty, return all properties in the order
225 (serial, node, hash, size, source, mtime, muser, cluster).
226 """
# Select all version columns for this node, in the canonical property order.
228 s = select([self.versions.c.serial,
229 self.versions.c.node,
230 self.versions.c.hash,
231 self.versions.c.size,
232 self.versions.c.source,
233 self.versions.c.mtime,
234 self.versions.c.muser,
235 self.versions.c.cluster], self.versions.c.node == node)
236 s = s.order_by(self.versions.c.serial)
237 r = self.conn.execute(s)
# NOTE(review): the lines binding `rows` and the empty-keys early return
# (original 238-245) are elided.
# Project only the requested keys, silently skipping unknown key names.
246 return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]
248 def node_count_children(self, node):
249 """Return node's child count."""
251 s = select([func.count(self.nodes.c.node)])
# Exclude the root, which is its own parent and would otherwise count
# itself as a child.
252 s = s.where(and_(self.nodes.c.parent == node,
253 self.nodes.c.node != ROOTNODE))
254 r = self.conn.execute(s)
# NOTE(review): the fetch/close/return lines (original 255-257) are elided.
259 def node_purge_children(self, parent, before=inf, cluster=0):
260 """Delete all versions with the specified
261 parent and cluster, and return
262 the serials of versions deleted.
263 Clears out nodes with no remaining versions.
264 """
# Versions to purge: those on any child node of parent, in this cluster.
266 c1 = select([self.nodes.c.node],
267 self.nodes.c.parent == parent)
268 where_clause = and_(self.versions.c.node.in_(c1),
269 self.versions.c.cluster == cluster)
# First gather aggregate count/size so statistics can be decremented.
270 s = select([func.count(self.versions.c.serial),
271 func.sum(self.versions.c.size)])
272 s = s.where(where_clause)
# NOTE(review): original line 273 is elided — presumably the
# `if before != inf:` guard for the mtime filter below.
274 s = s.where(self.versions.c.mtime <= before)
275 r = self.conn.execute(s)
# sum() returns NULL/None on an empty set; treat it as 0.
280 nr, size = row[0], -row[1] if row[1] else 0
# Subtract the purged population/size from parent and all its ancestors.
282 self.statistics_update(parent, -nr, size, mtime, cluster)
283 self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)
# Collect the serials being deleted so they can be returned to the caller.
285 s = select([self.versions.c.serial])
286 s = s.where(where_clause)
287 r = self.conn.execute(s)
288 serials = [row[SERIAL] for row in r.fetchall()]
# Delete the version rows themselves.
292 s = self.versions.delete().where(where_clause)
293 r = self.conn.execute(s)
# Remove child nodes that now have no versions left at all.
297 s = select([self.nodes.c.node],
298 and_(self.nodes.c.parent == parent,
299 select([func.count(self.versions.c.serial)],
300 self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
301 rp = self.conn.execute(s)
302 nodes = [r[0] for r in rp.fetchall()]
304 s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
305 self.conn.execute(s).close()
# NOTE(review): the `return serials` line (original 306-307) is elided.
309 def node_purge(self, node, before=inf, cluster=0):
310 """Delete all versions with the specified
311 node and cluster, and return
312 the serials of versions deleted.
313 Clears out the node if it has no remaining versions.
314 """
# Aggregate count/size of the versions to purge, for statistics updates.
317 s = select([func.count(self.versions.c.serial),
318 func.sum(self.versions.c.size)])
319 where_clause = and_(self.versions.c.node == node,
320 self.versions.c.cluster == cluster)
321 s = s.where(where_clause)
# NOTE(review): original line 322 is elided — presumably the
# `if before != inf:` guard for the mtime filter below.
323 s = s.where(self.versions.c.mtime <= before)
324 r = self.conn.execute(s)
326 nr, size = row[0], row[1]
# NOTE(review): unlike node_purge_children, no NULL-guard on size is
# visible here; the early-return and statistics_update lines
# (original 327-330) are elided.
331 self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
# Collect the serials being deleted so they can be returned to the caller.
333 s = select([self.versions.c.serial])
334 s = s.where(where_clause)
335 r = self.conn.execute(s)
# NOTE(review): the comprehension variable shadows the result proxy `r`;
# it works because fetchall() materializes first, but is fragile.
336 serials = [r[SERIAL] for r in r.fetchall()]
# Delete the version rows themselves.
340 s = self.versions.delete().where(where_clause)
341 r = self.conn.execute(s)
# Remove the node itself if it has no versions left.
345 s = select([self.nodes.c.node],
346 and_(self.nodes.c.node == node,
347 select([func.count(self.versions.c.serial)],
348 self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
349 r = self.conn.execute(s)
352 s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
353 self.conn.execute(s).close()
# NOTE(review): the binding of `nodes` (original 350-351) and the
# `return serials` line (original 354-355) are elided.
357 def node_remove(self, node):
358 """Remove the node specified.
359 Return false if the node has children or is not found.
360 """
# Refuse to remove a node that still has children.
362 if self.node_count_children(node):
# NOTE(review): the `return False` branch and mtime setup (original
# 363-365) are elided.
# Per-cluster totals of this node's versions, so ancestor statistics can
# be decremented before the node is dropped.
366 s = select([func.count(self.versions.c.serial),
367 func.sum(self.versions.c.size),
368 self.versions.c.cluster])
369 s = s.where(self.versions.c.node == node)
370 s = s.group_by(self.versions.c.cluster)
371 r = self.conn.execute(s)
372 for population, size, cluster in r.fetchall():
373 self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
# versions/attributes rows are expected to go via FK cascade; only the
# node row is deleted explicitly here — TODO confirm cascade rules
# (the ForeignKey ondelete arguments are on elided lines).
376 s = self.nodes.delete().where(self.nodes.c.node == node)
377 self.conn.execute(s).close()
380 def policy_get(self, node):
# Return the node's policy as a {key: value} dict.
381 s = select([self.policies.c.key, self.policies.c.value],
382 self.policies.c.node==node)
383 r = self.conn.execute(s)
384 d = dict(r.fetchall())
# NOTE(review): the close/return lines (original 385-386) are elided.
388 def policy_set(self, node, policy):
# Upsert each (key, value) pair: try UPDATE first, INSERT when no row
# was matched (the rowcount check on original lines 395-396 is elided).
390 for k, v in policy.iteritems():
391 s = self.policies.update().where(and_(self.policies.c.node == node,
392 self.policies.c.key == k))
393 s = s.values(value = v)
394 rp = self.conn.execute(s)
397 s = self.policies.insert()
398 values = {'node':node, 'key':k, 'value':v}
399 r = self.conn.execute(s, values)
402 def statistics_get(self, node, cluster=0):
403 """Return population, total size and last mtime
404 for all versions under node that belong to the cluster.
405 """
407 s = select([self.statistics.c.population,
408 self.statistics.c.size,
409 self.statistics.c.mtime])
410 s = s.where(and_(self.statistics.c.node == node,
411 self.statistics.c.cluster == cluster))
412 r = self.conn.execute(s)
# NOTE(review): the fetch/close/return lines (original 413-415) are elided.
417 def statistics_update(self, node, population, size, mtime, cluster=0):
418 """Update the statistics of the given node.
419 Statistics keep track of the population, total
420 size of objects and mtime in the node's namespace.
421 May be zero or positive or negative numbers.
422 """
# Read the current row (if any) so the deltas can be accumulated.
424 s = select([self.statistics.c.population, self.statistics.c.size],
425 and_(self.statistics.c.node == node,
426 self.statistics.c.cluster == cluster))
427 rp = self.conn.execute(s)
# Missing row counts as zero population and size.
431 prepopulation, presize = (0, 0)
433 prepopulation, presize = r
434 population += prepopulation
# NOTE(review): the corresponding `size += presize` line (original
# 435-438) is elided.
# Upsert: UPDATE first, INSERT when no row was matched (the rowcount
# check on original lines 443-444 is elided).
439 u = self.statistics.update().where(and_(self.statistics.c.node==node,
440 self.statistics.c.cluster==cluster))
441 u = u.values(population=population, size=size, mtime=mtime)
442 rp = self.conn.execute(u)
445 ins = self.statistics.insert()
446 ins = ins.values(node=node, population=population, size=size,
447 mtime=mtime, cluster=cluster)
448 self.conn.execute(ins).close()
450 def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
451 """Update the statistics of the given node's parent.
452 Then recursively update all parents up to the root.
453 Population is not recursive.
454 """
# Walk up the parent chain (the loop header and root-termination test on
# the elided lines), applying the delta at each ancestor.
459 props = self.node_get_properties(node)
463 self.statistics_update(parent, population, size, mtime, cluster)
# Only the immediate parent receives the population delta; size and
# mtime keep propagating to all ancestors.
465 population = 0 # Population isn't recursive
467 def statistics_latest(self, node, before=inf, except_cluster=0):
468 """Return population, total size and last mtime
469 for all latest versions under node that
470 do not belong to the cluster.
471 """
474 props = self.node_get_properties(node)
# NOTE(review): the early return on missing props and the unpacking of
# `path` (original 475-478) are elided.
479 # The latest version.
# Version of this node itself: max serial (optionally bounded by mtime),
# excluding the given cluster.
480 s = select([self.versions.c.serial,
481 self.versions.c.node,
482 self.versions.c.hash,
483 self.versions.c.size,
484 self.versions.c.source,
485 self.versions.c.mtime,
486 self.versions.c.muser,
487 self.versions.c.cluster])
488 filtered = select([func.max(self.versions.c.serial)],
489 self.versions.c.node == node)
491 filtered = filtered.where(self.versions.c.mtime < before)
492 s = s.where(and_(self.versions.c.cluster != except_cluster,
493 self.versions.c.serial == filtered))
494 r = self.conn.execute(s)
501 # First level, just under node (get population).
# Count latest versions of the direct children (correlated max-serial
# subquery per child node).
502 v = self.versions.alias('v')
503 s = select([func.count(v.c.serial),
505 func.max(v.c.mtime)])
506 c1 = select([func.max(self.versions.c.serial)])
508 c1 = c1.where(self.versions.c.mtime < before)
509 c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
510 s = s.where(and_(v.c.serial == c1.where(self.versions.c.node == v.c.node),
511 v.c.cluster != except_cluster,
# NOTE(review): the v.c.node.in_(c2) condition closing this and_()
# (original line 512) is elided.
513 rp = self.conn.execute(s)
519 mtime = max(mtime, r[2])
523 # All children (get size and mtime).
524 # XXX: This is why the full path is stored.
# Aggregate over every descendant by matching on the stored full path
# prefix rather than walking the tree.
525 s = select([func.count(v.c.serial),
527 func.max(v.c.mtime)])
528 c1 = select([func.max(self.versions.c.serial)],
529 self.versions.c.node == v.c.node)
531 c1 = c1.where(self.versions.c.mtime < before)
532 c2 = select([self.nodes.c.node], self.nodes.c.path.like(path + '%'))
533 s = s.where(and_(v.c.serial == c1,
534 v.c.cluster != except_cluster,
# NOTE(review): the v.c.node.in_(c2) condition closing this and_()
# (original line 535) is elided.
536 rp = self.conn.execute(s)
# The node's own size was counted by the path-prefix match; subtract it
# so size covers children only.
541 size = r[1] - props[SIZE]
542 mtime = max(mtime, r[2])
543 return (count, size, mtime)
545 def version_create(self, node, hash, size, source, muser, cluster=0):
546 """Create a new version from the given properties.
547 Return the (serial, mtime) of the new version.
548 """
# NOTE(review): the `mtime = time()` binding (original 549-550) is elided.
551 s = self.versions.insert().values(node=node, hash=hash, size=size, source=source,
552 mtime=mtime, muser=muser, cluster=cluster)
553 serial = self.conn.execute(s).inserted_primary_key[0]
# A new version adds one to population and its size to every ancestor.
554 self.statistics_update_ancestors(node, 1, size, mtime, cluster)
# NOTE(review): the `return serial, mtime` line (original 555) is elided.
557 def version_lookup(self, node, before=inf, cluster=0):
558 """Lookup the current version of the given node.
559 Return a list with its properties:
560 (serial, node, hash, size, source, mtime, muser, cluster)
561 or None if the current version is not found in the given cluster.
562 """
564 v = self.versions.alias('v')
565 s = select([v.c.serial, v.c.node, v.c.hash, v.c.size,
566 v.c.source, v.c.mtime, v.c.muser, v.c.cluster])
# Current version = max serial for the node, optionally bounded by mtime.
567 c = select([func.max(self.versions.c.serial)],
568 self.versions.c.node == node)
# NOTE(review): original line 569 is elided — presumably the
# `if before != inf:` guard for the mtime filter below.
570 c = c.where(self.versions.c.mtime < before)
571 s = s.where(and_(v.c.serial == c,
572 v.c.cluster == cluster))
573 r = self.conn.execute(s)
# NOTE(review): the fetch/close/return lines (original 574-578) are elided.
580 def version_get_properties(self, serial, keys=(), propnames=_propnames):
581 """Return a sequence of values for the properties of
582 the version specified by serial and the keys, in the order given.
583 If keys is empty, return all properties in the order
584 (serial, node, hash, size, source, mtime, muser, cluster).
585 """
587 v = self.versions.alias()
588 s = select([v.c.serial, v.c.node, v.c.hash, v.c.size,
589 v.c.source, v.c.mtime, v.c.muser, v.c.cluster], v.c.serial == serial)
590 rp = self.conn.execute(s)
# NOTE(review): the fetchone/close, the None-row early return and the
# empty-keys return (original 591-597) are elided.
# Project only the requested keys, silently skipping unknown key names.
598 return [r[propnames[k]] for k in keys if k in propnames]
600 def version_recluster(self, serial, cluster):
601 """Move the version into another cluster."""
603 props = self.version_get_properties(serial)
# NOTE(review): the missing-props early return and the node/size/mtime
# unpacking (original 604-607, 610-612) are elided.
608 oldcluster = props[CLUSTER]
# No-op when the version is already in the target cluster.
609 if cluster == oldcluster:
# Move the statistics contribution from the old cluster to the new one.
613 self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
614 self.statistics_update_ancestors(node, 1, size, mtime, cluster)
616 s = self.versions.update()
617 s = s.where(self.versions.c.serial == serial)
618 s = s.values(cluster = cluster)
619 self.conn.execute(s).close()
621 def version_remove(self, serial):
622 """Remove the serial specified."""
# NOTE(review): suspected bug — node_get_properties returns (parent, path),
# yet props[CLUSTER] below indexes a full version property tuple;
# version_get_properties(serial) looks intended. Confirm before changing.
624 props = self.node_get_properties(serial)
629 cluster = props[CLUSTER]
# Subtract the removed version from ancestor statistics, then delete it.
632 self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
634 s = self.versions.delete().where(self.versions.c.serial == serial)
635 self.conn.execute(s).close()
# NOTE(review): the node/size/mtime bindings and the return (original
# 623, 625-628, 630-631, 633, 636) are elided.
638 def attribute_get(self, serial, keys=()):
639 """Return a list of (key, value) pairs of the version specified by serial.
640 If keys is empty, return all attributes.
641 Otherwise, return only those specified.
642 """
# Filtered variant: only the requested keys (the `if keys:` branch
# header and its execute/return on elided lines).
645 attrs = self.attributes.alias()
646 s = select([attrs.c.key, attrs.c.value])
647 s = s.where(and_(attrs.c.key.in_(keys),
648 attrs.c.serial == serial))
# Unfiltered variant: every attribute of the version.
650 attrs = self.attributes.alias()
651 s = select([attrs.c.key, attrs.c.value])
652 s = s.where(attrs.c.serial == serial)
653 r = self.conn.execute(s)
# NOTE(review): the fetchall/close/return lines (original 654-656) are elided.
658 def attribute_set(self, serial, items):
659 """Set the attributes of the version specified by serial.
660 Receive attributes as an iterable of (key, value) pairs.
661 """
# Upsert each pair: UPDATE first, INSERT when no row matched (the loop
# header over items and the rowcount check are on elided lines).
665 s = self.attributes.update()
666 s = s.where(and_(self.attributes.c.serial == serial,
667 self.attributes.c.key == k))
668 s = s.values(value = v)
669 rp = self.conn.execute(s)
672 s = self.attributes.insert()
673 s = s.values(serial=serial, key=k, value=v)
674 self.conn.execute(s).close()
676 def attribute_del(self, serial, keys=()):
677 """Delete attributes of the version specified by serial.
678 If keys is empty, delete all attributes.
679 Otherwise delete those specified.
680 """
# Keyed variant: one DELETE per key (loop header on an elided line).
683 #TODO more efficient way to do this?
685 s = self.attributes.delete()
686 s = s.where(and_(self.attributes.c.serial == serial,
687 self.attributes.c.key == key))
688 self.conn.execute(s).close()
# Unkeyed variant: drop every attribute of the version.
690 s = self.attributes.delete()
691 s = s.where(self.attributes.c.serial == serial)
692 self.conn.execute(s).close()
694 def attribute_copy(self, source, dest):
# Select dest as a literal column alongside each (key, value) of source,
# producing ready-made (dest, key, value) rows to upsert.
695 s = select([dest, self.attributes.c.key, self.attributes.c.value],
696 self.attributes.c.serial == source)
697 rp = self.conn.execute(s)
698 attributes = rp.fetchall()
# NOTE(review): `dest` is rebound by the loop target below — harmless
# here since the parameter is no longer needed, but confusing.
700 for dest, k, v in attributes:
# Upsert: UPDATE first, INSERT when no row matched (the rowcount check
# on the elided lines 706-707).
702 s = self.attributes.update().where(and_(
703 self.attributes.c.serial == dest,
704 self.attributes.c.key == k))
705 rp = self.conn.execute(s, value=v)
708 s = self.attributes.insert()
709 values = {'serial':dest, 'key':k, 'value':v}
710 self.conn.execute(s, values).close()
# NOTE(review): mutable default `pathq=[]` — not mutated in the visible
# code, but `pathq=None` with a local default would be safer.
712 def latest_attribute_keys(self, parent, before=inf, except_cluster=0, pathq=[]):
713 """Return a list with all keys pairs defined
714 for all latest versions under parent that
715 do not belong to the cluster.
716 """
718 # TODO: Use another table to store before=inf results.
719 a = self.attributes.alias('a')
720 v = self.versions.alias('v')
721 n = self.nodes.alias('n')
722 s = select([a.c.key]).distinct()
# Latest version per node: correlated max-serial subquery, optionally
# bounded by mtime (the `if before != inf:` guard is on an elided line).
723 filtered = select([func.max(self.versions.c.serial)])
725 filtered = filtered.where(self.versions.c.mtime < before)
726 s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
727 s = s.where(v.c.cluster != except_cluster)
728 s = s.where(v.c.node.in_(select([self.nodes.c.node],
729 self.nodes.c.parent == parent)))
730 s = s.where(a.c.serial == v.c.serial)
731 s = s.where(n.c.node == v.c.node)
# OR together one LIKE prefix clause per entry in pathq (the loop header
# and `conj = []` are on elided lines).
734 conj.append(n.c.path.like(x + '%'))
736 s = s.where(or_(*conj))
737 rp = self.conn.execute(s)
# NOTE(review): the fetchall/close binding of `rows` (original 738-739)
# is elided.
740 return [r[0] for r in rows]
# NOTE(review): mutable default `pathq=[]` — not mutated in the visible
# code, but `pathq=None` with a local default would be safer.
742 def latest_version_list(self, parent, prefix='', delimiter=None,
743 start='', limit=10000, before=inf,
744 except_cluster=0, pathq=[], filterq=None):
745 """Return a (list of (path, serial) tuples, list of common prefixes)
746 for the current versions of the paths with the given parent,
747 matching the following criteria.
749 The property tuple for a version is returned if all
750 of these conditions are true:
756 c. path starts with prefix (and paths in pathq)
758 d. version is the max up to before
760 e. version is not in cluster
762 f. the path does not have the delimiter occurring
763 after the prefix, or ends with the delimiter
765 g. serial matches the attribute filter query.
767 A filter query is a comma-separated list of
768 terms in one of these three forms:
771 an attribute with this key must exist
774 an attribute with this key must not exist
777 the attribute with this key satisfies the value
778 where ?op is one of ==, !=, <=, >=, <, >.
780 The list of common prefixes includes the prefixes
781 matching up to the first delimiter after prefix,
782 and are reported only once, as "virtual directories".
783 The delimiter is included in the prefixes.
785 If arguments are None, then the corresponding matching rule
788 Limit applies to the first list of tuples returned.
789 """
# Normalize the scan window: [strprevling(prefix), strnextling(prefix)).
791 if not start or start < prefix:
792 start = strprevling(prefix)
793 nextling = strnextling(prefix)
795 a = self.attributes.alias('a')
796 v = self.versions.alias('v')
797 n = self.nodes.alias('n')
798 s = select([n.c.path, v.c.serial]).distinct()
# Latest version per node: correlated max-serial subquery, optionally
# bounded by mtime (the `if before != inf:` guard is on an elided line).
799 filtered = select([func.max(self.versions.c.serial)])
801 filtered = filtered.where(self.versions.c.mtime < before)
802 s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
803 s = s.where(v.c.cluster != except_cluster)
804 s = s.where(v.c.node.in_(select([self.nodes.c.node],
805 self.nodes.c.parent == parent)))
# Join attributes only when a filter query is given (the `if filterq:`
# guard is on an elided line).
807 s = s.where(a.c.serial == v.c.serial)
809 s = s.where(n.c.node == v.c.node)
# `start` is a bind parameter so the same compiled statement can be
# re-executed with a new starting point while skipping prefixes below.
810 s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
# OR together one LIKE prefix clause per entry in pathq (loop header and
# `conj = []` on elided lines).
813 conj.append(n.c.path.like(x + '%'))
816 s = s.where(or_(*conj))
819 s = s.where(a.c.key.in_(filterq.split(',')))
821 s = s.order_by(n.c.path)
# Fast path: no delimiter — just fetch up to limit (elided) and return.
825 rp = self.conn.execute(s, start=start)
# Delimiter handling: walk rows one by one, folding paths that contain
# the delimiter after the prefix into common "virtual directory"
# prefixes (much of the loop body is on elided lines).
834 pappend = prefixes.append
836 mappend = matches.append
838 rp = self.conn.execute(s, start=start)
840 props = rp.fetchone()
844 idx = path.find(delimiter, pfz)
# Path that ends exactly at the delimiter counts as a match too.
853 if idx + dz == len(path):
856 continue # Get one more, in case there is a path.
# Skip everything under the folded prefix by restarting the scan just
# past it.
862 rp = self.conn.execute(s, start=strnextling(pf)) # New start.
865 return matches, prefixes