1 # Copyright 2011 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
35 from sqlalchemy import Table, Integer, BigInteger, DECIMAL, Column, String, MetaData, ForeignKey
36 from sqlalchemy.types import Text
37 from sqlalchemy.schema import Index, Sequence
38 from sqlalchemy.sql import func, and_, or_, null, select, bindparam, text
39 from sqlalchemy.ext.compiler import compiles
40 from sqlalchemy.engine.reflection import Inspector
42 from dbworker import DBWorker
# Positional indices into a version property tuple, matching the column
# order (serial, node, hash, size, source, mtime, muser, cluster) used by
# version lookups below (e.g. props[CLUSTER], props[SIZE]).
( SERIAL, NODE, HASH, SIZE, SOURCE, MTIME, MUSER, CLUSTER ) = range(8)
def strnextling(prefix):
    """Return the first unicode string
    greater than but not starting with given prefix.
    strnextling('hello') -> 'hellp'
    """
        ## all strings start with the null string,
        ## therefore we have to approximate strnextling('')
        ## with the last unicode character supported by python
        ## 0x10ffff for wide (32-bit unicode) python builds
        ## 0x00ffff for narrow (16-bit unicode) python builds
        ## We will not autodetect. 0xffff is safe enough.
def strprevling(prefix):
    """Return an approximation of the last unicode string
    less than but not starting with given prefix.
    strprevling(u'hello') -> u'helln\\xffff'
    """
        ## There is no prevling for the null string
        # NOTE(review): 's' and 'c' are bound on lines not visible in this
        # excerpt (presumably s = prefix[:-1]; c = ord(prefix[-1])) — confirm.
        s += unichr(c-1) + unichr(0xffff)
99 """Nodes store path organization and have multiple versions.
100 Versions store object history and have multiple attributes.
101 Attributes store metadata.
104 # TODO: Provide an interface for included and excluded clusters.
    def __init__(self, **params):
        # Build the full backend schema (nodes, policy, statistics, versions,
        # attributes) on the engine/connection set up by DBWorker.
        DBWorker.__init__(self, **params)
        metadata = MetaData()

        #create nodes table: the path tree; each row points to its parent node
        columns.append(Column('node', Integer, primary_key=True))
        columns.append(Column('parent', Integer,
                              ForeignKey('nodes.node',
                              autoincrement=False))
        # NOTE(review): path_length is defined elsewhere in the file — confirm.
        columns.append(Column('path', String(path_length), default='', nullable=False))
        self.nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')

        #create policy table: per-node key/value policy entries
        columns.append(Column('node', Integer,
                              ForeignKey('nodes.node',
        columns.append(Column('key', String(255), primary_key=True))
        columns.append(Column('value', String(255)))
        self.policies = Table('policy', metadata, *columns, mysql_engine='InnoDB')

        #create statistics table
        columns.append(Column('node', Integer,
                              ForeignKey('nodes.node',
        columns.append(Column('population', Integer, nullable=False, default=0))
        columns.append(Column('size', BigInteger, nullable=False, default=0))
        columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
        columns.append(Column('cluster', Integer, nullable=False, default=0,
                              primary_key=True, autoincrement=False))
        self.statistics = Table('statistics', metadata, *columns, mysql_engine='InnoDB')

        #create versions table
        columns.append(Column('serial', Integer, primary_key=True))
        columns.append(Column('node', Integer,
                              ForeignKey('nodes.node',
                                         onupdate='CASCADE')))
        columns.append(Column('hash', String(255)))
        columns.append(Column('size', BigInteger, nullable=False, default=0))
        columns.append(Column('source', Integer))
        columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
        columns.append(Column('muser', String(255), nullable=False, default=''))
        columns.append(Column('cluster', Integer, nullable=False, default=0))
        self.versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
        Index('idx_versions_node_mtime', self.versions.c.node,
              self.versions.c.mtime)

        #create attributes table
        columns.append(Column('serial', Integer,
                              ForeignKey('versions.serial',
        columns.append(Column('key', String(255), primary_key=True))
        columns.append(Column('value', String(255)))
        self.attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')

        metadata.create_all(self.engine)

        # the following code creates an index of specific length
        # this can be accomplished in sqlalchemy >= 0.7.3
        # providing mysql_length option during index creation
        insp = Inspector.from_engine(self.engine)
        indexes = [elem['name'] for elem in insp.get_indexes('nodes')]
        if 'idx_nodes_path' not in indexes:
            # MySQL cannot index an unbounded VARCHAR: give an explicit prefix length.
            explicit_length = '(%s)' %path_length if self.engine.name == 'mysql' else ''
            s = text('CREATE INDEX idx_nodes_path ON nodes (path%s)' %explicit_length)
            self.conn.execute(s).close()

        # Ensure the root node exists (node == parent == ROOTNODE).
        s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
                                           self.nodes.c.parent == ROOTNODE))
        rp = self.conn.execute(s)
        s = self.nodes.insert().values(node=ROOTNODE, parent=ROOTNODE)
196 def node_create(self, parent, path):
197 """Create a new node from the given properties.
198 Return the node identifier of the new node.
200 #TODO catch IntegrityError?
201 s = self.nodes.insert().values(parent=parent, path=path)
202 r = self.conn.execute(s)
203 inserted_primary_key = r.inserted_primary_key[0]
205 return inserted_primary_key
    def node_lookup(self, path):
        """Lookup the current node of the given path.
        Return None if the node is not found.
        """
        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
        s = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path), escape='\\'))
        r = self.conn.execute(s)
    def node_get_properties(self, node):
        """Return the node's (parent, path).
        Return None if the node is not found.
        """
        s = select([self.nodes.c.parent, self.nodes.c.path])
        s = s.where(self.nodes.c.node == node)
        r = self.conn.execute(s)
    def node_get_versions(self, node, keys=(), propnames=_propnames):
        """Return the properties of all versions at node.
        If keys is empty, return all properties in the order
        (serial, node, size, source, mtime, muser, cluster).
        """
        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.hash,
                    self.versions.c.size,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.cluster], self.versions.c.node == node)
        s = s.order_by(self.versions.c.serial)
        r = self.conn.execute(s)
        # NOTE(review): 'rows' is bound on a line not visible in this excerpt
        # (presumably rows = r.fetchall()) — confirm.
        # Project each row down to the requested keys, preserving key order.
        return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]
