1 # Copyright 2011 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
35 from sqlalchemy import Table, Integer, BigInteger, Float, Column, String, MetaData, ForeignKey
36 from sqlalchemy.schema import Index, Sequence
37 from sqlalchemy.sql import func, and_, or_, null, select, bindparam
38 from sqlalchemy.ext.compiler import compiles
40 from dbworker import DBWorker
# Index of each version property inside a property tuple/row:
# (serial, node, size, source, mtime, muser, cluster).
SERIAL, NODE, SIZE, SOURCE, MTIME, MUSER, CLUSTER = 0, 1, 2, 3, 4, 5, 6
def strnextling(prefix):
    """Return the first unicode string
    greater than but not starting with given prefix.
    strnextling('hello') -> 'hellp'
    """
    ## all strings start with the null string,
    ## therefore we have to approximate strnextling('')
    ## with the last unicode character supported by python
    ## 0x10ffff for wide (32-bit unicode) python builds
    ## 0x00ffff for narrow (16-bit unicode) python builds
    ## We will not autodetect. 0xffff is safe enough.
def strprevling(prefix):
    """Return an approximation of the last unicode string
    less than but not starting with given prefix.
    strprevling(u'hello') -> u'helln\\xffff'
    """
    ## There is no prevling for the null string
    # earlier variant kept for reference:
    #s += unichr(c-1) + unichr(0xffff)
