1 # Copyright 2011 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
from time import time

from sqlalchemy import (BigInteger, Column, DECIMAL, ForeignKey, Integer,
                        MetaData, String, Table)
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.schema import Index, Sequence
from sqlalchemy.sql import and_, bindparam, func, null, or_, select

from dbworker import DBWorker
# Positional indices into a version-property row, in the order
# (serial, node, hash, size, source, mtime, muser, cluster).
( SERIAL, NODE, HASH, SIZE, SOURCE, MTIME, MUSER, CLUSTER ) = range(8)
def strnextling(prefix):
    """Return the first unicode string
       greater than but not starting with given prefix.
       strnextling('hello') -> 'hellp'
    """
    if not prefix:
        ## all strings start with the null string,
        ## therefore we have to approximate strnextling('')
        ## with the last unicode character supported by python
        ## 0x10ffff for wide (32-bit unicode) python builds
        ## 0x00ffff for narrow (16-bit unicode) python builds
        ## We will not autodetect. 0xffff is safe enough.
        return u'\uffff'
    s = prefix[:-1]
    c = ord(prefix[-1])
    if c >= 0xffff:
        # Cannot safely increment the last character; pad with the
        # largest safe unicode character instead.
        s += prefix[-1] + u'\uffff'
    else:
        s += u'%c' % (c + 1)
    return s
def strprevling(prefix):
    """Return an approximation of the last unicode string
       less than but not starting with given prefix.
       strprevling(u'hello') -> u'helln\\xffff'
    """
    if not prefix:
        ## There is no prevling for the null string:
        ## nothing sorts before ''.
        ## NOTE(review): approximation choice — confirm against callers.
        return prefix
    s = prefix[:-1]
    c = ord(prefix[-1])
    if c > 0:
        # Decrement the last character and pad with the largest safe
        # unicode character, e.g. u'hello' -> u'helln\uffff'.
        #s += unichr(c-1) + unichr(0xffff)
        s += u'%c' % (c - 1) + u'\uffff'
    return s