259 def node_count_children(self, node):
260 """Return node's child count."""
262 s = select([func.count(self.nodes.c.node)])
263 s = s.where(and_(self.nodes.c.parent == node,
264 self.nodes.c.node != ROOTNODE))
265 r = self.conn.execute(s)
    def node_purge_children(self, parent, before=inf, cluster=0):
        """Delete all versions with the specified
        parent and cluster, and return
        the hashes of versions deleted.
        Clears out nodes with no remaining versions.
        """
        #update statistics
        c1 = select([self.nodes.c.node],
                    self.nodes.c.parent == parent)
        where_clause = and_(self.versions.c.node.in_(c1),
                            self.versions.c.cluster == cluster)
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        s = s.where(where_clause)
        s = s.where(self.versions.c.mtime <= before)
        r = self.conn.execute(s)
        # Negate the aggregate size; SUM() is NULL when no rows matched,
        # so fall back to 0.
        # NOTE(review): 'row' and 'mtime' are bound on lines not visible in
        # this excerpt — confirm.
        nr, size = row[0], -row[1] if row[1] else 0
        self.statistics_update(parent, -nr, size, mtime, cluster)
        self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)

        # Collect the hashes of the versions about to be deleted.
        s = select([self.versions.c.hash])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        hashes = [row[0] for row in r.fetchall()]

        #delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)

        #delete nodes under parent that no longer have any versions
        s = select([self.nodes.c.node],
                   and_(self.nodes.c.parent == parent,
                        select([func.count(self.versions.c.serial)],
                               self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
        rp = self.conn.execute(s)
        nodes = [r[0] for r in rp.fetchall()]
        s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
        self.conn.execute(s).close()
    def node_purge(self, node, before=inf, cluster=0):
        """Delete all versions with the specified
        node and cluster, and return
        the hashes of versions deleted.
        Clears out the node if it has no remaining versions.
        """
        #update statistics
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        where_clause = and_(self.versions.c.node == node,
                            self.versions.c.cluster == cluster)
        s = s.where(where_clause)
        s = s.where(self.versions.c.mtime <= before)
        r = self.conn.execute(s)
        # NOTE(review): 'row' and 'mtime' are bound on lines not visible in
        # this excerpt — confirm.
        nr, size = row[0], row[1]
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)

        # Collect the hashes of the versions about to be deleted.
        s = select([self.versions.c.hash])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        # NOTE(review): the comprehension variable shadows the result proxy
        # 'r'; this works because fetchall() is evaluated before the loop
        # binds 'r', but a distinct name (as in node_purge_children) would
        # be clearer.
        hashes = [r[0] for r in r.fetchall()]

        #delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)

        #delete the node if it no longer has any versions
        s = select([self.nodes.c.node],
                   and_(self.nodes.c.node == node,
                        select([func.count(self.versions.c.serial)],
                               self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
        r = self.conn.execute(s)
        s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
        self.conn.execute(s).close()
    def node_remove(self, node):
        """Remove the node specified.
        Return false if the node has children or is not found.
        """
        if self.node_count_children(node):

        # Roll the node's per-cluster version statistics out of all ancestors
        # before deleting the node row itself.
        # NOTE(review): 'mtime' is bound on a line not visible in this
        # excerpt — confirm.
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size),
                    self.versions.c.cluster])
        s = s.where(self.versions.c.node == node)
        s = s.group_by(self.versions.c.cluster)
        r = self.conn.execute(s)
        for population, size, cluster in r.fetchall():
            self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
        s = self.nodes.delete().where(self.nodes.c.node == node)
        self.conn.execute(s).close()
391 def policy_get(self, node):
392 s = select([self.policies.c.key, self.policies.c.value],
393 self.policies.c.node==node)
394 r = self.conn.execute(s)
395 d = dict(r.fetchall())
    def policy_set(self, node, policy):
        """Upsert the given {key: value} policy pairs for node."""
        for k, v in policy.iteritems():
            # Try an UPDATE of the existing (node, key) row first.
            s = self.policies.update().where(and_(self.policies.c.node == node,
                                                  self.policies.c.key == k))
            s = s.values(value = v)
            rp = self.conn.execute(s)
            # NOTE(review): the INSERT fallback is presumably guarded by a
            # rowcount check on a line not visible in this excerpt — confirm.
            s = self.policies.insert()
            values = {'node':node, 'key':k, 'value':v}
            r = self.conn.execute(s, values)
    def statistics_get(self, node, cluster=0):
        """Return population, total size and last mtime
        for all versions under node that belong to the cluster.
        """
        s = select([self.statistics.c.population,
                    self.statistics.c.size,
                    self.statistics.c.mtime])
        s = s.where(and_(self.statistics.c.node == node,
                         self.statistics.c.cluster == cluster))
        r = self.conn.execute(s)
    def statistics_update(self, node, population, size, mtime, cluster=0):
        """Update the statistics of the given node.
        Statistics keep track the population, total
        size of objects and mtime in the node's namespace.
        May be zero or positive or negative numbers.
        """
        # Read the current counters for (node, cluster), if any.
        s = select([self.statistics.c.population, self.statistics.c.size],
                   and_(self.statistics.c.node == node,
                        self.statistics.c.cluster == cluster))
        rp = self.conn.execute(s)
        # Start from zero when no statistics row exists yet.
        prepopulation, presize = (0, 0)
        # NOTE(review): 'r' is bound on a line not visible in this excerpt
        # (presumably r = rp.fetchone()) — confirm.
        prepopulation, presize = r
        # The arguments are deltas: accumulate onto the previous values.
        population += prepopulation
        # Try UPDATE first; the INSERT below is presumably the no-row
        # fallback guarded on a line not visible here — confirm.
        u = self.statistics.update().where(and_(self.statistics.c.node==node,
                                                self.statistics.c.cluster==cluster))
        u = u.values(population=population, size=size, mtime=mtime)
        rp = self.conn.execute(u)
        ins = self.statistics.insert()
        ins = ins.values(node=node, population=population, size=size,
                         mtime=mtime, cluster=cluster)
        self.conn.execute(ins).close()
    def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
        """Update the statistics of the given node's parent.
        Then recursively update all parents up to the root.
        Population is not recursive.
        """
        props = self.node_get_properties(node)
        # NOTE(review): 'parent' is presumably unpacked from props on a line
        # not visible in this excerpt — confirm.
        self.statistics_update(parent, population, size, mtime, cluster)
        population = 0 # Population isn't recursive
    def statistics_latest(self, node, before=inf, except_cluster=0):
        """Return population, total size and last mtime
        for all latest versions under node that
        do not belong to the cluster.
        """
        props = self.node_get_properties(node)

        # The latest version.
        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.hash,
                    self.versions.c.size,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.cluster])
        # "Latest" is the version with the greatest serial (up to 'before').
        filtered = select([func.max(self.versions.c.serial)],
                          self.versions.c.node == node)
        filtered = filtered.where(self.versions.c.mtime < before)
        s = s.where(and_(self.versions.c.cluster != except_cluster,
                         self.versions.c.serial == filtered))
        r = self.conn.execute(s)

        # First level, just under node (get population).
        v = self.versions.alias('v')
        s = select([func.count(v.c.serial),
                    func.max(v.c.mtime)])
        c1 = select([func.max(self.versions.c.serial)])
        c1 = c1.where(self.versions.c.mtime < before)
        c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
        s = s.where(and_(v.c.serial == c1.where(self.versions.c.node == v.c.node),
                         v.c.cluster != except_cluster,
        rp = self.conn.execute(s)
        # NOTE(review): 'mtime' and 'r' are bound on lines not visible in
        # this excerpt — confirm.
        mtime = max(mtime, r[2])

        # All children (get size and mtime).
        # XXX: This is why the full path is stored.
        s = select([func.count(v.c.serial),
                    func.max(v.c.mtime)])
        c1 = select([func.max(self.versions.c.serial)],
                    self.versions.c.node == v.c.node)
        c1 = c1.where(self.versions.c.mtime < before)
        c2 = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path) + '%', escape='\\'))
        s = s.where(and_(v.c.serial == c1,
                         v.c.cluster != except_cluster,
        rp = self.conn.execute(s)
        # Subtract the node's own size so only the children are counted.
        size = r[1] - props[SIZE]
        mtime = max(mtime, r[2])
        return (count, size, mtime)
    def version_create(self, node, hash, size, source, muser, cluster=0):
        """Create a new version from the given properties.
        Return the (serial, mtime) of the new version.
        """
        # NOTE(review): 'mtime' is bound on a line not visible in this
        # excerpt (presumably the current time) — confirm.
        s = self.versions.insert().values(node=node, hash=hash, size=size, source=source,
                                          mtime=mtime, muser=muser, cluster=cluster)
        serial = self.conn.execute(s).inserted_primary_key[0]
        # A new version adds one object of 'size' bytes to every ancestor.
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)
    def version_lookup(self, node, before=inf, cluster=0):
        """Lookup the current version of the given node.
        Return a list with its properties:
        (serial, node, hash, size, source, mtime, muser, cluster)
        or None if the current version is not found in the given cluster.
        """
        v = self.versions.alias('v')
        s = select([v.c.serial, v.c.node, v.c.hash, v.c.size,
                    v.c.source, v.c.mtime, v.c.muser, v.c.cluster])
        # The current version is the one with the greatest serial,
        # restricted to versions older than 'before'.
        c = select([func.max(self.versions.c.serial)],
                   self.versions.c.node == node)
        c = c.where(self.versions.c.mtime < before)
        s = s.where(and_(v.c.serial == c,
                         v.c.cluster == cluster))
        r = self.conn.execute(s)
    def version_get_properties(self, serial, keys=(), propnames=_propnames):
        """Return a sequence of values for the properties of
        the version specified by serial and the keys, in the order given.
        If keys is empty, return all properties in the order
        (serial, node, hash, size, source, mtime, muser, cluster).
        """
        v = self.versions.alias()
        s = select([v.c.serial, v.c.node, v.c.hash, v.c.size,
                    v.c.source, v.c.mtime, v.c.muser, v.c.cluster], v.c.serial == serial)
        rp = self.conn.execute(s)
        # NOTE(review): 'r' is bound on a line not visible in this excerpt
        # (presumably r = rp.fetchone()) — confirm.
        return [r[propnames[k]] for k in keys if k in propnames]
    def version_recluster(self, serial, cluster):
        """Move the version into another cluster."""
        props = self.version_get_properties(serial)
        oldcluster = props[CLUSTER]
        # Nothing to do when the version is already in the target cluster.
        if cluster == oldcluster:

        # Move the version's statistics from the old cluster to the new one.
        # NOTE(review): 'node', 'size' and 'mtime' are bound on lines not
        # visible in this excerpt (presumably unpacked from props) — confirm.
        self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)

        s = self.versions.update()
        s = s.where(self.versions.c.serial == serial)
        s = s.values(cluster = cluster)
        self.conn.execute(s).close()
    def version_remove(self, serial):
        """Remove the serial specified."""
        props = self.version_get_properties(serial)
        cluster = props[CLUSTER]
        # NOTE(review): 'node', 'size' and 'mtime' are bound on lines not
        # visible in this excerpt (presumably unpacked from props) — confirm.
        self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
        s = self.versions.delete().where(self.versions.c.serial == serial)
        self.conn.execute(s).close()
    def attribute_get(self, serial, keys=()):
        """Return a list of (key, value) pairs of the version specified by serial.
        If keys is empty, return all attributes.
        Otherwise, return only those specified.
        """
        # Restrict to the requested keys...
        attrs = self.attributes.alias()
        s = select([attrs.c.key, attrs.c.value])
        s = s.where(and_(attrs.c.key.in_(keys),
                         attrs.c.serial == serial))
        # ...or fetch every attribute of the version.
        attrs = self.attributes.alias()
        s = select([attrs.c.key, attrs.c.value])
        s = s.where(attrs.c.serial == serial)
        r = self.conn.execute(s)
    def attribute_set(self, serial, items):
        """Set the attributes of the version specified by serial.
        Receive attributes as an iterable of (key, value) pairs.
        """
        # NOTE(review): 'k' and 'v' are presumably bound by a loop over
        # 'items' on a line not visible in this excerpt — confirm.
        # Try an UPDATE of the existing (serial, key) row first.
        s = self.attributes.update()
        s = s.where(and_(self.attributes.c.serial == serial,
                         self.attributes.c.key == k))
        s = s.values(value = v)
        rp = self.conn.execute(s)
        # INSERT fallback, presumably guarded by a rowcount check — confirm.
        s = self.attributes.insert()
        s = s.values(serial=serial, key=k, value=v)
        self.conn.execute(s).close()
    def attribute_del(self, serial, keys=()):
        """Delete attributes of the version specified by serial.
        If keys is empty, delete all attributes.
        Otherwise delete those specified.
        """
        #TODO more efficient way to do this?
        # NOTE(review): 'key' is presumably bound by a loop over 'keys' on a
        # line not visible in this excerpt — confirm.
        s = self.attributes.delete()
        s = s.where(and_(self.attributes.c.serial == serial,
                         self.attributes.c.key == key))
        self.conn.execute(s).close()
        # keys empty: drop every attribute of the version.
        s = self.attributes.delete()
        s = s.where(self.attributes.c.serial == serial)
        self.conn.execute(s).close()
    def attribute_copy(self, source, dest):
        # Copy all attributes of version 'source' onto version 'dest'.
        # Selecting the literal 'dest' as a column yields ready-made
        # (dest, key, value) triples.
        s = select([dest, self.attributes.c.key, self.attributes.c.value],
                   self.attributes.c.serial == source)
        rp = self.conn.execute(s)
        attributes = rp.fetchall()
        for dest, k, v in attributes:
            # Try an UPDATE of the existing (dest, key) row first.
            s = self.attributes.update().where(and_(
                self.attributes.c.serial == dest,
                self.attributes.c.key == k))
            rp = self.conn.execute(s, value=v)
            # INSERT fallback, presumably guarded by a rowcount check on a
            # line not visible in this excerpt — confirm.
            s = self.attributes.insert()
            values = {'serial':dest, 'key':k, 'value':v}
            self.conn.execute(s, values).close()
    def latest_attribute_keys(self, parent, before=inf, except_cluster=0, pathq=[]):
        """Return a list with all keys pairs defined
        for all latest versions under parent that
        do not belong to the cluster.
        """
        # NOTE(review): mutable default argument 'pathq=[]' — safe only as
        # long as it is never mutated inside the function; verify.
        # TODO: Use another table to store before=inf results.
        a = self.attributes.alias('a')
        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        s = select([a.c.key]).distinct()
        # Keep only each node's latest version (up to 'before').
        filtered = select([func.max(self.versions.c.serial)])
        filtered = filtered.where(self.versions.c.mtime < before)
        s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
        s = s.where(v.c.cluster != except_cluster)
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
                                        self.nodes.c.parent == parent)))
        s = s.where(a.c.serial == v.c.serial)
        s = s.where(n.c.node == v.c.node)
        # NOTE(review): 'conj', 'x' and 'rows' are bound on lines not visible
        # in this excerpt (presumably a loop over pathq and rp.fetchall()) —
        # confirm.
        conj.append(n.c.path.like(self.escape_like(x) + '%', escape='\\'))
        s = s.where(or_(*conj))
        rp = self.conn.execute(s)
        return [r[0] for r in rows]
    def latest_version_list(self, parent, prefix='', delimiter=None,
                            start='', limit=10000, before=inf,
                            except_cluster=0, pathq=[], filterq=None):
        """Return a (list of (path, serial) tuples, list of common prefixes)
        for the current versions of the paths with the given parent,
        matching the following criteria.

        The property tuple for a version is returned if all
        of these conditions are true:

        c. path starts with prefix (and paths in pathq)

        d. version is the max up to before

        e. version is not in cluster

        f. the path does not have the delimiter occurring
        after the prefix, or ends with the delimiter

        g. serial matches the attribute filter query.

        A filter query is a comma-separated list of
        terms in one of these three forms:

        an attribute with this key must exist

        an attribute with this key must not exist

        the attribute with this key satisfies the value
        where ?op is one of ==, !=, <=, >=, <, >.

        The list of common prefixes includes the prefixes
        matching up to the first delimiter after prefix,
        and are reported only once, as "virtual directories".
        The delimiter is included in the prefixes.

        If arguments are None, then the corresponding matching rule

        Limit applies to the first list of tuples returned.
        """
        # Establish the pagination window: (start, nextling) covers exactly
        # the paths beginning with prefix.
        if not start or start < prefix:
            start = strprevling(prefix)
        nextling = strnextling(prefix)

        a = self.attributes.alias('a')
        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        s = select([n.c.path, v.c.serial]).distinct()
        # Keep only each node's latest version (up to 'before').
        filtered = select([func.max(self.versions.c.serial)])
        filtered = filtered.where(self.versions.c.mtime < before)
        s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
        s = s.where(v.c.cluster != except_cluster)
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
                                        self.nodes.c.parent == parent)))
        s = s.where(a.c.serial == v.c.serial)
        s = s.where(n.c.node == v.c.node)
        # 'start' stays a bind parameter so it can be rebound when skipping
        # over a common prefix below.
        s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
        # NOTE(review): 'conj' and 'x' are bound on lines not visible in this
        # excerpt (presumably a loop over pathq) — confirm.
        conj.append(n.c.path.like(self.escape_like(x) + '%', escape='\\'))
        s = s.where(or_(*conj))
        s = s.where(a.c.key.in_(filterq.split(',')))
        s = s.order_by(n.c.path)

        rp = self.conn.execute(s, start=start)

        # With a delimiter, walk rows one at a time, folding paths that
        # contain the delimiter after the prefix into "virtual directories".
        # NOTE(review): 'prefixes', 'matches', 'path', 'pfz', 'dz' and 'pf'
        # are bound on lines not visible in this excerpt — confirm.
        pappend = prefixes.append
        mappend = matches.append
        rp = self.conn.execute(s, start=start)
        props = rp.fetchone()
        idx = path.find(delimiter, pfz)
        # Path ends exactly at the delimiter: report it as a match too.
        if idx + dz == len(path):
        continue # Get one more, in case there is a path.
        # Skip past everything sharing this common prefix.
        rp = self.conn.execute(s, start=strnextling(pf)) # New start.
        return matches, prefixes