1 # Copyright 2011 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
35 from sqlalchemy import Table, Integer, Float, Column, String, MetaData, ForeignKey
36 from sqlalchemy.schema import Index, Sequence
37 from sqlalchemy.sql import func, and_, or_, null, select, bindparam
38 from sqlalchemy.ext.compiler import compiles
40 from sqlalchemy.sql.expression import _UpdateBase
42 from sqlalchemy.sql.expression import UpdateBase as _UpdateBase
44 from dbworker import DBWorker
# Positional indices into a version property row. The order matches the
# versions table columns (serial, node, size, source, mtime, muser, cluster)
# defined in Node.__init__ and the tuples returned by version_lookup.
48 ( SERIAL, NODE, SIZE, SOURCE, MTIME, MUSER, CLUSTER ) = range(7)
# Exclusive upper bound for a prefix range scan: latest_version_list filters
# with `path < strnextling(prefix)`, so per the docstring every path that
# starts with prefix sorts below this value.
# NOTE(review): the function body is missing from this listing (the embedded
# line numbers jump from 64 to 73); only the docstring and comments remain.
53 def strnextling(prefix):
54 """Return the first unicode string
55 greater than but not starting with given prefix.
56 strnextling('hello') -> 'hellp'
59 ## all strings start with the null string,
60 ## therefore we have to approximate strnextling('')
61 ## with the last unicode character supported by python
62 ## 0x10ffff for wide (32-bit unicode) python builds
63 ## 0x00ffff for narrow (16-bit unicode) python builds
64 ## We will not autodetect. 0xffff is safe enough.
# Lower-bound helper: latest_version_list uses strprevling(prefix) as the
# exclusive `start` bound when no start is given or start < prefix.
# NOTE(review): most of the body is missing from this listing (embedded line
# numbers skip 77-78, 80-83 and 85-99); only fragments remain.
73 def strprevling(prefix):
74 """Return an approximation of the last unicode string
75 less than but not starting with given prefix.
76 strprevling(u'hello') -> u'helln\\xffff'
79 ## There is no prevling for the null string
84 #s += unichr(c-1) + unichr(0xffff)
# Storage layer over four tables created in __init__: nodes (the path tree),
# versions (per-node history), attributes (per-version key/value metadata)
# and statistics (per-node, per-cluster population/size/mtime aggregates).
100 class Node(DBWorker):
101 """Nodes store path organization and have multiple versions.
102 Versions store object history and have multiple attributes.
103 Attributes store metadata.
106 # TODO: Provide an interface for included and excluded clusters.
# Define the nodes, statistics, versions and attributes tables on a fresh
# MetaData, create them via metadata.create_all(self.engine), then select the
# root row (node == parent == ROOTNODE) and insert it. Presumably the insert
# only runs when that select found no row; the guard line is missing from
# this listing, so verify.
108 def __init__(self, **params):
109 DBWorker.__init__(self, **params)
110 metadata = MetaData()
114 columns.append(Column('node', Integer, primary_key=True))
115 columns.append(Column('parent', Integer,
116 ForeignKey('nodes.node',
119 autoincrement=False))
120 columns.append(Column('path', String(2048), default='', nullable=False))
121 self.nodes = Table('nodes', metadata, *columns)
122 # place an index on path
123 Index('idx_nodes_path', self.nodes.c.path, unique=True)
125 #create statistics table
127 columns.append(Column('node', Integer,
128 ForeignKey('nodes.node',
132 columns.append(Column('population', Integer, nullable=False, default=0))
133 columns.append(Column('size', Integer, nullable=False, default=0))
134 columns.append(Column('mtime', Float))
135 columns.append(Column('cluster', Integer, nullable=False, default=0,
137 self.statistics = Table('statistics', metadata, *columns)
139 #create versions table
141 columns.append(Column('serial', Integer, primary_key=True))
142 columns.append(Column('node', Integer,
143 ForeignKey('nodes.node',
145 onupdate='CASCADE')))
146 columns.append(Column('size', Integer, nullable=False, default=0))
147 columns.append(Column('source', Integer))
148 columns.append(Column('mtime', Float))
149 columns.append(Column('muser', String(255), nullable=False, default=''))
150 columns.append(Column('cluster', Integer, nullable=False, default=0))
151 self.versions = Table('versions', metadata, *columns)
152 # place an index on node
153 Index('idx_versions_node', self.versions.c.node)
154 # TODO: Sort out if more indexes are needed.
# NOTE(review): if re-enabled, this index needs its own name;
# 'idx_versions_node' is already used for the node index above.
155 #Index('idx_versions_node', self.versions.c.mtime)
157 #create attributes table
159 columns.append(Column('serial', Integer,
160 ForeignKey('versions.serial',
164 columns.append(Column('key', String(255), primary_key=True))
165 columns.append(Column('value', String(255)))
166 self.attributes = Table('attributes', metadata, *columns)
168 metadata.create_all(self.engine)
170 s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
171 self.nodes.c.parent == ROOTNODE))
172 r = self.conn.execute(s).fetchone()
174 s = self.nodes.insert().values(node=ROOTNODE, parent=ROOTNODE)
# Insert a nodes row for (parent, path) and return its generated primary key.
# idx_nodes_path is declared unique in __init__, so inserting an existing path
# is expected to fail with a database integrity error (see the TODO below).
177 def node_create(self, parent, path):
178 """Create a new node from the given properties.
179 Return the node identifier of the new node.
181 #TODO catch IntegrityError?
182 s = self.nodes.insert().values(parent=parent, path=path)
183 r = self.conn.execute(s)
184 inserted_primary_key = r.inserted_primary_key[0]
186 return inserted_primary_key
# Look up the node id whose path equals `path`.
# NOTE(review): the row handling and return statements are missing from this
# listing (embedded line numbers 195-200).
188 def node_lookup(self, path):
189 """Lookup the current node of the given path.
190 Return None if the path is not found.
193 s = select([self.nodes.c.node], self.nodes.c.path == path)
194 r = self.conn.execute(s)
# Select the (parent, path) of a node id. This is NOT a version property
# tuple, so indexing the result with NODE/SIZE/CLUSTER is invalid.
201 def node_get_properties(self, node):
202 """Return the node's (parent, path).
203 Return None if the node is not found.
206 s = select([self.nodes.c.parent, self.nodes.c.path])
207 s = s.where(self.nodes.c.node == node)
208 r = self.conn.execute(s)
# Select every versions row for `node`, ordered by serial. With keys given,
# each row is projected to the requested property names through the
# `propnames` index map; unknown keys are silently skipped.
213 def node_get_versions(self, node, keys=(), propnames=_propnames):
214 """Return the properties of all versions at node.
215 If keys is empty, return all properties in the order
216 (serial, node, size, source, mtime, muser, cluster).
219 s = select(['*'], self.versions.c.node == node)
220 s = s.order_by(self.versions.c.serial)
221 r = self.conn.execute(s)
229 return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]
# Count the direct children of `node`. ROOTNODE is excluded explicitly
# because __init__ inserts the root row as its own parent.
231 def node_count_children(self, node):
232 """Return node's child count."""
234 s = select([func.count(self.nodes.c.node)])
235 s = s.where(and_(self.nodes.c.parent == node,
236 self.nodes.c.node != ROOTNODE))
237 r = self.conn.execute(s)
# Purge the versions of `parent`'s direct children that are in `cluster` and
# have mtime <= before:
#   1. aggregate count/size of the matching versions and subtract them from
#      the statistics of parent and of parent's ancestors;
#   2. collect the serials of the matching versions;
#   3. delete those versions;
#   4. delete direct children that no longer have any versions.
242 def node_purge_children(self, parent, before=inf, cluster=0):
243 """Delete all versions with the specified
244 parent and cluster, and return
245 the serials of versions deleted.
246 Clears out nodes with no remaining versions.
249 c1 = select([self.nodes.c.node],
250 self.nodes.c.parent == parent)
251 where_clause = and_(self.versions.c.node.in_(c1),
252 self.versions.c.cluster == cluster,
253 self.versions.c.mtime <= before)
254 s = select([func.count(self.versions.c.serial),
255 func.sum(self.versions.c.size)])
256 s = s.where(where_clause)
257 r = self.conn.execute(s)
# SUM() is NULL when no versions match, hence the guard. The size is negated
# here, so the statistics calls below pass it through unchanged.
262 nr, size = row[0], -row[1] if row[1] else 0
264 self.statistics_update(parent, -nr, size, mtime, cluster)
265 self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)
267 s = select([self.versions.c.serial])
268 s = s.where(where_clause)
269 r = self.conn.execute(s)
270 serials = [row[SERIAL] for row in r.fetchall()]
274 s = self.versions.delete().where(where_clause)
275 r = self.conn.execute(s)
# Find direct children whose correlated version count is now zero.
279 s = select([self.nodes.c.node],
280 and_(self.nodes.c.parent == parent,
281 select([func.count(self.versions.c.serial)],
282 self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
283 rp = self.conn.execute(s)
284 nodes = [r[0] for r in rp.fetchall()]
286 s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
287 self.conn.execute(s).close()
291 def node_purge(self, node, before=inf, cluster=0):
292 """Delete all versions with the specified
293 node and cluster, and return
294 the serials of versions deleted.
295 Clears out the node if it has no remaining versions.
299 s = select([func.count(self.versions.c.serial),
300 func.sum(self.versions.c.size)])
301 where_clause = and_(self.versions.c.node == node,
302 self.versions.c.cluster == cluster,
303 self.versions.c.mtime <= before)
304 s = s.where(where_clause)
305 r = self.conn.execute(s)
307 nr, size = row[0], row[1]
312 self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
314 s = select([self.versions.c.serial])
315 s = s.where(where_clause)
316 r = self.conn.execute(s)
317 serials = [r[SERIAL] for r in r.fetchall()]
320 s = self.versions.delete().where(where_clause)
321 r = self.conn.execute(s)
325 s = select([self.nodes.c.node],
326 and_(self.nodes.c.node == node,
327 select([func.count(self.versions.c.serial)],
328 self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
329 r = self.conn.execute(s)
332 s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
333 self.conn.execute(s).close()
# Remove a childless node. For each cluster holding versions of the node,
# subtract that cluster's version count and total size from the ancestors'
# statistics, then delete the nodes row.
# NOTE(review): versions rows are not deleted explicitly here; presumably the
# versions.node foreign key cascades on delete. Confirm, because its ondelete
# argument is missing from this listing.
337 def node_remove(self, node):
338 """Remove the node specified.
339 Return false if the node has children or is not found.
342 if self.node_count_children(node):
346 s = select([func.count(self.versions.c.serial),
347 func.sum(self.versions.c.size),
348 self.versions.c.cluster])
349 s = s.where(self.versions.c.node == node)
350 s = s.group_by(self.versions.c.cluster)
351 r = self.conn.execute(s)
352 for population, size, cluster in r.fetchall():
353 self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
356 s = self.nodes.delete().where(self.nodes.c.node == node)
357 self.conn.execute(s).close()
# Read the (population, size, mtime) statistics row for (node, cluster).
# NOTE(review): the row handling and return are missing from this listing.
360 def statistics_get(self, node, cluster=0):
361 """Return population, total size and last mtime
362 for all versions under node that belong to the cluster.
365 s = select([self.statistics.c.population,
366 self.statistics.c.size,
367 self.statistics.c.mtime])
368 s = s.where(and_(self.statistics.c.node == node,
369 self.statistics.c.cluster == cluster))
370 r = self.conn.execute(s)
# Apply population/size deltas to the statistics row of (node, cluster).
# The current values are read first; they default to (0, 0) when no row
# exists (that branch's condition is missing from this listing). The sums are
# then written with an UPDATE, or with an INSERT, presumably when the UPDATE
# touched no rows (verify). mtime is overwritten, not accumulated.
375 def statistics_update(self, node, population, size, mtime, cluster=0):
376 """Update the statistics of the given node.
377 Statistics keep track of the population, total
378 size of objects and mtime in the node's namespace.
379 May be zero or positive or negative numbers.
382 s = select([self.statistics.c.population, self.statistics.c.size],
383 and_(self.statistics.c.node == node,
384 self.statistics.c.cluster == cluster))
385 rp = self.conn.execute(s)
389 prepopulation, presize = (0, 0)
391 prepopulation, presize = r
392 population += prepopulation
397 u = self.statistics.update().where(and_(self.statistics.c.node==node,
398 self.statistics.c.cluster==cluster))
399 u = u.values(population=population, size=size, mtime=mtime)
400 rp = self.conn.execute(u)
403 ins = self.statistics.insert()
404 ins = ins.values(node=node, population=population, size=size,
405 mtime=mtime, cluster=cluster)
406 self.conn.execute(ins).close()
# Walk upward from `node` via node_get_properties and call statistics_update
# on each parent. The population delta applies only to the first (immediate)
# parent and is then zeroed; the size and mtime deltas keep propagating.
408 def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
409 """Update the statistics of the given node's parent.
410 Then recursively update all parents up to the root.
411 Population is not recursive.
417 props = self.node_get_properties(node)
421 self.statistics_update(parent, population, size, mtime, cluster)
423 population = 0 # Population isn't recursive
# Compute (count, size, mtime) over latest versions (the max serial with
# mtime < before) under `node`, ignoring versions in `except_cluster`:
#   - the node's own latest version is queried first;
#   - population is counted among direct children (nodes.parent == node);
#   - size/mtime are aggregated over all descendants via a path LIKE match.
425 def statistics_latest(self, node, before=inf, except_cluster=0):
426 """Return population, total size and last mtime
427 for all latest versions under node that
428 do not belong to the cluster.
432 props = self.node_get_properties(node)
437 # The latest version.
# The cluster filter sits outside the max-serial subquery: if the node's
# latest version is in except_cluster, no row is returned (an older version
# is not used instead).
438 s = select([self.versions.c.serial,
439 self.versions.c.node,
440 self.versions.c.size,
441 self.versions.c.source,
442 self.versions.c.mtime,
443 self.versions.c.muser,
444 self.versions.c.cluster])
445 s = s.where(and_(self.versions.c.cluster != except_cluster,
446 self.versions.c.serial == select(
447 [func.max(self.versions.c.serial)],
448 and_(self.versions.c.node == node,
449 self.versions.c.mtime < before))))
450 r = self.conn.execute(s)
457 # First level, just under node (get population).
458 v = self.versions.alias('v')
459 s = select([func.count(v.c.serial),
461 func.max(v.c.mtime)])
462 c1 = select([func.max(self.versions.c.serial)],
463 and_(self.versions.c.node == v.c.node,
464 self.versions.c.mtime < before))
465 c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
466 s = s.where(and_(v.c.serial == c1,
467 v.c.cluster != except_cluster,
469 rp = self.conn.execute(s)
475 mtime = max(mtime, r[2])
479 # All children (get size and mtime).
480 # XXX: This is why the full path is stored.
481 s = select([func.count(v.c.serial),
483 func.max(v.c.mtime)])
484 c1 = select([func.max(self.versions.c.serial)],
485 and_(self.versions.c.node == v.c.node,
486 self.versions.c.mtime < before))
# NOTE(review): `path` is concatenated into the LIKE pattern unescaped, so a
# path containing '%' or '_' also matches unrelated nodes.
487 c2 = select([self.nodes.c.node], self.nodes.c.path.like(path + '%'))
488 s = s.where(and_(v.c.serial == c1,
489 v.c.cluster != except_cluster,
491 rp = self.conn.execute(s)
# NOTE(review): props was assigned from node_get_properties, i.e. (parent,
# path), which has no SIZE index. Presumably props is rebound to the node's
# latest version row on a missing line, so this subtracts the node's own
# version size. Verify.
496 size = r[1] - props[SIZE]
497 mtime = max(mtime, r[2])
498 return (count, size, mtime)
# Insert a new version row for `node`, then add (+1 population, +size) to the
# ancestors' statistics in `cluster`. Returns (serial, mtime) per the
# docstring.
500 def version_create(self, node, size, source, muser, cluster=0):
501 """Create a new version from the given properties.
502 Return the (serial, mtime) of the new version.
506 props = (node, size, source, mtime, muser, cluster)
# NOTE(review): `**props` requires a mapping, but props is built as a tuple
# just above. A conversion presumably happens on a line missing from this
# listing (embedded numbers 507-508). Verify.
509 s = self.versions.insert().values(**props)
510 serial = self.conn.execute(s).inserted_primary_key[0]
511 self.statistics_update_ancestors(node, 1, size, mtime, cluster)
# Fetch the current version of `node`: the row whose serial is the node's max
# serial with mtime < before. It is returned only if it lies in `cluster`;
# an older version that is in `cluster` is not considered.
514 def version_lookup(self, node, before=inf, cluster=0):
515 """Lookup the current version of the given node.
516 Return a list with its properties:
517 (serial, node, size, source, mtime, muser, cluster)
518 or None if the current version is not found in the given cluster.
521 v = self.versions.alias('v')
522 s = select([v.c.serial, v.c.node, v.c.size, v.c.source, v.c.mtime,
523 v.c.muser, v.c.cluster])
524 c = select([func.max(self.versions.c.serial)],
525 and_(self.versions.c.node == node,
526 self.versions.c.mtime < before))
527 s = s.where(and_(v.c.serial == c,
528 v.c.cluster == cluster))
529 r = self.conn.execute(s)
# Fetch the versions row for `serial`, selected in SERIAL..CLUSTER column
# order. With keys given, project it to the requested property names through
# `propnames`; unknown keys are skipped.
536 def version_get_properties(self, serial, keys=(), propnames=_propnames):
537 """Return a sequence of values for the properties of
538 the version specified by serial and the keys, in the order given.
539 If keys is empty, return all properties in the order
540 (serial, node, size, source, mtime, muser, cluster).
543 v = self.versions.alias()
544 s = select([v.c.serial, v.c.node, v.c.size, v.c.source, v.c.mtime,
545 v.c.muser, v.c.cluster], v.c.serial == serial)
546 rp = self.conn.execute(s)
554 return [r[propnames[k]] for k in keys if k in propnames]
# Move version `serial` into `cluster`. Its count and size are moved from the
# old cluster's ancestor statistics to the new cluster's, then the versions
# row is updated. Presumably it returns early when the cluster is unchanged
# (the body of that `if` is missing from this listing).
556 def version_recluster(self, serial, cluster):
557 """Move the version into another cluster."""
559 props = self.version_get_properties(serial)
564 oldcluster = props[CLUSTER]
565 if cluster == oldcluster:
569 self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
570 self.statistics_update_ancestors(node, 1, size, mtime, cluster)
572 s = self.versions.update()
573 s = s.where(self.versions.c.serial == serial)
574 s = s.values(cluster = cluster)
575 self.conn.execute(s).close()
577 def version_remove(self, serial):
578 """Remove the serial specified."""
580 props = self.node_get_properties(serial)
585 cluster = props[CLUSTER]
588 self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
590 s = self.versions.delete().where(self.versions.c.serial == serial)
591 self.conn.execute(s).close()
# Return (key, value) pairs of version `serial`. When keys are given the
# query is restricted with an IN clause; otherwise all of the version's
# attributes are selected.
594 def attribute_get(self, serial, keys=()):
595 """Return a list of (key, value) pairs of the version specified by serial.
596 If keys is empty, return all attributes.
597 Otherwise, return only those specified.
601 attrs = self.attributes.alias()
602 s = select([attrs.c.key, attrs.c.value])
603 s = s.where(and_(attrs.c.key.in_(keys),
604 attrs.c.serial == serial))
606 attrs = self.attributes.alias()
607 s = select([attrs.c.key, attrs.c.value])
608 s = s.where(attrs.c.serial == serial)
609 r = self.conn.execute(s)
# Upsert each (key, value) pair for version `serial`: UPDATE the existing
# attribute row, then INSERT it. Presumably the INSERT only runs when the
# UPDATE matched no row; the rowcount check is missing from this listing, so
# verify.
614 def attribute_set(self, serial, items):
615 """Set the attributes of the version specified by serial.
616 Receive attributes as an iterable of (key, value) pairs.
621 s = self.attributes.update()
622 s = s.where(and_(self.attributes.c.serial == serial,
623 self.attributes.c.key == k))
624 s = s.values(value = v)
625 rp = self.conn.execute(s)
628 s = self.attributes.insert()
629 s = s.values(serial=serial, key=k, value=v)
630 self.conn.execute(s).close()
# Delete attributes of version `serial`. With keys given, one DELETE is
# issued per key (see the TODO about batching); otherwise all of the
# version's attribute rows are deleted.
632 def attribute_del(self, serial, keys=()):
633 """Delete attributes of the version specified by serial.
634 If keys is empty, delete all attributes.
635 Otherwise delete those specified.
639 #TODO more efficient way to do this?
641 s = self.attributes.delete()
642 s = s.where(and_(self.attributes.c.serial == serial,
643 self.attributes.c.key == key))
644 self.conn.execute(s).close()
646 s = self.attributes.delete()
647 s = s.where(self.attributes.c.serial == serial)
648 self.conn.execute(s).close()
# Copy every attribute of version `source` onto version `dest`, updating an
# existing key or inserting it.
# NOTE(review): `dest` is selected as a literal column and then rebound by the
# loop target below. It carries the same value, but the shadowing is easy to
# misread.
# NOTE(review): the UPDATE has no .values(); `value` is passed as an execute()
# parameter, which SQLAlchemy uses to build the SET clause.
650 def attribute_copy(self, source, dest):
651 s = select([dest, self.attributes.c.key, self.attributes.c.value],
652 self.attributes.c.serial == source)
653 rp = self.conn.execute(s)
654 attributes = rp.fetchall()
656 for dest, k, v in attributes:
657 s = self.attributes.update().where(and_(
658 self.attributes.c.serial == dest,
659 self.attributes.c.key == k))
660 rp = self.conn.execute(s, value=v)
663 s = self.attributes.insert()
664 values = {'serial':dest, 'key':k, 'value':v}
665 self.conn.execute(s, values).close()
667 def latest_attribute_keys(self, parent, before=inf, except_cluster=0, pathq=[]):
668 """Return a list with all keys pairs defined
669 for all latest versions under parent that
670 do not belong to the cluster.
673 # TODO: Use another table to store before=inf results.
674 a = self.attributes.alias('a')
675 v = self.versions.alias('v')
676 n = self.nodes.alias('n')
677 s = select([a.c.key]).distinct()
678 s = s.where(v.c.serial == select([func.max(self.versions.c.serial)],
679 and_(self.versions.c.node == v.c.node,
680 self.versions.c.mtime < before)))
681 s = s.where(v.c.cluster != except_cluster)
682 s = s.where(v.c.node.in_(select([self.nodes.c.node],
683 self.nodes.c.parent == parent)))
684 s = s.where(a.c.serial == v.c.serial)
685 s = s.where(n.c.node == v.c.node)
688 conj.append(n.c.path.like(x + '%'))
690 s = s.where(or_(*conj))
691 rp = self.conn.execute(s)
694 return [r[0] for r in rows]
696 def latest_version_list(self, parent, prefix='', delimiter=None,
697 start='', limit=10000, before=inf,
698 except_cluster=0, pathq=[], filterq=None):
699 """Return a (list of (path, serial) tuples, list of common prefixes)
700 for the current versions of the paths with the given parent,
701 matching the following criteria.
703 The property tuple for a version is returned if all
704 of these conditions are true:
710 c. path starts with prefix (and paths in pathq)
712 d. version is the max up to before
714 e. version is not in cluster
716 f. the path does not have the delimiter occuring
717 after the prefix, or ends with the delimiter
719 g. serial matches the attribute filter query.
721 A filter query is a comma-separated list of
722 terms in one of these three forms:
725 an attribute with this key must exist
728 an attribute with this key must not exist
731 the attribute with this key satisfies the value
732 where ?op is one of ==, != <=, >=, <, >.
734 The list of common prefixes includes the prefixes
735 matching up to the first delimiter after prefix,
736 and are reported only once, as "virtual directories".
737 The delimiter is included in the prefixes.
739 If arguments are None, then the corresponding matching rule
742 Limit applies to the first list of tuples returned.
745 if not start or start < prefix:
746 start = strprevling(prefix)
747 nextling = strnextling(prefix)
749 a = self.attributes.alias('a')
750 v = self.versions.alias('v')
751 n = self.nodes.alias('n')
752 s = select([n.c.path, v.c.serial]).distinct()
753 s = s.where(v.c.serial == select([func.max(self.versions.c.serial)],
754 and_(self.versions.c.node == v.c.node,
755 self.versions.c.mtime < before)))
756 s = s.where(v.c.cluster != except_cluster)
757 s = s.where(v.c.node.in_(select([self.nodes.c.node],
758 self.nodes.c.parent == parent)))
760 s = s.where(a.c.serial == v.c.serial)
762 s = s.where(n.c.node == v.c.node)
763 s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
766 conj.append(n.c.path.like(x + '%'))
769 s = s.where(or_(*conj))
772 s = s.where(a.c.key.in_(filterq.split(',')))
774 s = s.order_by(n.c.path)
778 rp = self.conn.execute(s, start=start)
787 pappend = prefixes.append
789 mappend = matches.append
791 rp = self.conn.execute(s, start=start)
793 props = rp.fetchone()
797 idx = path.find(delimiter, pfz)
806 if idx + dz == len(path):
809 continue # Get one more, in case there is a path.
815 rp = self.conn.execute(s, start=strnextling(pf)) # New start.
817 return matches, prefixes