1 # Copyright 2011 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
35 from sqlalchemy import Table, Integer, Float, Column, String, MetaData, ForeignKey
36 from sqlalchemy.schema import Index, Sequence
37 from sqlalchemy.sql import func, and_, or_, null, select, bindparam
38 from duplicate import insertOnDuplicate
39 from dbworker import DBWorker
# Positional indices into a version property row/tuple:
# (serial, node, size, source, mtime, muser, cluster).
( SERIAL, NODE, SIZE, SOURCE, MTIME, MUSER, CLUSTER ) = range(7)
def strnextling(prefix):
    """Return the first unicode string
    greater than but not starting with given prefix.
    strnextling('hello') -> 'hellp'

    Used to form the exclusive upper bound of a path-prefix range scan.
    """
    ## all strings start with the null string,
    ## therefore we have to approximate strnextling('')
    ## with the last unicode character supported by python
    ## 0x10ffff for wide (32-bit unicode) python builds
    ## 0x00ffff for narrow (16-bit unicode) python builds
    ## We will not autodetect. 0xffff is safe enough.
def strprevling(prefix):
    """Return an approximation of the last unicode string
    less than but not starting with given prefix.
    strprevling(u'hello') -> u'helln\\xffff'

    Used to form the exclusive lower bound of a path-prefix range scan.
    """
    ## There is no prevling for the null string
    # Alternative (exact) construction kept for reference:
    #s += unichr(c-1) + unichr(0xffff)
    """Nodes store path organization and have multiple versions.
    Versions store object history and have multiple attributes.
    Attributes store metadata.
    """

    # TODO: Provide an interface for included and excluded clusters.
