1 # Copyright 2011 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
from time import time

from sqlalchemy import Table, Integer, Float, Column, String, MetaData, ForeignKey
from sqlalchemy.schema import Index, Sequence
from sqlalchemy.sql import func, and_, or_, null, select, bindparam
from sqlalchemy.ext.compiler import compiles

from dbworker import DBWorker
# Positional indices into a version property row:
# (serial, node, size, source, mtime, muser, cluster).
( SERIAL, NODE, SIZE, SOURCE, MTIME, MUSER, CLUSTER ) = range(7)
def strnextling(prefix):
    """Return the first unicode string
    greater than but not starting with given prefix.
    strnextling('hello') -> 'hellp'
    """
    ## all strings start with the null string,
    ## therefore we have to approximate strnextling('')
    ## with the last unicode character supported by python
    ## 0x10ffff for wide (32-bit unicode) python builds
    ## 0x00ffff for narrow (16-bit unicode) python builds
    ## We will not autodetect. 0xffff is safe enough.
    # NOTE(review): the function body appears truncated in this view;
    # the computation of the successor string is not shown.
def strprevling(prefix):
    """Return an approximation of the last unicode string
    less than but not starting with given prefix.
    strprevling(u'hello') -> u'helln\\xffff'
    """
    ## There is no prevling for the null string
    #s += unichr(c-1) + unichr(0xffff)
    # NOTE(review): the function body appears truncated in this view;
    # the computation of the predecessor string is not shown.