97 """Nodes store path organization and have multiple versions.
98 Versions store object history and have multiple attributes.
99 Attributes store metadata.
102 # TODO: Provide an interface for included and excluded clusters.
    def __init__(self, **params):
        """Set up the database schema.

        Creates the 'nodes', 'statistics', 'versions' and 'attributes'
        tables plus their indexes, then ensures the root node row
        (node == parent == ROOTNODE) is present.
        """
        DBWorker.__init__(self, **params)
        metadata = MetaData()

        #create nodes table
        columns.append(Column('node', Integer, primary_key=True))
        columns.append(Column('parent', Integer,
                       ForeignKey('nodes.node',
                       autoincrement=False))
        columns.append(Column('path', String(2048), default='', nullable=False))
        self.nodes = Table('nodes', metadata, *columns)
        # place an index on path
        Index('idx_nodes_path', self.nodes.c.path, unique=True)

        #create statistics table
        columns.append(Column('node', Integer,
                       ForeignKey('nodes.node',
        columns.append(Column('population', Integer, nullable=False, default=0))
        columns.append(Column('size', BigInteger, nullable=False, default=0))
        columns.append(Column('mtime', Float))
        columns.append(Column('cluster', Integer, nullable=False, default=0,
        self.statistics = Table('statistics', metadata, *columns)

        #create versions table
        columns.append(Column('serial', Integer, primary_key=True))
        columns.append(Column('node', Integer,
                       ForeignKey('nodes.node',
                       onupdate='CASCADE')))
        columns.append(Column('size', BigInteger, nullable=False, default=0))
        columns.append(Column('source', Integer))
        columns.append(Column('mtime', Float))
        columns.append(Column('muser', String(255), nullable=False, default=''))
        columns.append(Column('cluster', Integer, nullable=False, default=0))
        self.versions = Table('versions', metadata, *columns)
        # composite index to speed up latest-version-by-node lookups
        Index('idx_versions_node_mtime', self.versions.c.node,
              self.versions.c.mtime)

        #create attributes table
        columns.append(Column('serial', Integer,
                       ForeignKey('versions.serial',
        columns.append(Column('key', String(255), primary_key=True))
        columns.append(Column('value', String(255)))
        self.attributes = Table('attributes', metadata, *columns)

        metadata.create_all(self.engine)

        # ensure the root node row exists (it is its own parent)
        s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
                                           self.nodes.c.parent == ROOTNODE))
        rp = self.conn.execute(s)
        s = self.nodes.insert().values(node=ROOTNODE, parent=ROOTNODE)
    def node_create(self, parent, path):
        """Create a new node from the given properties.
        Return the node identifier of the new node.
        """
        #TODO catch IntegrityError?
        s = self.nodes.insert().values(parent=parent, path=path)
        r = self.conn.execute(s)
        # the auto-generated 'node' primary key of the new row
        inserted_primary_key = r.inserted_primary_key[0]
        return inserted_primary_key
    def node_lookup(self, path):
        """Lookup the current node of the given path.
        Return None if the path is not found.
        """
        s = select([self.nodes.c.node], self.nodes.c.path == path)
        r = self.conn.execute(s)
    def node_get_properties(self, node):
        """Return the node's (parent, path).
        Return None if the node is not found.
        """
        s = select([self.nodes.c.parent, self.nodes.c.path])
        s = s.where(self.nodes.c.node == node)
        r = self.conn.execute(s)
    def node_get_versions(self, node, keys=(), propnames=_propnames):
        """Return the properties of all versions at node.
        If keys is empty, return all properties in the order
        (serial, node, size, source, mtime, muser, cluster).
        """
        s = select(['*'], self.versions.c.node == node)
        s = s.order_by(self.versions.c.serial)
        r = self.conn.execute(s)
        # project each row onto the requested keys, keeping key order;
        # unknown keys are silently skipped
        return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]
    def node_count_children(self, node):
        """Return node's child count."""

        # the root node is its own parent, so exclude it from the count
        s = select([func.count(self.nodes.c.node)])
        s = s.where(and_(self.nodes.c.parent == node,
                         self.nodes.c.node != ROOTNODE))
        r = self.conn.execute(s)
    def node_purge_children(self, parent, before=inf, cluster=0):
        """Delete all versions with the specified
        parent and cluster, and return
        the serials of versions deleted.
        Clears out nodes with no remaining versions.
        """
        #update statistics
        c1 = select([self.nodes.c.node],
                    self.nodes.c.parent == parent)
        where_clause = and_(self.versions.c.node.in_(c1),
                            self.versions.c.cluster == cluster,
                            self.versions.c.mtime <= before)
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        # negate the totals (they are being removed); SUM() is NULL/None
        # when nothing matched, hence the conditional
        nr, size = row[0], -row[1] if row[1] else 0
        self.statistics_update(parent, -nr, size, mtime, cluster)
        self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)

        #collect the serials to return, then delete the versions
        s = select([self.versions.c.serial])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        serials = [row[SERIAL] for row in r.fetchall()]
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)

        #remove child nodes that have no versions left at all
        s = select([self.nodes.c.node],
                   and_(self.nodes.c.parent == parent,
                        select([func.count(self.versions.c.serial)],
                               self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
        rp = self.conn.execute(s)
        nodes = [r[0] for r in rp.fetchall()]
        s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
        self.conn.execute(s).close()
    def node_purge(self, node, before=inf, cluster=0):
        """Delete all versions with the specified
        node and cluster, and return
        the serials of versions deleted.
        Clears out the node if it has no remaining versions.
        """
        #update statistics
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        where_clause = and_(self.versions.c.node == node,
                            self.versions.c.cluster == cluster,
                            self.versions.c.mtime <= before)
        s = s.where(where_clause)
        r = self.conn.execute(s)
        # NOTE(review): unlike node_purge_children, row[1] (SUM) is not
        # guarded against NULL/None here — verify for the no-match case
        nr, size = row[0], row[1]
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)

        #collect the serials to return, then delete the versions
        s = select([self.versions.c.serial])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        # NOTE(review): the comprehension variable shadows the result proxy 'r'
        serials = [r[SERIAL] for r in r.fetchall()]
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)

        #remove the node itself if it has no versions left
        s = select([self.nodes.c.node],
                   and_(self.nodes.c.node == node,
                        select([func.count(self.versions.c.serial)],
                               self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
        r = self.conn.execute(s)
        s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
        self.conn.execute(s).close()
    def node_remove(self, node):
        """Remove the node specified.
        Return false if the node has children or is not found.
        """
        if self.node_count_children(node):

        # roll the node's per-cluster version statistics out of its ancestors
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size),
                    self.versions.c.cluster])
        s = s.where(self.versions.c.node == node)
        s = s.group_by(self.versions.c.cluster)
        r = self.conn.execute(s)
        for population, size, cluster in r.fetchall():
            self.statistics_update_ancestors(node, -population, -size, mtime, cluster)

        s = self.nodes.delete().where(self.nodes.c.node == node)
        self.conn.execute(s).close()
    def statistics_get(self, node, cluster=0):
        """Return population, total size and last mtime
        for all versions under node that belong to the cluster.
        """
        s = select([self.statistics.c.population,
                    self.statistics.c.size,
                    self.statistics.c.mtime])
        s = s.where(and_(self.statistics.c.node == node,
                         self.statistics.c.cluster == cluster))
        r = self.conn.execute(s)
    def statistics_update(self, node, population, size, mtime, cluster=0):
        """Update the statistics of the given node.
        Statistics keep track the population, total
        size of objects and mtime in the node's namespace.
        May be zero or positive or negative numbers.
        """
        # read the current counters for (node, cluster), if any
        s = select([self.statistics.c.population, self.statistics.c.size],
                   and_(self.statistics.c.node == node,
                        self.statistics.c.cluster == cluster))
        rp = self.conn.execute(s)
        prepopulation, presize = (0, 0)
        prepopulation, presize = r
        # the deltas are accumulated onto the existing counters
        population += prepopulation

        # upsert: try an UPDATE first, fall back to INSERT for a new row
        u = self.statistics.update().where(and_(self.statistics.c.node==node,
                                                self.statistics.c.cluster==cluster))
        u = u.values(population=population, size=size, mtime=mtime)
        rp = self.conn.execute(u)
        ins = self.statistics.insert()
        ins = ins.values(node=node, population=population, size=size,
                         mtime=mtime, cluster=cluster)
        self.conn.execute(ins).close()
    def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
        """Update the statistics of the given node's parent.
        Then recursively update all parents up to the root.
        Population is not recursive.
        """
        props = self.node_get_properties(node)
        self.statistics_update(parent, population, size, mtime, cluster)
        # only the immediate parent's population changes; size/mtime
        # keep propagating up the tree
        population = 0 # Population isn't recursive
    def statistics_latest(self, node, before=inf, except_cluster=0):
        """Return population, total size and last mtime
        for all latest versions under node that
        do not belong to the cluster.
        """
        props = self.node_get_properties(node)

        # The latest version.
        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.size,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.cluster])
        # latest == the max serial with mtime strictly before 'before'
        s = s.where(and_(self.versions.c.cluster != except_cluster,
                         self.versions.c.serial == select(
                         [func.max(self.versions.c.serial)],
                         and_(self.versions.c.node == node,
                              self.versions.c.mtime < before))))
        r = self.conn.execute(s)

        # First level, just under node (get population).
        v = self.versions.alias('v')
        s = select([func.count(v.c.serial),
                    func.max(v.c.mtime)])
        c1 = select([func.max(self.versions.c.serial)],
                    and_(self.versions.c.node == v.c.node,
                         self.versions.c.mtime < before))
        c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
        s = s.where(and_(v.c.serial == c1,
                         v.c.cluster != except_cluster,
        rp = self.conn.execute(s)
        mtime = max(mtime, r[2])

        # All children (get size and mtime).
        # XXX: This is why the full path is stored.
        s = select([func.count(v.c.serial),
                    func.max(v.c.mtime)])
        c1 = select([func.max(self.versions.c.serial)],
                    and_(self.versions.c.node == v.c.node,
                         self.versions.c.mtime < before))
        c2 = select([self.nodes.c.node], self.nodes.c.path.like(path + '%'))
        s = s.where(and_(v.c.serial == c1,
                         v.c.cluster != except_cluster,
        rp = self.conn.execute(s)
        # subtract the node's own contribution from the subtree total;
        # NOTE(review): 'props' appears to be rebound to a version row
        # before this point — verify props[SIZE] indexes a version tuple
        size = r[1] - props[SIZE]
        mtime = max(mtime, r[2])
        return (count, size, mtime)
    def version_create(self, node, size, source, muser, cluster=0):
        """Create a new version from the given properties.
        Return the (serial, mtime) of the new version.
        """
        props = (node, size, source, mtime, muser, cluster)
        # NOTE(review): values(**props) requires a mapping, but props is a
        # tuple as written above — confirm props is rebound to a dict first
        s = self.versions.insert().values(**props)
        serial = self.conn.execute(s).inserted_primary_key[0]
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)
    def version_lookup(self, node, before=inf, cluster=0):
        """Lookup the current version of the given node.
        Return a list with its properties:
        (serial, node, size, source, mtime, muser, cluster)
        or None if the current version is not found in the given cluster.
        """
        v = self.versions.alias('v')
        s = select([v.c.serial, v.c.node, v.c.size, v.c.source, v.c.mtime,
                    v.c.muser, v.c.cluster])
        # the current version is the max serial with mtime before 'before'
        c = select([func.max(self.versions.c.serial)],
                   and_(self.versions.c.node == node,
                        self.versions.c.mtime < before))
        s = s.where(and_(v.c.serial == c,
                         v.c.cluster == cluster))
        r = self.conn.execute(s)
    def version_get_properties(self, serial, keys=(), propnames=_propnames):
        """Return a sequence of values for the properties of
        the version specified by serial and the keys, in the order given.
        If keys is empty, return all properties in the order
        (serial, node, size, source, mtime, muser, cluster).
        """
        v = self.versions.alias()
        s = select([v.c.serial, v.c.node, v.c.size, v.c.source, v.c.mtime,
                    v.c.muser, v.c.cluster], v.c.serial == serial)
        rp = self.conn.execute(s)
        # project the row onto the requested keys, preserving key order
        return [r[propnames[k]] for k in keys if k in propnames]
    def version_recluster(self, serial, cluster):
        """Move the version into another cluster."""

        props = self.version_get_properties(serial)
        oldcluster = props[CLUSTER]
        # nothing to do when the version is already in the target cluster
        if cluster == oldcluster:

        # move the version's count/size from the old cluster to the new one
        self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)

        s = self.versions.update()
        s = s.where(self.versions.c.serial == serial)
        s = s.values(cluster = cluster)
        self.conn.execute(s).close()
575 def version_remove(self, serial):
576 """Remove the serial specified."""
578 props = self.node_get_properties(serial)
583 cluster = props[CLUSTER]
586 self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
588 s = self.versions.delete().where(self.versions.c.serial == serial)
589 self.conn.execute(s).close()
    def attribute_get(self, serial, keys=()):
        """Return a list of (key, value) pairs of the version specified by serial.
        If keys is empty, return all attributes.
        Otherwise, return only those specified.
        """
        # filtered variant: only the requested keys
        attrs = self.attributes.alias()
        s = select([attrs.c.key, attrs.c.value])
        s = s.where(and_(attrs.c.key.in_(keys),
                         attrs.c.serial == serial))
        # unfiltered variant: every attribute of the version
        attrs = self.attributes.alias()
        s = select([attrs.c.key, attrs.c.value])
        s = s.where(attrs.c.serial == serial)
        r = self.conn.execute(s)
    def attribute_set(self, serial, items):
        """Set the attributes of the version specified by serial.
        Receive attributes as an iterable of (key, value) pairs.
        """
        # upsert each (k, v): UPDATE the existing row first, INSERT when
        # the key is not present yet (k, v presumably come from iterating
        # items — loop header not shown here)
        s = self.attributes.update()
        s = s.where(and_(self.attributes.c.serial == serial,
                         self.attributes.c.key == k))
        s = s.values(value = v)
        rp = self.conn.execute(s)
        s = self.attributes.insert()
        s = s.values(serial=serial, key=k, value=v)
        self.conn.execute(s).close()
    def attribute_del(self, serial, keys=()):
        """Delete attributes of the version specified by serial.
        If keys is empty, delete all attributes.
        Otherwise delete those specified.
        """
        #TODO more efficient way to do this?
        # per-key delete (when keys are given)
        s = self.attributes.delete()
        s = s.where(and_(self.attributes.c.serial == serial,
                         self.attributes.c.key == key))
        self.conn.execute(s).close()
        # bulk delete of all attributes (when keys is empty)
        s = self.attributes.delete()
        s = s.where(self.attributes.c.serial == serial)
        self.conn.execute(s).close()
    def attribute_copy(self, source, dest):
        """Copy the attributes of version 'source' onto version 'dest'."""
        # select the literal 'dest' alongside each (key, value) so the
        # rows unpack directly in the loop below
        s = select([dest, self.attributes.c.key, self.attributes.c.value],
                   self.attributes.c.serial == source)
        rp = self.conn.execute(s)
        attributes = rp.fetchall()
        # NOTE(review): the loop rebinds the 'dest' parameter on purpose
        for dest, k, v in attributes:
            # update the existing attribute, inserting a fresh row otherwise
            s = self.attributes.update().where(and_(
                self.attributes.c.serial == dest,
                self.attributes.c.key == k))
            rp = self.conn.execute(s, value=v)
            s = self.attributes.insert()
            values = {'serial':dest, 'key':k, 'value':v}
            self.conn.execute(s, values).close()
    def latest_attribute_keys(self, parent, before=inf, except_cluster=0, pathq=[]):
        """Return a list with all keys pairs defined
        for all latest versions under parent that
        do not belong to the cluster.
        """
        # NOTE(review): pathq=[] is a mutable default argument shared
        # across calls — confirm callers never mutate it
        # TODO: Use another table to store before=inf results.
        a = self.attributes.alias('a')
        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        s = select([a.c.key]).distinct()
        # latest version per node: the max serial with mtime before 'before'
        s = s.where(v.c.serial == select([func.max(self.versions.c.serial)],
                                         and_(self.versions.c.node == v.c.node,
                                              self.versions.c.mtime < before)))
        s = s.where(v.c.cluster != except_cluster)
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
                                        self.nodes.c.parent == parent)))
        s = s.where(a.c.serial == v.c.serial)
        s = s.where(n.c.node == v.c.node)
        # optional path filters: any of the pathq prefixes may match
        conj.append(n.c.path.like(x + '%'))
        s = s.where(or_(*conj))
        rp = self.conn.execute(s)
        return [r[0] for r in rows]
    def latest_version_list(self, parent, prefix='', delimiter=None,
                            start='', limit=10000, before=inf,
                            except_cluster=0, pathq=[], filterq=None):
        """Return a (list of (path, serial) tuples, list of common prefixes)
        for the current versions of the paths with the given parent,
        matching the following criteria.

        The property tuple for a version is returned if all
        of these conditions are true:

            c. path starts with prefix (and paths in pathq)

            d. version is the max up to before

            e. version is not in cluster

            f. the path does not have the delimiter occurring
               after the prefix, or ends with the delimiter

            g. serial matches the attribute filter query.

               A filter query is a comma-separated list of
               terms in one of these three forms:

               an attribute with this key must exist

               an attribute with this key must not exist

               the attribute with this key satisfies the value
               where ?op is one of ==, != <=, >=, <, >.

        The list of common prefixes includes the prefixes
        matching up to the first delimiter after prefix,
        and are reported only once, as "virtual directories".
        The delimiter is included in the prefixes.

        If arguments are None, then the corresponding matching rule

        Limit applies to the first list of tuples returned.
        """
        # NOTE(review): pathq=[] is a mutable default argument shared
        # across calls — confirm callers never mutate it
        if not start or start < prefix:
            start = strprevling(prefix)
        nextling = strnextling(prefix)

        a = self.attributes.alias('a')
        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        s = select([n.c.path, v.c.serial]).distinct()
        # latest version per node: the max serial with mtime before 'before'
        s = s.where(v.c.serial == select([func.max(self.versions.c.serial)],
                                         and_(self.versions.c.node == v.c.node,
                                              self.versions.c.mtime < before)))
        s = s.where(v.c.cluster != except_cluster)
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
                                        self.nodes.c.parent == parent)))
        s = s.where(a.c.serial == v.c.serial)
        s = s.where(n.c.node == v.c.node)
        # 'start' is a bindparam so the same statement can be re-executed
        # below with a new starting path when skipping a common prefix
        s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
        # optional path filters: any of the pathq prefixes may match
        conj.append(n.c.path.like(x + '%'))
        s = s.where(or_(*conj))
        s = s.where(a.c.key.in_(filterq.split(',')))
        s = s.order_by(n.c.path)

        rp = self.conn.execute(s, start=start)
        # bind the append methods locally for the scan loop below
        pappend = prefixes.append
        mappend = matches.append
        rp = self.conn.execute(s, start=start)
        props = rp.fetchone()
        # look for the delimiter after the prefix (pfz == len(prefix))
        idx = path.find(delimiter, pfz)
        if idx + dz == len(path):
        continue # Get one more, in case there is a path.
        # skip past everything sharing the common prefix pf
        rp = self.conn.execute(s, start=strnextling(pf)) # New start.
        return matches, prefixes