def __init__(self, **params):
    """Create/reflect the schema (nodes, statistics, versions, attributes)
    and make sure the root node (ROOTNODE, its own parent) exists.
    """
    DBWorker.__init__(self, **params)
    metadata = MetaData()

    # --- nodes table: one row per path; 'parent' self-references 'node'. ---
    # NOTE(review): the `columns = []` resets before each table's column
    # list are not visible here — confirm against the full file.
    columns.append(Column('node', Integer, primary_key=True))
    columns.append(Column('parent', Integer,
                          ForeignKey('nodes.node',
                          autoincrement=False))
    columns.append(Column('path', String(2048), default='', nullable=False))
    self.nodes = Table('nodes', metadata, *columns)
    # place an index on path
    Index('idx_nodes_path', self.nodes.c.path, unique=True)

    #create statistics table
    # Per (node, cluster) aggregates: population, total size, last mtime.
    columns.append(Column('node', Integer,
                          ForeignKey('nodes.node',
    columns.append(Column('population', Integer, nullable=False, default=0))
    columns.append(Column('size', Integer, nullable=False, default=0))
    columns.append(Column('mtime', Float))
    columns.append(Column('cluster', Integer, nullable=False, default=0,
    self.statistics = Table('statistics', metadata, *columns)

    #create versions table
    # One row per object version; 'serial' is the global version id.
    columns.append(Column('serial', Integer, primary_key=True))
    columns.append(Column('node', Integer,
                          ForeignKey('nodes.node',
                          onupdate='CASCADE')))
    columns.append(Column('size', Integer, nullable=False, default=0))
    columns.append(Column('source', Integer))
    columns.append(Column('mtime', Float))
    columns.append(Column('muser', String(255), nullable=False, default=''))
    columns.append(Column('cluster', Integer, nullable=False, default=0))
    self.versions = Table('versions', metadata, *columns)
    # place an index on node
    Index('idx_versions_node', self.versions.c.node)
    # TODO: Sort out if more indexes are needed.
    #Index('idx_versions_node', self.versions.c.mtime)

    #create attributes table
    # Key/value metadata per version serial.
    columns.append(Column('serial', Integer,
                          ForeignKey('versions.serial',
    columns.append(Column('key', String(255), primary_key=True))
    columns.append(Column('value', String(255)))
    self.attributes = Table('attributes', metadata, *columns)

    metadata.create_all(self.engine)

    # Ensure the root node exists (it is its own parent).
    s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
                                       self.nodes.c.parent == ROOTNODE))
    r = self.conn.execute(s).fetchone()
    s = self.nodes.insert().values(node=ROOTNODE, parent=ROOTNODE)
def node_create(self, parent, path):
    """Create a new node from the given properties.
    Return the node identifier of the new node.
    """
    #TODO catch IntegrityError?
    ins = self.nodes.insert().values(parent=parent, path=path)
    result = self.conn.execute(ins)
    new_node = result.inserted_primary_key[0]
    return new_node
def node_lookup(self, path):
    """Lookup the current node of the given path.
    Return None if the path is not found.
    """
    # Exact-match lookup against the unique path index.
    s = select([self.nodes.c.node], self.nodes.c.path == path)
    r = self.conn.execute(s)
def node_get_properties(self, node):
    """Return the node's (parent, path).
    Return None if the node is not found.
    """
    s = select([self.nodes.c.parent, self.nodes.c.path])
    s = s.where(self.nodes.c.node == node)
    r = self.conn.execute(s)
def node_get_versions(self, node, keys=(), propnames=_propnames):
    """Return the properties of all versions at node.
    If keys is empty, return all properties in the order
    (serial, node, size, source, mtime, muser, cluster).
    """
    s = select(['*'], self.versions.c.node == node)
    r = self.conn.execute(s)
    # Project each fetched row down to the requested keys, in key order.
    # NOTE(review): `rows` is bound on a line not shown (presumably
    # `rows = r.fetchall()`) — confirm against the full file.
    return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]
def node_count_children(self, node):
    """Return node's child count."""
    s = select([func.count(self.nodes.c.node)])
    # ROOTNODE is its own parent; exclude it so the root never counts itself.
    s = s.where(and_(self.nodes.c.parent == node,
                     self.nodes.c.node != ROOTNODE))
    r = self.conn.execute(s)
def node_purge_children(self, parent, before=inf, cluster=0):
    """Delete all versions with the specified
    parent and cluster, and return
    the serials of versions deleted.
    Clears out nodes with no remaining versions.
    """
    #TODO handle before=inf
    # All direct children of `parent`.
    c1 = select([self.nodes.c.node],
                self.nodes.c.parent == parent)
    where_clause = and_(self.versions.c.node.in_(c1),
                        self.versions.c.cluster == cluster,
                        self.versions.c.mtime <= before)
    # Aggregate what is about to be removed so statistics can be adjusted.
    s = select([func.count(self.versions.c.serial),
                func.sum(self.versions.c.size)])
    s = s.where(where_clause)
    r = self.conn.execute(s)
    # SUM() is NULL when no rows match; the conditional guards that case.
    # `size` is already negated here, so it is passed through unmodified below.
    nr, size = row[0], -row[1] if row[1] else 0
    self.statistics_update(parent, -nr, size, mtime, cluster)
    self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)
    # Collect the serials being deleted (the return value).
    s = select([self.versions.c.serial])
    s = s.where(where_clause)
    r = self.conn.execute(s)
    serials = [row[SERIAL] for row in r.fetchall()]
    # Delete the matched versions.
    s = self.versions.delete().where(where_clause)
    r = self.conn.execute(s)
    # Remove child nodes left with no versions at all.
    s = select([self.nodes.c.node],
               and_(self.nodes.c.parent == parent,
                    select([func.count(self.versions.c.serial)],
                           self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
    r = self.conn.execute(s)
    s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
    self.conn.execute(s).close()
def node_purge(self, node, before=inf, cluster=0):
    """Delete all versions with the specified
    node and cluster, and return
    the serials of versions deleted.
    Clears out the node if it has no remaining versions.
    """
    # Aggregate what is about to be removed so statistics can be adjusted.
    s = select([func.count(self.versions.c.serial),
                func.sum(self.versions.c.size)])
    where_clause = and_(self.versions.c.node == node,
                        self.versions.c.cluster == cluster,
                        self.versions.c.mtime <= before)
    s = s.where(where_clause)
    r = self.conn.execute(s)
    # NOTE(review): unlike node_purge_children, row[1] (SUM) is not guarded
    # against NULL when no rows match — confirm this cannot be None here.
    nr, size = row[0], row[1]
    self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
    # Collect the serials being deleted (the return value).
    s = select([self.versions.c.serial])
    s = s.where(where_clause)
    r = self.conn.execute(s)
    # NOTE(review): the comprehension variable shadows the result proxy `r`;
    # fetchall() is evaluated first so this works, but renaming the loop
    # variable (cf. node_purge_children's `row`) would be clearer.
    serials = [r[SERIAL] for r in r.fetchall()]
    # Delete the matched versions.
    s = self.versions.delete().where(where_clause)
    r = self.conn.execute(s)
    # Remove the node itself if it no longer has any versions.
    s = select([self.nodes.c.node],
               and_(self.nodes.c.node == node,
                    select([func.count(self.versions.c.serial)],
                           self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
    r = self.conn.execute(s)
    s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
    self.conn.execute(s).close()
def node_remove(self, node):
    """Remove the node specified.
    Return false if the node has children or is not found.
    """
    # Refuse to remove a node that still has children.
    if self.node_count_children(node):
    # Roll this node's per-cluster version statistics out of all ancestors.
    s = select([func.count(self.versions.c.serial),
                func.sum(self.versions.c.size),
                self.versions.c.cluster])
    s = s.where(self.versions.c.node == node)
    s = s.group_by(self.versions.c.cluster)
    r = self.conn.execute(s)
    for population, size, cluster in r.fetchall():
        self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
    # Delete the node row itself.
    s = self.nodes.delete().where(self.nodes.c.node == node)
    self.conn.execute(s).close()
def statistics_get(self, node, cluster=0):
    """Return population, total size and last mtime
    for all versions under node that belong to the cluster.
    """
    s = select([self.statistics.c.population,
                self.statistics.c.size,
                self.statistics.c.mtime])
    s = s.where(and_(self.statistics.c.node == node,
                     self.statistics.c.cluster == cluster))
    r = self.conn.execute(s)
def statistics_update(self, node, population, size, mtime, cluster=0):
    """Update the statistics of the given node.
    Statistics keep track the population, total
    size of objects and mtime in the node's namespace.
    May be zero or positive or negative numbers.
    """
    # Read the current (population, size) for this node/cluster, if present.
    s = select([self.statistics.c.population, self.statistics.c.size],
               and_(self.statistics.c.node == node,
                    self.statistics.c.cluster == cluster))
    rp = self.conn.execute(s)
    # Default to zeros when no statistics row exists yet.
    prepopulation, presize = (0, 0)
    prepopulation, presize = r
    # The arguments are deltas; accumulate onto the previous values.
    population += prepopulation
    # Row exists: update it in place.
    u = self.statistics.update().where(and_(self.statistics.c.node==node,
                                            self.statistics.c.cluster==cluster))
    u = u.values(population=population, size=size, mtime=mtime)
    rp = self.conn.execute(u)
    # No row yet: insert a fresh statistics record.
    ins = self.statistics.insert()
    ins = ins.values(node=node, population=population, size=size,
                     mtime=mtime, cluster=cluster)
    self.conn.execute(ins).close()
def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
    """Update the statistics of the given node's parent.
    Then recursively update all parents up to the root.
    Population is not recursive.
    """
    # Walk up the tree: fetch (parent, path), update the parent's stats,
    # then continue from the parent. (Loop header not shown in this view.)
    props = self.node_get_properties(node)
    self.statistics_update(parent, population, size, mtime, cluster)
    # Only size/mtime propagate beyond the immediate parent.
    population = 0 # Population isn't recursive
def statistics_latest(self, node, before=inf, except_cluster=0):
    """Return population, total size and last mtime
    for all latest versions under node that
    do not belong to the cluster.
    """
    props = self.node_get_properties(node)
    # The latest version.
    s = select([self.versions.c.serial,
                self.versions.c.node,
                self.versions.c.size,
                self.versions.c.mtime,
                self.versions.c.muser,
                self.versions.c.cluster])
    # Pick the newest serial of this node with mtime < before,
    # excluding except_cluster.
    s = s.where(and_(self.versions.c.cluster != except_cluster,
                     self.versions.c.serial == select(
                         [func.max(self.versions.c.serial)],
                         and_(self.versions.c.node == node,
                              self.versions.c.mtime < before))))
    r = self.conn.execute(s)

    # First level, just under node (get population).
    v = self.versions.alias('v')
    s = select([func.count(v.c.serial),
                func.max(v.c.mtime)])
    # c1: latest serial per node; c2: direct children of `node`.
    c1 = select([func.max(self.versions.c.serial)],
                and_(self.versions.c.node == v.c.node,
                     self.versions.c.mtime < before))
    c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
    s = s.where(and_(v.c.serial == c1,
                     v.c.cluster != except_cluster,
    rp = self.conn.execute(s)
    mtime = max(mtime, r[2])

    # All children (get size and mtime).
    # XXX: This is why the full path is stored.
    s = select([func.count(v.c.serial),
                func.max(v.c.mtime)])
    c1 = select([func.max(self.versions.c.serial)],
                and_(self.versions.c.node == v.c.node,
                     self.versions.c.mtime < before))
    # All descendants share the node's path as a prefix.
    c2 = select([self.nodes.c.node], self.nodes.c.path.like(path + '%'))
    s = s.where(and_(v.c.serial == c1,
                     v.c.cluster != except_cluster,
    rp = self.conn.execute(s)
    # Subtract the node's own size so only descendants are counted.
    size = r[1] - props[SIZE]
    mtime = max(mtime, r[2])
    return (count, size, mtime)
def version_create(self, node, size, source, muser, cluster=0):
    """Create a new version from the given properties.
    Return the (serial, mtime) of the new version.
    """
    props = (node, size, source, mtime, muser, cluster)
    # NOTE(review): `.values(**props)` requires a mapping, but `props` is a
    # tuple here — as written this raises TypeError. Either props is rebuilt
    # as a dict on a line not shown, or this should pass keyword arguments
    # (node=node, size=size, ...). Verify against the full file.
    s = self.versions.insert().values(**props)
    serial = self.conn.execute(s).inserted_primary_key[0]
    # A new version adds one object of `size` bytes to all ancestors' stats.
    self.statistics_update_ancestors(node, 1, size, mtime, cluster)
def version_lookup(self, node, before=inf, cluster=0):
    """Lookup the current version of the given node.
    Return a list with its properties:
    (serial, node, size, source, mtime, muser, cluster)
    or None if the current version is not found in the given cluster.
    """
    v = self.versions.alias('v')
    s = select([v.c.serial, v.c.node, v.c.size, v.c.source, v.c.mtime,
                v.c.muser, v.c.cluster])
    # Subquery: newest serial of this node with mtime strictly before `before`.
    c = select([func.max(self.versions.c.serial)],
               and_(self.versions.c.node == node,
                    self.versions.c.mtime < before))
    s = s.where(and_(v.c.serial == c,
                     v.c.cluster == cluster))
    r = self.conn.execute(s)
def version_get_properties(self, serial, keys=(), propnames=_propnames):
    """Return a sequence of values for the properties of
    the version specified by serial and the keys, in the order given.
    If keys is empty, return all properties in the order
    (serial, node, size, source, mtime, muser, cluster).
    """
    v = self.versions.alias()
    s = select([v.c.serial, v.c.node, v.c.size, v.c.source, v.c.mtime,
                v.c.muser, v.c.cluster], v.c.serial == serial)
    rp = self.conn.execute(s)
    # Project the row down to the requested keys, preserving key order.
    return [r[propnames[k]] for k in keys if k in propnames]
def version_recluster(self, serial, cluster):
    """Move the version into another cluster."""
    props = self.version_get_properties(serial)
    oldcluster = props[CLUSTER]
    # Nothing to do when the version is already in the target cluster.
    if cluster == oldcluster:
    # Move the statistics: subtract from the old cluster, add to the new.
    self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
    self.statistics_update_ancestors(node, 1, size, mtime, cluster)
    # Rewrite the version row with the new cluster.
    s = self.versions.update()
    s = s.where(self.versions.c.serial == serial)
    s = s.values(cluster = cluster)
    self.conn.execute(s).close()
def version_remove(self, serial):
    """Remove the serial specified."""
    # NOTE(review): this fetches *node* properties with a version serial,
    # but the indices used below (CLUSTER, and presumably NODE/SIZE on the
    # lines not shown) are version-property indices — node_get_properties
    # returns only (parent, path). This almost certainly should call
    # self.version_get_properties(serial) instead; confirm and fix.
    props = self.node_get_properties(serial)
    cluster = props[CLUSTER]
    # Removing a version subtracts one object of `size` bytes from ancestors.
    self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
    s = self.versions.delete().where(self.versions.c.serial == serial)
    self.conn.execute(s).close()
def attribute_get(self, serial, keys=()):
    """Return a list of (key, value) pairs of the version specified by serial.
    If keys is empty, return all attributes.
    Otherwise, return only those specified.
    """
    # NOTE(review): bound but never used in the visible code — possibly a
    # leftover from the plain-SQL backend; confirm before removing.
    execute = self.execute
    # Branch 1: only the requested keys.
    attrs = self.attributes.alias()
    s = select([attrs.c.key, attrs.c.value])
    s = s.where(and_(attrs.c.key.in_(keys),
                     attrs.c.serial == serial))
    # Branch 2: all attributes of the serial.
    attrs = self.attributes.alias()
    s = select([attrs.c.key, attrs.c.value])
    s = s.where(attrs.c.serial == serial)
    r = self.conn.execute(s)
def attribute_set(self, serial, items):
    """Set the attributes of the version specified by serial.
    Receive attributes as an iterable of (key, value) pairs.
    """
    # Build one parameter dict per pair and executemany them in one call.
    rows = []
    for k, v in items:
        rows.append({'serial': serial, 'key': k, 'value': v})
    self.conn.execute(self.attributes.insert(), rows).close()
def attribute_del(self, serial, keys=()):
    """Delete attributes of the version specified by serial.
    If keys is empty, delete all attributes.
    Otherwise delete those specified.
    """
    #TODO more efficient way to do this?
    # Branch 1: one DELETE per requested key (loop header not shown here).
    s = self.attributes.delete()
    s = s.where(and_(self.attributes.c.serial == serial,
                     self.attributes.c.key == key))
    self.conn.execute(s).close()
    # Branch 2: no keys given — delete every attribute of the serial.
    s = self.attributes.delete()
    s = s.where(self.attributes.c.serial == serial)
    self.conn.execute(s).close()
def attribute_copy(self, source, dest):
    """Copy all attributes of version `source` onto version `dest`
    with a single INSERT ... SELECT executed server-side.
    """
    from sqlalchemy.ext.compiler import compiles
    from sqlalchemy.sql.expression import UpdateBase

    # Minimal custom construct: SQLAlchemy <0.6 has no insert-from-select,
    # so compile one by hand (standard @compiles recipe).
    class InsertFromSelect(UpdateBase):
        def __init__(self, table, select):

    @compiles(InsertFromSelect)
    def visit_insert_from_select(element, compiler, **kw):
        return "INSERT INTO %s (%s)" % (
            compiler.process(element.table, asfrom=True),
            compiler.process(element.select)

    # Select (dest, key, value) for every attribute of `source`.
    s = select([dest, self.attributes.c.key, self.attributes.c.value],
               self.attributes.c.serial == source)
    ins = InsertFromSelect(self.attributes, s)
    self.conn.execute(ins).close()
# NOTE(review): mutable default argument `pathq=[]` — safe only while it is
# never mutated inside the function; confirm.
def latest_attribute_keys(self, parent, before=inf, except_cluster=0, pathq=[]):
    """Return a list with all keys pairs defined
    for all latest versions under parent that
    do not belong to the cluster.
    """
    # TODO: Use another table to store before=inf results.
    a = self.attributes.alias('a')
    v = self.versions.alias('v')
    n = self.nodes.alias('n')
    s = select([a.c.key]).distinct()
    # Only the latest version of each node (max serial with mtime < before).
    s = s.where(v.c.serial == select([func.max(self.versions.c.serial)],
                and_(self.versions.c.node == v.c.node,
                     self.versions.c.mtime < before)))
    s = s.where(v.c.cluster != except_cluster)
    # Restrict to direct children of `parent`.
    s = s.where(v.c.node.in_(select([self.nodes.c.node],
                self.nodes.c.parent == parent)))
    s = s.where(a.c.serial == v.c.serial)
    s = s.where(n.c.node == v.c.node)
    # Optional path restrictions, OR-ed together.
    conj.append(n.c.path.like(x + '%'))
    s = s.where(or_(*conj))
    rp = self.conn.execute(s)
    # NOTE(review): `self.fetchall()` — DBWorker is not shown to provide
    # fetchall; this presumably should be `rp.fetchall()`. Confirm and fix.
    return [r[0] for r in self.fetchall()]
# NOTE(review): mutable default argument `pathq=[]` — safe only while it is
# never mutated inside the function; confirm.
def latest_version_list(self, parent, prefix='', delimiter=None,
                        start='', limit=10000, before=inf,
                        except_cluster=0, pathq=[], filterq=None):
    """Return a (list of (path, serial) tuples, list of common prefixes)
    for the current versions of the paths with the given parent,
    matching the following criteria.

    The property tuple for a version is returned if all
    of these conditions are true:

    c. path starts with prefix (and paths in pathq)

    d. version is the max up to before

    e. version is not in cluster

    f. the path does not have the delimiter occuring
       after the prefix, or ends with the delimiter

    g. serial matches the attribute filter query.

    A filter query is a comma-separated list of
    terms in one of these three forms:

    an attribute with this key must exist

    an attribute with this key must not exist

    the attribute with this key satisfies the value
    where ?op is one of ==, != <=, >=, <, >.

    The list of common prefixes includes the prefixes
    matching up to the first delimiter after prefix,
    and are reported only once, as "virtual directories".
    The delimiter is included in the prefixes.

    If arguments are None, then the corresponding matching rule

    Limit applies to the first list of tuples returned.
    """
    # Seed the range scan just before `prefix` and bound it just after.
    if not start or start < prefix:
        start = strprevling(prefix)
    nextling = strnextling(prefix)

    a = self.attributes.alias('a')
    v = self.versions.alias('v')
    n = self.nodes.alias('n')
    s = select([n.c.path, v.c.serial]).distinct()
    # Only the latest version of each node (max serial with mtime < before).
    s = s.where(v.c.serial == select([func.max(self.versions.c.serial)],
                and_(self.versions.c.node == v.c.node,
                     self.versions.c.mtime < before)))
    s = s.where(v.c.cluster != except_cluster)
    # Restrict to children of `parent`.
    s = s.where(v.c.node.in_(select([self.nodes.c.node],
                self.nodes.c.parent == parent)))
    # Attribute join is only applied when filterq is given (guard not shown).
    s = s.where(a.c.serial == v.c.serial)
    s = s.where(n.c.node == v.c.node)
    # Paginated prefix range: (start, nextling); `start` is a bind parameter
    # so the statement can be re-executed with a new starting point.
    s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
    # Optional path restrictions from pathq, OR-ed together.
    conj.append(n.c.path.like(x + '%'))
    s = s.where(or_(*conj))
    # Attribute filter terms.
    s = s.where(a.c.key.in_(filterq.split(',')))
    s = s.order_by(n.c.path)

    # No-delimiter fast path: a single execution suffices.
    rp = self.conn.execute(s, start=start)

    # Delimiter handling: split results into matches and virtual-directory
    # prefixes, re-issuing the query past each discovered prefix.
    pappend = prefixes.append
    mappend = matches.append
    rp = self.conn.execute(s, start=start)
    props = rp.fetchone()
    idx = path.find(delimiter, pfz)
    if idx + dz == len(path):
        continue # Get one more, in case there is a path.
    rp = self.conn.execute(s, start=strnextling(pf)) # New start.
    return matches, prefixes