97 """Nodes store path organization and have multiple versions.
98 Versions store object history and have multiple attributes.
99 Attributes store metadata.
102 # TODO: Provide an interface for included and excluded clusters.
    def __init__(self, **params):
        """Open the database connection via DBWorker and define the
        schema: the nodes, statistics, versions and attributes tables,
        their indexes, and the self-parented root node row.
        """
        DBWorker.__init__(self, **params)
        metadata = MetaData()
        # NOTE(review): each table's 'columns' list is presumably
        # (re)initialized to [] in lines not shown here — confirm.
        columns.append(Column('node', Integer, primary_key=True))
        columns.append(Column('parent', Integer,
                              ForeignKey('nodes.node',
                              autoincrement=False))
        columns.append(Column('path', String(2048), default='', nullable=False))
        self.nodes = Table('nodes', metadata, *columns)
        # place an index on path
        Index('idx_nodes_path', self.nodes.c.path, unique=True)

        #create statistics table
        columns.append(Column('node', Integer,
                              ForeignKey('nodes.node',
        columns.append(Column('population', Integer, nullable=False, default=0))
        columns.append(Column('size', Integer, nullable=False, default=0))
        columns.append(Column('mtime', Float))
        columns.append(Column('cluster', Integer, nullable=False, default=0,
        self.statistics = Table('statistics', metadata, *columns)

        #create versions table
        columns.append(Column('serial', Integer, primary_key=True))
        columns.append(Column('node', Integer,
                              ForeignKey('nodes.node',
                              onupdate='CASCADE')))
        columns.append(Column('size', Integer, nullable=False, default=0))
        columns.append(Column('source', Integer))
        columns.append(Column('mtime', Float))
        columns.append(Column('muser', String(255), nullable=False, default=''))
        columns.append(Column('cluster', Integer, nullable=False, default=0))
        self.versions = Table('versions', metadata, *columns)
        # index supports "latest version of node before mtime" lookups
        Index('idx_versions_node_mtime', self.versions.c.node,
              self.versions.c.mtime)

        #create attributes table
        columns.append(Column('serial', Integer,
                              ForeignKey('versions.serial',
        columns.append(Column('key', String(255), primary_key=True))
        columns.append(Column('value', String(255)))
        self.attributes = Table('attributes', metadata, *columns)

        metadata.create_all(self.engine)

        # ensure the self-parented root node row exists
        s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
                                           self.nodes.c.parent == ROOTNODE))
        r = self.conn.execute(s).fetchone()
        # NOTE(review): this insert is presumably guarded by "if not r:"
        # in lines not shown here — confirm.
        s = self.nodes.insert().values(node=ROOTNODE, parent=ROOTNODE)
    def node_create(self, parent, path):
        """Create a new node from the given properties.
        Return the node identifier of the new node.
        """
        #TODO catch IntegrityError?
        s = self.nodes.insert().values(parent=parent, path=path)
        r = self.conn.execute(s)
        inserted_primary_key = r.inserted_primary_key[0]
        # NOTE(review): the result proxy 'r' is presumably closed in a
        # line not shown here — confirm.
        return inserted_primary_key
    def node_lookup(self, path):
        """Lookup the current node of the given path.
        Return None if the path is not found.
        """
        s = select([self.nodes.c.node], self.nodes.c.path == path)
        r = self.conn.execute(s)
        # NOTE(review): fetching/closing of the result and the return
        # statement appear in lines not shown here.
    def node_get_properties(self, node):
        """Return the node's (parent, path).
        Return None if the node is not found.
        """
        s = select([self.nodes.c.parent, self.nodes.c.path])
        s = s.where(self.nodes.c.node == node)
        r = self.conn.execute(s)
        # NOTE(review): fetching/closing of the result and the return
        # statement appear in lines not shown here.
    def node_get_versions(self, node, keys=(), propnames=_propnames):
        """Return the properties of all versions at node.
        If keys is empty, return all properties in the order
        (serial, node, size, source, mtime, muser, cluster).
        """
        s = select(['*'], self.versions.c.node == node)
        s = s.order_by(self.versions.c.serial)
        r = self.conn.execute(s)
        # NOTE(review): 'rows' is presumably r.fetchall(), assigned in
        # lines not shown here — confirm.
        # Project each row down to the requested keys, in row order.
        return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]
    def node_count_children(self, node):
        """Return node's child count."""

        s = select([func.count(self.nodes.c.node)])
        # the root node is self-parented; exclude it from its own count
        s = s.where(and_(self.nodes.c.parent == node,
                         self.nodes.c.node != ROOTNODE))
        r = self.conn.execute(s)
        # NOTE(review): fetching the scalar count and the return
        # statement appear in lines not shown here.
    def node_purge_children(self, parent, before=inf, cluster=0):
        """Delete all versions with the specified
        parent and cluster, and return
        the serials of versions deleted.
        Clears out nodes with no remaining versions.
        """
        # Subquery: every node directly under 'parent'.
        c1 = select([self.nodes.c.node],
                    self.nodes.c.parent == parent)
        where_clause = and_(self.versions.c.node.in_(c1),
                            self.versions.c.cluster == cluster,
                            self.versions.c.mtime <= before)
        # Count and total size of the versions about to be deleted,
        # so statistics can be decremented accordingly.
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        # NOTE(review): 'row' and 'mtime' are defined in lines not shown
        # here — confirm. Size is negated (guarding against NULL sum) so
        # the statistics below are decremented.
        nr, size = row[0], -row[1] if row[1] else 0
        self.statistics_update(parent, -nr, size, mtime, cluster)
        self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)

        # Collect the serials to report back to the caller.
        s = select([self.versions.c.serial])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        serials = [row[SERIAL] for row in r.fetchall()]

        # Delete the versions themselves.
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)

        # Delete child nodes that now have no versions left.
        s = select([self.nodes.c.node],
                   and_(self.nodes.c.parent == parent,
                        select([func.count(self.versions.c.serial)],
                               self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
        rp = self.conn.execute(s)
        nodes = [r[0] for r in rp.fetchall()]
        s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
        self.conn.execute(s).close()
    def node_purge(self, node, before=inf, cluster=0):
        """Delete all versions with the specified
        node and cluster, and return
        the serials of versions deleted.
        Clears out the node if it has no remaining versions.
        """
        # Count and total size of the versions about to be deleted.
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        where_clause = and_(self.versions.c.node == node,
                            self.versions.c.cluster == cluster,
                            self.versions.c.mtime <= before)
        s = s.where(where_clause)
        r = self.conn.execute(s)
        # NOTE(review): 'row' and 'mtime' are defined in lines not shown
        # here — confirm.
        nr, size = row[0], row[1]
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)

        # Collect the serials to report back to the caller.
        s = select([self.versions.c.serial])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        # NOTE(review): the comprehension rebinds 'r' while iterating
        # r.fetchall(); it works, but the shadowing is confusing.
        serials = [r[SERIAL] for r in r.fetchall()]

        # Delete the versions themselves.
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)

        # Remove the node if it has no versions left.
        s = select([self.nodes.c.node],
                   and_(self.nodes.c.node == node,
                        select([func.count(self.versions.c.serial)],
                               self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
        r = self.conn.execute(s)
        # NOTE(review): 'nodes' is presumably built from this result in
        # lines not shown here — confirm.
        s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
        self.conn.execute(s).close()
    def node_remove(self, node):
        """Remove the node specified.
        Return false if the node has children or is not found.
        """
        # NOTE(review): the body of this guard (returning False) appears
        # in lines not shown here — confirm.
        if self.node_count_children(node):
        # Roll the node's per-cluster population/size totals out of the
        # ancestor statistics before deleting the node row.
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size),
                    self.versions.c.cluster])
        s = s.where(self.versions.c.node == node)
        s = s.group_by(self.versions.c.cluster)
        r = self.conn.execute(s)
        # NOTE(review): 'mtime' is defined in lines not shown here.
        for population, size, cluster in r.fetchall():
            self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
        s = self.nodes.delete().where(self.nodes.c.node == node)
        self.conn.execute(s).close()
    def statistics_get(self, node, cluster=0):
        """Return population, total size and last mtime
        for all versions under node that belong to the cluster.
        """
        s = select([self.statistics.c.population,
                    self.statistics.c.size,
                    self.statistics.c.mtime])
        s = s.where(and_(self.statistics.c.node == node,
                         self.statistics.c.cluster == cluster))
        r = self.conn.execute(s)
        # NOTE(review): fetching/closing of the result and the return
        # statement appear in lines not shown here.
    def statistics_update(self, node, population, size, mtime, cluster=0):
        """Update the statistics of the given node.
        Statistics keep track the population, total
        size of objects and mtime in the node's namespace.
        May be zero or positive or negative numbers.
        """
        # Read the current totals for (node, cluster), if any.
        s = select([self.statistics.c.population, self.statistics.c.size],
                   and_(self.statistics.c.node == node,
                        self.statistics.c.cluster == cluster))
        rp = self.conn.execute(s)
        # NOTE(review): 'r' (the fetched row) and the branch selecting
        # between the defaults and the row appear in lines not shown here.
        prepopulation, presize = (0, 0)
        prepopulation, presize = r
        # The arguments are deltas, applied on top of the existing totals.
        population += prepopulation
        # Try updating the existing row first...
        u = self.statistics.update().where(and_(self.statistics.c.node==node,
                                                self.statistics.c.cluster==cluster))
        u = u.values(population=population, size=size, mtime=mtime)
        rp = self.conn.execute(u)
        # ...and insert a fresh row when no row existed.
        # NOTE(review): the rowcount check guarding this insert appears
        # in lines not shown here — confirm.
        ins = self.statistics.insert()
        ins = ins.values(node=node, population=population, size=size,
                         mtime=mtime, cluster=cluster)
        self.conn.execute(ins).close()
    def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
        """Update the statistics of the given node's parent.
        Then recursively update all parents up to the root.
        Population is not recursive.
        """
        # NOTE(review): the walk-to-root loop and the extraction of
        # 'parent' from props appear in lines not shown here — confirm.
        props = self.node_get_properties(node)
        self.statistics_update(parent, population, size, mtime, cluster)
        # Size and mtime propagate all the way up; population counts
        # only direct children, so it is zeroed after the first level.
        population = 0 # Population isn't recursive
    def statistics_latest(self, node, before=inf, except_cluster=0):
        """Return population, total size and last mtime
        for all latest versions under node that
        do not belong to the cluster.
        """
        # NOTE(review): several intermediate assignments ('path', 'count',
        # 'r', result handling) appear in lines not shown here — confirm.
        props = self.node_get_properties(node)

        # The latest version of the node itself.
        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.size,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.cluster])
        s = s.where(and_(self.versions.c.cluster != except_cluster,
                         self.versions.c.serial == select(
                             [func.max(self.versions.c.serial)],
                             and_(self.versions.c.node == node,
                                  self.versions.c.mtime < before))))
        r = self.conn.execute(s)

        # First level, just under node (get population).
        v = self.versions.alias('v')
        s = select([func.count(v.c.serial),
                    func.max(v.c.mtime)])
        # c1: latest serial per node before 'before'.
        c1 = select([func.max(self.versions.c.serial)],
                    and_(self.versions.c.node == v.c.node,
                         self.versions.c.mtime < before))
        # c2: direct children of node.
        c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
        s = s.where(and_(v.c.serial == c1,
                         v.c.cluster != except_cluster,
        rp = self.conn.execute(s)
        mtime = max(mtime, r[2])

        # All children (get size and mtime).
        # XXX: This is why the full path is stored.
        s = select([func.count(v.c.serial),
                    func.max(v.c.mtime)])
        c1 = select([func.max(self.versions.c.serial)],
                    and_(self.versions.c.node == v.c.node,
                         self.versions.c.mtime < before))
        # c2: every node whose path falls under node's path prefix.
        c2 = select([self.nodes.c.node], self.nodes.c.path.like(path + '%'))
        s = s.where(and_(v.c.serial == c1,
                         v.c.cluster != except_cluster,
        rp = self.conn.execute(s)
        # Exclude the node's own size from the subtree total.
        size = r[1] - props[SIZE]
        mtime = max(mtime, r[2])
        return (count, size, mtime)
    def version_create(self, node, size, source, muser, cluster=0):
        """Create a new version from the given properties.
        Return the (serial, mtime) of the new version.
        """
        # NOTE(review): 'mtime' is defined (presumably time()) in lines
        # not shown here. Also, **props requires a mapping but props is a
        # tuple here — presumably converted in unshown lines; confirm.
        props = (node, size, source, mtime, muser, cluster)
        s = self.versions.insert().values(**props)
        serial = self.conn.execute(s).inserted_primary_key[0]
        # Account for the new version in ancestor statistics.
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)
    def version_lookup(self, node, before=inf, cluster=0):
        """Lookup the current version of the given node.
        Return a list with its properties:
        (serial, node, size, source, mtime, muser, cluster)
        or None if the current version is not found in the given cluster.
        """
        v = self.versions.alias('v')
        s = select([v.c.serial, v.c.node, v.c.size, v.c.source, v.c.mtime,
                    v.c.muser, v.c.cluster])
        # Subquery picks the node's latest serial strictly before 'before'.
        c = select([func.max(self.versions.c.serial)],
                   and_(self.versions.c.node == node,
                        self.versions.c.mtime < before))
        s = s.where(and_(v.c.serial == c,
                         v.c.cluster == cluster))
        r = self.conn.execute(s)
        # NOTE(review): fetching/closing of the result and the return
        # statement appear in lines not shown here.
    def version_get_properties(self, serial, keys=(), propnames=_propnames):
        """Return a sequence of values for the properties of
        the version specified by serial and the keys, in the order given.
        If keys is empty, return all properties in the order
        (serial, node, size, source, mtime, muser, cluster).
        """
        v = self.versions.alias()
        s = select([v.c.serial, v.c.node, v.c.size, v.c.source, v.c.mtime,
                    v.c.muser, v.c.cluster], v.c.serial == serial)
        rp = self.conn.execute(s)
        # NOTE(review): 'r' is presumably rp.fetchone(), assigned in
        # lines not shown here — confirm.
        return [r[propnames[k]] for k in keys if k in propnames]
    def version_recluster(self, serial, cluster):
        """Move the version into another cluster."""

        props = self.version_get_properties(serial)
        # NOTE(review): 'node', 'size' and 'mtime' are extracted from
        # props in lines not shown here — confirm.
        oldcluster = props[CLUSTER]
        # No-op (return) when already in the requested cluster.
        if cluster == oldcluster:
        # Move the version's count/size between the two clusters'
        # ancestor statistics.
        self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)

        s = self.versions.update()
        s = s.where(self.versions.c.serial == serial)
        s = s.values(cluster = cluster)
        self.conn.execute(s).close()
571 def version_remove(self, serial):
572 """Remove the serial specified."""
574 props = self.node_get_properties(serial)
579 cluster = props[CLUSTER]
582 self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
584 s = self.versions.delete().where(self.versions.c.serial == serial)
585 self.conn.execute(s).close()
    def attribute_get(self, serial, keys=()):
        """Return a list of (key, value) pairs of the version specified by serial.
        If keys is empty, return all attributes.
        Otherwise, return only those specified.
        """
        # Filtered lookup: only the requested keys.
        attrs = self.attributes.alias()
        s = select([attrs.c.key, attrs.c.value])
        s = s.where(and_(attrs.c.key.in_(keys),
                         attrs.c.serial == serial))
        # Unfiltered lookup: all attributes of the version.
        # NOTE(review): the if/else selecting between these two queries
        # appears in lines not shown here — confirm.
        attrs = self.attributes.alias()
        s = select([attrs.c.key, attrs.c.value])
        s = s.where(attrs.c.serial == serial)
        r = self.conn.execute(s)
        # NOTE(review): fetching/closing of the result and the return
        # statement appear in lines not shown here.
    def attribute_set(self, serial, items):
        """Set the attributes of the version specified by serial.
        Receive attributes as an iterable of (key, value) pairs.
        """
        # NOTE(review): the loop binding 'k' and 'v' from items appears
        # in lines not shown here — confirm.
        # Try an UPDATE of the existing attribute first...
        s = self.attributes.update()
        s = s.where(and_(self.attributes.c.serial == serial,
                         self.attributes.c.key == k))
        s = s.values(value = v)
        rp = self.conn.execute(s)
        # ...and INSERT when the key did not exist (presumably guarded
        # by a rowcount check in unshown lines).
        s = self.attributes.insert()
        s = s.values(serial=serial, key=k, value=v)
        self.conn.execute(s).close()
    def attribute_del(self, serial, keys=()):
        """Delete attributes of the version specified by serial.
        If keys is empty, delete all attributes.
        Otherwise delete those specified.
        """
        #TODO more efficient way to do this?
        # Per-key deletion path (one DELETE per key; 'key' is bound by a
        # loop in lines not shown here).
        s = self.attributes.delete()
        s = s.where(and_(self.attributes.c.serial == serial,
                         self.attributes.c.key == key))
        self.conn.execute(s).close()
        # Delete-all path: remove every attribute of the version.
        s = self.attributes.delete()
        s = s.where(self.attributes.c.serial == serial)
        self.conn.execute(s).close()
    def attribute_copy(self, source, dest):
        """Copy all attributes of version 'source' onto version 'dest',
        updating keys that already exist there and inserting the rest.
        """
        # Select 'dest' as a literal column so each fetched row already
        # carries the target serial.
        s = select([dest, self.attributes.c.key, self.attributes.c.value],
                   self.attributes.c.serial == source)
        rp = self.conn.execute(s)
        attributes = rp.fetchall()
        for dest, k, v in attributes:
            # Update the attribute if it exists on dest...
            s = self.attributes.update().where(and_(
                self.attributes.c.serial == dest,
                self.attributes.c.key == k))
            rp = self.conn.execute(s, value=v)
            # ...otherwise insert it (presumably guarded by a rowcount
            # check in lines not shown here — confirm).
            s = self.attributes.insert()
            values = {'serial':dest, 'key':k, 'value':v}
            self.conn.execute(s, values).close()
    def latest_attribute_keys(self, parent, before=inf, except_cluster=0, pathq=[]):
        """Return a list with all keys pairs defined
        for all latest versions under parent that
        do not belong to the cluster.
        """
        # NOTE(review): mutable default 'pathq=[]' is only read here, so
        # it is harmless, but a tuple default would be safer.
        # TODO: Use another table to store before=inf results.
        a = self.attributes.alias('a')
        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        s = select([a.c.key]).distinct()
        # Keep only the latest version of each node before 'before'.
        s = s.where(v.c.serial == select([func.max(self.versions.c.serial)],
                                         and_(self.versions.c.node == v.c.node,
                                              self.versions.c.mtime < before)))
        s = s.where(v.c.cluster != except_cluster)
        # Restrict to direct children of parent.
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
                                        self.nodes.c.parent == parent)))
        s = s.where(a.c.serial == v.c.serial)
        s = s.where(n.c.node == v.c.node)
        # Optional path-prefix restrictions from pathq, OR-ed together.
        # NOTE(review): the initialization of 'conj' and the loop binding
        # 'x' appear in lines not shown here — confirm.
        conj.append(n.c.path.like(x + '%'))
        s = s.where(or_(*conj))
        rp = self.conn.execute(s)
        # NOTE(review): 'rows' is presumably rp.fetchall(), assigned in
        # lines not shown here — confirm.
        return [r[0] for r in rows]
    def latest_version_list(self, parent, prefix='', delimiter=None,
                            start='', limit=10000, before=inf,
                            except_cluster=0, pathq=[], filterq=None):
        """Return a (list of (path, serial) tuples, list of common prefixes)
        for the current versions of the paths with the given parent,
        matching the following criteria.

        The property tuple for a version is returned if all
        of these conditions are true:
            c. path starts with prefix (and paths in pathq)
            d. version is the max up to before
            e. version is not in cluster
            f. the path does not have the delimiter occurring
               after the prefix, or ends with the delimiter
            g. serial matches the attribute filter query.

        A filter query is a comma-separated list of
        terms in one of these three forms:
            an attribute with this key must exist
            an attribute with this key must not exist
            the attribute with this key satisfies the value
            where ?op is one of ==, != <=, >=, <, >.

        The list of common prefixes includes the prefixes
        matching up to the first delimiter after prefix,
        and are reported only once, as "virtual directories".
        The delimiter is included in the prefixes.

        If arguments are None, then the corresponding matching rule
        Limit applies to the first list of tuples returned.
        """
        # Establish the [start, nextling) window bounding every path
        # that shares the given prefix.
        if not start or start < prefix:
            start = strprevling(prefix)
        nextling = strnextling(prefix)

        a = self.attributes.alias('a')
        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        s = select([n.c.path, v.c.serial]).distinct()
        # Keep only the latest version of each node before 'before'.
        s = s.where(v.c.serial == select([func.max(self.versions.c.serial)],
                                         and_(self.versions.c.node == v.c.node,
                                              self.versions.c.mtime < before)))
        s = s.where(v.c.cluster != except_cluster)
        # Restrict to direct children of parent.
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
                                        self.nodes.c.parent == parent)))
        s = s.where(a.c.serial == v.c.serial)
        s = s.where(n.c.node == v.c.node)
        # 'start' stays a bindparam so the same query can be re-executed
        # with a new starting point when skipping a common prefix.
        s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
        # NOTE(review): the initialization of 'conj', the loop binding
        # 'x' from pathq, and the conditions guarding the filterq and
        # delimiter logic appear in lines not shown here — confirm.
        conj.append(n.c.path.like(x + '%'))
        s = s.where(or_(*conj))
        s = s.where(a.c.key.in_(filterq.split(',')))
        s = s.order_by(n.c.path)

        rp = self.conn.execute(s, start=start)
        # Bind append methods locally for the hot pagination loop.
        pappend = prefixes.append
        mappend = matches.append
        rp = self.conn.execute(s, start=start)
        props = rp.fetchone()
        # Look for the delimiter after the prefix to detect "virtual
        # directory" common prefixes.
        idx = path.find(delimiter, pfz)
        if idx + dz == len(path):
        continue # Get one more, in case there is a path.
        # Skip everything under the reported common prefix by
        # re-executing the query from just past it.
        rp = self.conn.execute(s, start=strnextling(pf)) # New start.
        return matches, prefixes