98 """Nodes store path organization and have multiple versions.
99 Versions store object history and have multiple attributes.
100 Attributes store metadata.
103 # TODO: Provide an interface for included and excluded clusters.
def __init__(self, **params):
    """Open the database connection (via DBWorker) and create the
    nodes/statistics/versions/attributes tables and the root node,
    if they do not already exist.
    """
    DBWorker.__init__(self, **params)
    metadata = MetaData()

    #create nodes table
    columns = []
    columns.append(Column('node', Integer, primary_key=True))
    # NOTE(review): FK cascade options reconstructed from the
    # three-paren closer pattern used by the versions table — confirm.
    columns.append(Column('parent', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          autoincrement=False))
    columns.append(Column('path', String(2048), default='', nullable=False))
    self.nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
    # place an index on path
    Index('idx_nodes_path', self.nodes.c.path)

    #create statistics table
    columns = []
    columns.append(Column('node', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          primary_key=True))
    columns.append(Column('population', Integer, nullable=False, default=0))
    columns.append(Column('size', BigInteger, nullable=False, default=0))
    columns.append(Column('mtime', DECIMAL))
    columns.append(Column('cluster', Integer, nullable=False, default=0,
                          primary_key=True, autoincrement=False))
    self.statistics = Table('statistics', metadata, *columns, mysql_engine='InnoDB')

    #create versions table
    columns = []
    columns.append(Column('serial', Integer, primary_key=True))
    columns.append(Column('node', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE')))
    columns.append(Column('hash', String(255)))
    columns.append(Column('size', BigInteger, nullable=False, default=0))
    columns.append(Column('source', Integer))
    columns.append(Column('mtime', DECIMAL))
    columns.append(Column('muser', String(255), nullable=False, default=''))
    columns.append(Column('cluster', Integer, nullable=False, default=0))
    self.versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
    Index('idx_versions_node_mtime', self.versions.c.node,
          self.versions.c.mtime)

    #create attributes table
    columns = []
    columns.append(Column('serial', Integer,
                          ForeignKey('versions.serial',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          primary_key=True))
    columns.append(Column('key', String(255), primary_key=True))
    columns.append(Column('value', String(255)))
    self.attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')

    metadata.create_all(self.engine)

    # Make sure the root node exists (it is its own parent).
    s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
                                       self.nodes.c.parent == ROOTNODE))
    rp = self.conn.execute(s)
    r = rp.fetchone()
    rp.close()
    if not r:
        s = self.nodes.insert().values(node=ROOTNODE, parent=ROOTNODE)
        self.conn.execute(s).close()
def node_create(self, parent, path):
    """Create a new node from the given properties.
       Return the node identifier of the new node.
    """
    #TODO catch IntegrityError?
    s = self.nodes.insert().values(parent=parent, path=path)
    r = self.conn.execute(s)
    inserted_primary_key = r.inserted_primary_key[0]
    r.close()  # release the result proxy, as the other methods do
    return inserted_primary_key
def node_lookup(self, path):
    """Lookup the current node of the given path.
       Return None if the path is not found.
    """
    s = select([self.nodes.c.node], self.nodes.c.path == path)
    r = self.conn.execute(s)
    row = r.fetchone()
    r.close()
    if row:
        return row[0]
    return None
def node_get_properties(self, node):
    """Return the node's (parent, path).
       Return None if the node is not found.
    """
    s = select([self.nodes.c.parent, self.nodes.c.path])
    s = s.where(self.nodes.c.node == node)
    r = self.conn.execute(s)
    row = r.fetchone()
    r.close()
    return row
def node_get_versions(self, node, keys=(), propnames=_propnames):
    """Return the properties of all versions at node.
       If keys is empty, return all properties in the order
       (serial, node, hash, size, source, mtime, muser, cluster).
    """
    s = select([self.versions.c.serial,
                self.versions.c.node,
                self.versions.c.hash,
                self.versions.c.size,
                self.versions.c.source,
                self.versions.c.mtime,
                self.versions.c.muser,
                self.versions.c.cluster], self.versions.c.node == node)
    s = s.order_by(self.versions.c.serial)
    r = self.conn.execute(s)
    rows = r.fetchall()
    r.close()
    if not rows:
        return rows
    if not keys:
        return rows
    # Project only the requested properties, in the order given.
    return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]
def node_count_children(self, node):
    """Return node's child count."""
    s = select([func.count(self.nodes.c.node)])
    # The root node is its own parent; do not count it as a child.
    s = s.where(and_(self.nodes.c.parent == node,
                     self.nodes.c.node != ROOTNODE))
    r = self.conn.execute(s)
    row = r.fetchone()
    r.close()
    return row[0]
def node_purge_children(self, parent, before=inf, cluster=0):
    """Delete all versions with the specified
       parent and cluster, and return
       the serials of versions deleted.
       Clears out nodes with no remaining versions.
    """
    # Count the versions (and their total size) under parent
    # in this cluster, to roll back the statistics.
    c1 = select([self.nodes.c.node],
                self.nodes.c.parent == parent)
    where_clause = and_(self.versions.c.node.in_(c1),
                        self.versions.c.cluster == cluster)
    s = select([func.count(self.versions.c.serial),
                func.sum(self.versions.c.size)])
    s = s.where(where_clause)
    # NOTE(review): this mtime filter is probably meant to be guarded by
    # "if before != inf:" — confirm against history.
    s = s.where(self.versions.c.mtime <= before)
    r = self.conn.execute(s)
    # NOTE(review): `row` is used below but never assigned here — a
    # fetchone() on `r` appears to be missing.
    nr, size = row[0], -row[1] if row[1] else 0
    # NOTE(review): `mtime` is used but not bound in this body —
    # presumably the current timestamp; confirm.
    self.statistics_update(parent, -nr, size, mtime, cluster)
    self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)

    # Collect the serials that are about to be deleted.
    s = select([self.versions.c.serial])
    s = s.where(where_clause)
    r = self.conn.execute(s)
    serials = [row[SERIAL] for row in r.fetchall()]

    # Delete the versions themselves.
    s = self.versions.delete().where(where_clause)
    r = self.conn.execute(s)

    # Remove child nodes that no longer have any versions.
    s = select([self.nodes.c.node],
               and_(self.nodes.c.parent == parent,
                    select([func.count(self.versions.c.serial)],
                           self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
    rp = self.conn.execute(s)
    nodes = [r[0] for r in rp.fetchall()]
    s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
    self.conn.execute(s).close()
    # NOTE(review): the docstring promises the deleted serials are
    # returned, but there is no return statement — confirm that
    # "return serials" is missing.
def node_purge(self, node, before=inf, cluster=0):
    """Delete all versions with the specified
       node and cluster, and return
       the serials of versions deleted.
       Clears out the node if it has no remaining versions.
    """
    # Count versions (and their total size) at node in this cluster,
    # to roll back the statistics.
    s = select([func.count(self.versions.c.serial),
                func.sum(self.versions.c.size)])
    where_clause = and_(self.versions.c.node == node,
                        self.versions.c.cluster == cluster)
    s = s.where(where_clause)
    # NOTE(review): likely meant to be guarded by "if before != inf:".
    s = s.where(self.versions.c.mtime <= before)
    r = self.conn.execute(s)
    # NOTE(review): `row` is used but never assigned — a fetchone() on
    # `r` appears to be missing here.
    nr, size = row[0], row[1]
    # NOTE(review): `mtime` is not bound in this body; presumably the
    # current timestamp — confirm.
    self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)

    # Collect the serials that are about to be deleted.
    s = select([self.versions.c.serial])
    s = s.where(where_clause)
    r = self.conn.execute(s)
    # NOTE(review): the comprehension variable shadows the result proxy `r`.
    serials = [r[SERIAL] for r in r.fetchall()]

    # Delete the versions themselves.
    s = self.versions.delete().where(where_clause)
    r = self.conn.execute(s)

    # Remove the node itself if it no longer has any versions.
    s = select([self.nodes.c.node],
               and_(self.nodes.c.node == node,
                    select([func.count(self.versions.c.serial)],
                           self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
    r = self.conn.execute(s)
    # NOTE(review): `nodes` is used but never assigned — fetching the
    # matching node ids from `r` appears to be missing.
    s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
    self.conn.execute(s).close()
    # NOTE(review): docstring promises the deleted serials are returned,
    # but there is no return statement — confirm "return serials" is missing.
def node_remove(self, node):
    """Remove the node specified.
       Return false if the node has children or is not found.
    """
    if self.node_count_children(node):
        return False

    mtime = time()
    # Roll the node's per-cluster statistics out of all ancestors
    # before the versions disappear along with the node.
    s = select([func.count(self.versions.c.serial),
                func.sum(self.versions.c.size),
                self.versions.c.cluster])
    s = s.where(self.versions.c.node == node)
    s = s.group_by(self.versions.c.cluster)
    r = self.conn.execute(s)
    for population, size, cluster in r.fetchall():
        self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
    r.close()

    s = self.nodes.delete().where(self.nodes.c.node == node)
    self.conn.execute(s).close()
    return True
def statistics_get(self, node, cluster=0):
    """Return population, total size and last mtime
       for all versions under node that belong to the cluster.
    """
    s = select([self.statistics.c.population,
                self.statistics.c.size,
                self.statistics.c.mtime])
    s = s.where(and_(self.statistics.c.node == node,
                     self.statistics.c.cluster == cluster))
    r = self.conn.execute(s)
    row = r.fetchone()
    r.close()
    return row
def statistics_update(self, node, population, size, mtime, cluster=0):
    """Update the statistics of the given node.
       Statistics keep track the population, total
       size of objects and mtime in the node's namespace.
       May be zero or positive or negative numbers.
    """
    # Fetch the current counters, if a row already exists.
    s = select([self.statistics.c.population, self.statistics.c.size],
               and_(self.statistics.c.node == node,
                    self.statistics.c.cluster == cluster))
    rp = self.conn.execute(s)
    r = rp.fetchone()
    rp.close()
    if not r:
        prepopulation, presize = (0, 0)
    else:
        prepopulation, presize = r
    # The arguments are deltas; accumulate onto the stored values.
    population += prepopulation
    size += presize

    # Try an update first; fall back to an insert if no row matched.
    # TODO: use a real upsert where the backend supports one.
    u = self.statistics.update().where(and_(self.statistics.c.node==node,
                                            self.statistics.c.cluster==cluster))
    u = u.values(population=population, size=size, mtime=mtime)
    rp = self.conn.execute(u)
    rp.close()
    if rp.rowcount == 0:
        ins = self.statistics.insert()
        ins = ins.values(node=node, population=population, size=size,
                         mtime=mtime, cluster=cluster)
        self.conn.execute(ins).close()
def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
    """Update the statistics of the given node's parent.
       Then recursively update all parents up to the root.
       Population is not recursive.
    """
    while True:
        if node == ROOTNODE:
            break
        props = self.node_get_properties(node)
        if props is None:
            break
        parent, path = props
        self.statistics_update(parent, population, size, mtime, cluster)
        node = parent
        population = 0 # Population isn't recursive
def statistics_latest(self, node, before=inf, except_cluster=0):
    """Return population, total size and last mtime
       for all latest versions under node that
       do not belong to the cluster.
    """
    # NOTE(review): this body appears truncated in several places —
    # result rows are consumed without being fetched, two where-clauses
    # have unbalanced parentheses, and early-return paths are gone.
    # Each suspect spot is flagged below; confirm against history.
    props = self.node_get_properties(node)
    # NOTE(review): `props` here is (parent, path), yet props[SIZE] and a
    # bare `path` are used below — further unpacking/lookups seem missing.
    # The latest version.
    s = select([self.versions.c.serial,
                self.versions.c.node,
                self.versions.c.hash,
                self.versions.c.size,
                self.versions.c.source,
                self.versions.c.mtime,
                self.versions.c.muser,
                self.versions.c.cluster])
    filtered = select([func.max(self.versions.c.serial)],
                      self.versions.c.node == node)
    # NOTE(review): probably guarded by "if before != inf:" originally.
    filtered = filtered.where(self.versions.c.mtime < before)
    s = s.where(and_(self.versions.c.cluster != except_cluster,
                     self.versions.c.serial == filtered))
    r = self.conn.execute(s)
    # NOTE(review): the fetchone()/close() consuming `r` (and binding
    # `mtime`) is missing here.

    # First level, just under node (get population).
    v = self.versions.alias('v')
    s = select([func.count(v.c.serial),
                # NOTE(review): a func.sum(v.c.size) column may be missing.
                func.max(v.c.mtime)])
    c1 = select([func.max(self.versions.c.serial)])
    # NOTE(review): probably guarded by "if before != inf:".
    c1 = c1.where(self.versions.c.mtime < before)
    c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
    s = s.where(and_(v.c.serial == c1.where(self.versions.c.node == v.c.node),
                     v.c.cluster != except_cluster,
                     # NOTE(review): unbalanced — a final term such as
                     # "v.c.node.in_(c2)))" is missing here.
    rp = self.conn.execute(s)
    # NOTE(review): fetching `r` from `rp` (and binding `count`) is
    # missing; r[2] is used immediately below.
    mtime = max(mtime, r[2])

    # All children (get size and mtime).
    # XXX: This is why the full path is stored.
    s = select([func.count(v.c.serial),
                # NOTE(review): a func.sum(v.c.size) column may be missing.
                func.max(v.c.mtime)])
    c1 = select([func.max(self.versions.c.serial)],
                self.versions.c.node == v.c.node)
    # NOTE(review): probably guarded by "if before != inf:".
    c1 = c1.where(self.versions.c.mtime < before)
    c2 = select([self.nodes.c.node], self.nodes.c.path.like(path + '%'))
    s = s.where(and_(v.c.serial == c1,
                     v.c.cluster != except_cluster,
                     # NOTE(review): unbalanced — "v.c.node.in_(c2)))"
                     # appears to be missing here too.
    rp = self.conn.execute(s)
    # NOTE(review): fetching the row `r` and deriving `count` is missing.
    size = r[1] - props[SIZE]
    mtime = max(mtime, r[2])
    return (count, size, mtime)
def version_create(self, node, hash, size, source, muser, cluster=0):
    """Create a new version from the given properties.
       Return the (serial, mtime) of the new version.
    """
    mtime = time()
    s = self.versions.insert().values(node=node, hash=hash, size=size, source=source,
                                      mtime=mtime, muser=muser, cluster=cluster)
    serial = self.conn.execute(s).inserted_primary_key[0]
    # Account the new version in the ancestors' statistics.
    self.statistics_update_ancestors(node, 1, size, mtime, cluster)
    return serial, mtime
def version_lookup(self, node, before=inf, cluster=0):
    """Lookup the current version of the given node.
       Return a list with its properties:
       (serial, node, hash, size, source, mtime, muser, cluster)
       or None if the current version is not found in the given cluster.
    """
    v = self.versions.alias('v')
    s = select([v.c.serial, v.c.node, v.c.hash, v.c.size,
                v.c.source, v.c.mtime, v.c.muser, v.c.cluster])
    c = select([func.max(self.versions.c.serial)],
               self.versions.c.node == node)
    if before != inf:
        # Only consider versions strictly before the cutoff; with the
        # default before=inf the filter is a no-op and is skipped.
        c = c.where(self.versions.c.mtime < before)
    s = s.where(and_(v.c.serial == c,
                     v.c.cluster == cluster))
    r = self.conn.execute(s)
    props = r.fetchone()
    r.close()
    if props:
        return props
    return None
def version_get_properties(self, serial, keys=(), propnames=_propnames):
    """Return a sequence of values for the properties of
       the version specified by serial and the keys, in the order given.
       If keys is empty, return all properties in the order
       (serial, node, hash, size, source, mtime, muser, cluster).
    """
    v = self.versions.alias()
    s = select([v.c.serial, v.c.node, v.c.hash, v.c.size,
                v.c.source, v.c.mtime, v.c.muser, v.c.cluster], v.c.serial == serial)
    rp = self.conn.execute(s)
    r = rp.fetchone()
    rp.close()
    if r is None:
        return r
    if not keys:
        return r
    # Project only the requested properties, in the order given.
    return [r[propnames[k]] for k in keys if k in propnames]
def version_recluster(self, serial, cluster):
    """Move the version into another cluster."""
    props = self.version_get_properties(serial)
    if not props:
        return
    node = props[NODE]
    size = props[SIZE]
    oldcluster = props[CLUSTER]
    if cluster == oldcluster:
        return

    # Move the version's contribution between the two clusters'
    # statistics, stamping the change with the current time.
    mtime = time()
    self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
    self.statistics_update_ancestors(node, 1, size, mtime, cluster)

    s = self.versions.update()
    s = s.where(self.versions.c.serial == serial)
    s = s.values(cluster = cluster)
    self.conn.execute(s).close()
def version_remove(self, serial):
    """Remove the serial specified."""
    # BUG FIX: this fetched node properties (a (parent, path) pair via
    # node_get_properties) but indexed the result with version property
    # offsets (NODE/SIZE/CLUSTER); use version_get_properties instead.
    props = self.version_get_properties(serial)
    if not props:
        return
    node = props[NODE]
    size = props[SIZE]
    cluster = props[CLUSTER]

    mtime = time()
    # Remove the version's contribution from the ancestors' statistics.
    self.statistics_update_ancestors(node, -1, -size, mtime, cluster)

    s = self.versions.delete().where(self.versions.c.serial == serial)
    self.conn.execute(s).close()
def attribute_get(self, serial, keys=()):
    """Return a list of (key, value) pairs of the version specified by serial.
       If keys is empty, return all attributes.
       Otherwise, return only those specified.
    """
    if keys:
        attrs = self.attributes.alias()
        s = select([attrs.c.key, attrs.c.value])
        s = s.where(and_(attrs.c.key.in_(keys),
                         attrs.c.serial == serial))
    else:
        attrs = self.attributes.alias()
        s = select([attrs.c.key, attrs.c.value])
        s = s.where(attrs.c.serial == serial)
    r = self.conn.execute(s)
    rows = r.fetchall()
    r.close()
    return rows
def attribute_set(self, serial, items):
    """Set the attributes of the version specified by serial.
       Receive attributes as an iterable of (key, value) pairs.
    """
    for k, v in items:
        # Try an update first; fall back to an insert if the key
        # does not exist yet for this serial.
        s = self.attributes.update()
        s = s.where(and_(self.attributes.c.serial == serial,
                         self.attributes.c.key == k))
        s = s.values(value = v)
        rp = self.conn.execute(s)
        rp.close()
        if rp.rowcount == 0:
            s = self.attributes.insert()
            s = s.values(serial=serial, key=k, value=v)
            self.conn.execute(s).close()
def attribute_del(self, serial, keys=()):
    """Delete attributes of the version specified by serial.
       If keys is empty, delete all attributes.
       Otherwise delete those specified.
    """
    if keys:
        #TODO more efficient way to do this?
        for key in keys:
            s = self.attributes.delete()
            s = s.where(and_(self.attributes.c.serial == serial,
                             self.attributes.c.key == key))
            self.conn.execute(s).close()
    else:
        s = self.attributes.delete()
        s = s.where(self.attributes.c.serial == serial)
        self.conn.execute(s).close()
def attribute_copy(self, source, dest):
    """Copy all attributes of version `source` onto version `dest`,
    overwriting values for keys that already exist there.
    """
    s = select([dest, self.attributes.c.key, self.attributes.c.value],
               self.attributes.c.serial == source)
    rp = self.conn.execute(s)
    attributes = rp.fetchall()
    rp.close()
    for dest, k, v in attributes:
        # Try an update first; fall back to an insert if the key is
        # not present on the destination version.
        s = self.attributes.update().where(and_(
            self.attributes.c.serial == dest,
            self.attributes.c.key == k))
        rp = self.conn.execute(s, value=v)
        rp.close()
        if rp.rowcount == 0:
            s = self.attributes.insert()
            values = {'serial':dest, 'key':k, 'value':v}
            self.conn.execute(s, values).close()
def latest_attribute_keys(self, parent, before=inf, except_cluster=0, pathq=[]):
    """Return a list with all keys pairs defined
       for all latest versions under parent that
       do not belong to the cluster.
    """
    # NOTE: pathq defaults to a mutable list; it is never mutated here,
    # but a None default would be safer — confirm before changing.
    # TODO: Use another table to store before=inf results.
    a = self.attributes.alias('a')
    v = self.versions.alias('v')
    n = self.nodes.alias('n')
    s = select([a.c.key]).distinct()
    filtered = select([func.max(self.versions.c.serial)])
    if before != inf:
        filtered = filtered.where(self.versions.c.mtime < before)
    s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
    s = s.where(v.c.cluster != except_cluster)
    s = s.where(v.c.node.in_(select([self.nodes.c.node],
                                    self.nodes.c.parent == parent)))
    s = s.where(a.c.serial == v.c.serial)
    s = s.where(n.c.node == v.c.node)
    # Restrict to the requested path prefixes, if any.
    conj = []
    for x in pathq:
        conj.append(n.c.path.like(x + '%'))
    if conj:
        s = s.where(or_(*conj))
    rp = self.conn.execute(s)
    rows = rp.fetchall()
    rp.close()
    return [r[0] for r in rows]
def latest_version_list(self, parent, prefix='', delimiter=None,
                        start='', limit=10000, before=inf,
                        except_cluster=0, pathq=[], filterq=None):
    """Return a (list of (path, serial) tuples, list of common prefixes)
       for the current versions of the paths with the given parent,
       matching the following criteria.

       The property tuple for a version is returned if all
       of these conditions are true:

           a. parent matches

           b. path > start

           c. path starts with prefix (and paths in pathq)

           d. version is the max up to before

           e. version is not in cluster

           f. the path does not have the delimiter occuring
              after the prefix, or ends with the delimiter

           g. serial matches the attribute filter query.

              A filter query is a comma-separated list of
              terms in one of these three forms:

              key
                  an attribute with this key must exist

              !key
                  an attribute with this key must not exist

              key ?op value
                  the attribute with this key satisfies the value
                  where ?op is one of ==, != <=, >=, <, >.

       The list of common prefixes includes the prefixes
       matching up to the first delimiter after prefix,
       and are reported only once, as "virtual directories".
       The delimiter is included in the prefixes.

       If arguments are None, then the corresponding matching rule
       is always true.

       Limit applies to the first list of tuples returned.
    """
    # NOTE(review): this body is heavily truncated — several statements
    # below reference names that are never bound here (conj, prefixes,
    # matches, path, pf, pfz, dz) and the loop that drives the delimiter
    # scan is gone. Each gap is flagged; confirm against history.
    if not start or start < prefix:
        start = strprevling(prefix)
    nextling = strnextling(prefix)

    a = self.attributes.alias('a')
    v = self.versions.alias('v')
    n = self.nodes.alias('n')
    s = select([n.c.path, v.c.serial]).distinct()
    filtered = select([func.max(self.versions.c.serial)])
    # NOTE(review): probably guarded by "if before != inf:" originally.
    filtered = filtered.where(self.versions.c.mtime < before)
    s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
    s = s.where(v.c.cluster != except_cluster)
    s = s.where(v.c.node.in_(select([self.nodes.c.node],
                                    self.nodes.c.parent == parent)))
    # NOTE(review): joining the attributes alias is probably meant to
    # apply only when filterq is given — confirm.
    s = s.where(a.c.serial == v.c.serial)
    s = s.where(n.c.node == v.c.node)
    s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
    # NOTE(review): `conj` is never initialized and there is no loop
    # over pathq — "conj = []" and "for x in pathq:" appear missing.
    conj.append(n.c.path.like(x + '%'))
    # NOTE(review): likely guarded by "if conj:".
    s = s.where(or_(*conj))
    # NOTE(review): likely guarded by "if filterq:".
    s = s.where(a.c.key.in_(filterq.split(',')))
    s = s.order_by(n.c.path)

    # NOTE(review): the no-delimiter fast path (fetchall + return of
    # (matches, ())) that normally precedes the delimiter scan is missing.
    rp = self.conn.execute(s, start=start)

    # NOTE(review): `prefixes` and `matches` are never initialized here.
    pappend = prefixes.append
    mappend = matches.append

    rp = self.conn.execute(s, start=start)
    props = rp.fetchone()
    # NOTE(review): the loop driving this delimiter scan, and the
    # bindings of path/pfz/dz/pf, are missing; the bare `continue`
    # below has no enclosing loop in this fragment.
    idx = path.find(delimiter, pfz)
    if idx + dz == len(path):
        continue # Get one more, in case there is a path.
    rp = self.conn.execute(s, start=strnextling(pf)) # New start.
    return matches, prefixes