1 # Copyright 2011 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
35 from sqlalchemy import Table, Integer, Column, String, MetaData, ForeignKey
36 from sqlalchemy.schema import Index, Sequence
37 from sqlalchemy.sql import func, and_, null
38 from sqlalchemy.sql import select
40 from dbworker import DBWorker
# Positional indexes into a version row: the version queries in this class
# select (serial, node, size, source, mtime, muser, cluster) in this order.
45 ( SERIAL, NODE, SIZE, SOURCE, MTIME, MUSER, CLUSTER ) = range(7)
# Exclusive upper bound for a prefix range scan: latest_version_list binds
# it to "n.path < ?" so only paths starting with the prefix are returned,
# and also uses it to restart a scan just past a common prefix.
# NOTE(review): the implementation lines are not present in this copy;
# only the docstring and the comments below remain.
50 def strnextling(prefix):
51 """Return the first unicode string
52 greater than but not starting with given prefix.
53 strnextling('hello') -> 'hellp'
56 ## all strings start with the null string,
57 ## therefore we have to approximate strnextling('')
58 ## with the last unicode character supported by python
59 ## 0x10ffff for wide (32-bit unicode) python builds
60 ## 0x00ffff for narrow (16-bit unicode) python builds
61 ## We will not autodetect. 0xffff is safe enough.
# Exclusive lower bound for a prefix range scan: latest_version_list binds
# it to "n.path > ?" when no usable start marker is supplied.
70 def strprevling(prefix):
71 """Return an approximation of the last unicode string
72 less than but not starting with given prefix.
73 strprevling(u'hello') -> u'helln\\uffff'
76 ## There is no prevling for the null string
# Replace the last character by its predecessor and pad with U+FFFF, so the
# result sorts just below the strings that start with `prefix`.
# NOTE(review): assumes `c` holds ord(prefix[-1]); its assignment is not in
# this copy.
81 s += unichr(c-1) + unichr(0xffff)
97 """Nodes store path organization and have multiple versions.
98 Versions store object history and have multiple attributes.
99 Attributes store metadata.
102 # TODO: Provide an interface for included and excluded clusters.
# Declare the nodes, statistics, versions and attributes tables, create the
# ones that are missing, and make sure the root node row exists.
104 def __init__(self, **params):
105 DBWorker.__init__(self, **params)
106 metadata = MetaData()
# nodes: one row per path; `parent` references nodes.node.
110 columns.append(Column('node', Integer, primary_key=True))
111 columns.append(Column('parent', Integer,
112 ForeignKey('nodes.node',
115 autoincrement=False))
# NOTE(review): dead commented-out alternative; the live column below caps
# paths at 255 characters.
116 #columns.append(Column('path', String(2048), default='', nullable=False))
117 columns.append(Column('path', String(255), default='', nullable=False))
118 self.nodes = Table('nodes', metadata, *columns)
119 # place an index on path
120 Index('idx_nodes_path', self.nodes.c.path, unique=True)
122 #create statistics table
# statistics: population, total size and mtime aggregates, looked up by
# (node, cluster) in statistics_get / statistics_update.
124 columns.append(Column('node', Integer,
125 ForeignKey('nodes.node',
129 columns.append(Column('population', Integer, nullable=False, default=0))
130 columns.append(Column('size', Integer, nullable=False, default=0))
131 columns.append(Column('mtime', Integer))
132 columns.append(Column('cluster', Integer, nullable=False, default=0,
134 self.statistics = Table('statistics', metadata, *columns)
136 #create versions table
138 columns.append(Column('serial', Integer, primary_key=True))
139 columns.append(Column('node', Integer,
140 ForeignKey('nodes.node',
142 onupdate='CASCADE')))
143 columns.append(Column('size', Integer, nullable=False, default=0))
144 columns.append(Column('source', Integer))
145 columns.append(Column('mtime', Integer))
146 columns.append(Column('muser', String(255), nullable=False, default=''))
147 columns.append(Column('cluster', Integer, nullable=False, default=0))
148 self.versions = Table('versions', metadata, *columns)
149 # place an index on node
150 Index('idx_versions_node', self.versions.c.node)
151 # TODO: Sort out if more indexes are needed.
# NOTE(review): if re-enabled, this index needs its own name;
# 'idx_versions_node' is already used above, and this one is on mtime.
152 #Index('idx_versions_node', self.versions.c.mtime)
154 #create attributes table
# attributes: (key, value) metadata attached to a version serial.
156 columns.append(Column('serial', Integer,
157 ForeignKey('versions.serial',
161 columns.append(Column('key', String(255), primary_key=True))
162 columns.append(Column('value', String(255)))
163 self.attributes = Table('attributes', metadata, *columns)
# create_all skips tables that already exist (checkfirst defaults to True).
165 metadata.create_all(self.engine)
# Ensure the root node (stored as its own parent) exists.
# NOTE(review): the existence check matches node == 1 and parent == 1, but
# the insert uses ROOTNODE (defined outside this view). If ROOTNODE != 1 the
# check never matches and the insert is attempted on every start.
167 s = self.nodes.select().where(and_(self.nodes.c.node == 1,
168 self.nodes.c.parent == 1))
169 r = self.conn.execute(s).fetchone()
171 s = self.nodes.insert().values(node=ROOTNODE, parent=ROOTNODE)
# Insert a nodes row and return its generated `node` id. nodes.path has a
# unique index (idx_nodes_path), so the database rejects a duplicate path;
# that rejection is what the TODO below refers to.
174 def node_create(self, parent, path):
175 """Create a new node from the given properties.
176 Return the node identifier of the new node.
178 #TODO catch IntegrityError?
179 s = self.nodes.insert().values(parent=parent, path=path)
180 r = self.conn.execute(s)
181 inserted_primary_key = r.inserted_primary_key[0]
183 return inserted_primary_key
# Map a path to its node id. At most one row can match, because nodes.path
# is uniquely indexed.
185 def node_lookup(self, path):
186 """Lookup the current node of the given path.
187 Return None if the path is not found.
190 s = select([self.nodes.c.node], self.nodes.c.path == path)
191 r = self.conn.execute(s)
# Fetch the (parent, path) columns of a nodes row by node id.
198 def node_get_properties(self, node):
199 """Return the node's (parent, path).
200 Return None if the node is not found.
203 s = select([self.nodes.c.parent, self.nodes.c.path])
204 s = s.where(self.nodes.c.node == node)
205 r = self.conn.execute(s)
# Fetch every versions row of `node`, optionally projected onto `keys`.
210 def node_get_versions(self, node, keys=(), propnames=_propnames):
211 """Return the properties of all versions at node.
212 If keys is empty, return all properties in the order
213 (serial, node, size, source, mtime, muser, cluster).
216 s = select(['*'], self.versions.c.node == node)
# '*' yields the columns in table declaration order (serial, node, size,
# source, mtime, muser, cluster), which matches the documented order.
217 r = self.conn.execute(s)
# propnames is presumably a name -> column index map. Keys missing from it
# are skipped silently rather than raising.
225 return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]
# Count the direct children of `node`.
227 def node_count_children(self, node):
228 """Return node's child count."""
230 s = select([func.count(self.nodes.c.node)])
# The root node is inserted as its own parent (see __init__), so it is
# excluded explicitly from the root's child count.
231 s = s.where(and_(self.nodes.c.parent == node,
232 self.nodes.c.node != ROOTNODE))
233 r = self.conn.execute(s)
238 def node_purge_children(self, parent, before=inf, cluster=0):
239 """Delete all versions with the specified
240 parent and cluster, and return
241 the serials of versions deleted.
242 Clears out nodes with no remaining versions.
245 scalar = select([self.nodes.c.node],
246 self.nodes.c.parent == parent).as_scalar()
247 where_clause = and_(self.versions.c.node.in_(scalar),
248 self.versions.c.cluster == cluster,
249 "versions.mtime <= %f" %before)
250 s = select([func.count(self.versions.c.serial),
251 func.sum(self.versions.c.size)])
252 s = s.where(where_clause)
253 r = self.conn.execute(s)
258 nr, size = row[0], -row[1] if row[1] else 0
260 print '#', parent, -nr, size, mtime, cluster
261 self.statistics_update(parent, -nr, size, mtime, cluster)
262 self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)
264 s = select([self.versions.c.serial])
265 s = s.where(where_clause)
266 r = self.conn.execute(s)
267 serials = [row[SERIAL] for row in r.fetchall()]
271 s = self.versions.delete().where(where_clause)
272 r = self.conn.execute(s)
276 a = self.nodes.alias()
277 no_versions = select([func.count(self.versions.c.serial)],
278 self.versions.c.node == a.c.node).as_scalar() == 0
279 n = select([self.nodes.c.node],
280 and_(no_versions, self.nodes.c.parent == parent))
281 s = s.where(self.nodes.c.node.in_(n))
282 s = self.nodes.delete().where(self.nodes.c.node == s)
284 self.conn.execute(s).close()
# Single-node counterpart of node_purge_children: purge the versions of
# `node` in `cluster` up to `before`, debit the ancestors' statistics, then
# delete nodes left without versions.
# NOTE(review): this method issues raw SQL through self.execute with '?'
# placeholders, while node_purge_children builds SQLAlchemy Core statements
# on self.conn.
287 def node_purge(self, node, before=inf, cluster=0):
288 """Delete all versions with the specified
289 node and cluster, and return
290 the serials of versions deleted.
291 Clears out the node if it has no remaining versions.
294 execute = self.execute
295 q = ("select count(serial), sum(size) from versions "
299 args = (node, cluster, before)
301 nr, size = self.fetchone()
# NOTE(review): sum(size) is NULL when no version matches, so `-size` below
# needs a guard for that case. Any such guard is in lines not shown here.
305 self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
307 q = ("select serial from versions "
312 serials = [r[SERIAL] for r in self.fetchall()]
313 q = ("delete from versions "
318 q = ("delete from nodes "
319 "where node in (select node from nodes n "
320 "where (select count(serial) "
322 "where node = n.node) = 0 "
# Remove a childless node. First subtract its versions from the ancestors'
# statistics, once per (population, size, cluster) row returned by the
# aggregate query. Then delete the nodes row.
327 def node_remove(self, node):
328 """Remove the node specified.
329 Return false if the node has children or is not found.
332 if self.node_count_children(node):
336 q = ("select count(serial), sum(size), cluster "
340 self.execute(q, (node,))
341 for population, size, cluster in self.fetchall():
342 self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
344 q = "delete from nodes where node = ?"
345 self.execute(q, (node,))
# Read the stored aggregate row for (node, cluster).
348 def statistics_get(self, node, cluster=0):
349 """Return population, total size and last mtime
350 for all versions under node that belong to the cluster.
353 q = ("select population, size, mtime from statistics "
354 "where node = ? and cluster = ?")
355 self.execute(q, (node, cluster))
# Returns whatever self.fetchone() yields: the (population, size, mtime) row,
# presumably None when no statistics row exists yet.
356 return self.fetchone()
# Add population/size deltas to the statistics of (node, cluster) and record
# `mtime`. The previous totals fall back to (0, 0), presumably when no row
# exists yet.
358 def statistics_update(self, node, population, size, mtime, cluster=0):
359 """Update the statistics of the given node.
360 Statistics keep track of the population, total
361 size of objects and mtime in the node's namespace.
362 The deltas may be zero, positive or negative.
365 s = select([self.statistics.c.population, self.statistics.c.size],
366 and_(self.statistics.c.node == node,
367 self.statistics.c.cluster == cluster))
368 res = self.conn.execute(s)
372 prepopulation, presize = (0, 0)
374 prepopulation, presize = r
375 population += prepopulation
# NOTE(review): the insert built below is never assigned to a name. The
# execute call therefore runs `s`, which is still the select above unless a
# line not shown here reassigns it, so the new totals are not written.
# Writing them also needs an update-or-insert, because a row for
# (node, cluster) may already exist.
378 self.statistics.insert().values(node=node, population=population,
379 size=size, mtime=mtime, cluster=cluster)
380 self.conn.execute(s).close()
# Walk from `node` up towards the root, applying the deltas to each
# ancestor's statistics via statistics_update.
382 def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
383 """Update the statistics of the given node's parent.
384 Then recursively update all parents up to the root.
385 Population is not recursive.
391 props = self.node_get_properties(node)
395 self.statistics_update(parent, population, size, mtime, cluster)
# Only the first (immediate) parent receives the population delta; higher
# ancestors receive size and mtime only.
397 population = 0 # Population isn't recursive
# Compute (count, size, mtime) over the latest versions, as of `before`, at
# and below `node`, ignoring versions in `except_cluster`.
399 def statistics_latest(self, node, before=inf, except_cluster=0):
400 """Return population, total size and last mtime
401 for all latest versions under node that
402 do not belong to the cluster.
405 execute = self.execute
406 fetchone = self.fetchone
409 props = self.node_get_properties(node)
414 # The latest version.
415 q = ("select serial, node, size, source, mtime, muser, cluster "
417 "where serial = (select max(serial) "
419 "where node = ? and mtime < ?) "
421 execute(q, (node, before, except_cluster))
427 # First level, just under node (get population).
428 q = ("select count(serial), sum(size), max(mtime) "
430 "where serial = (select max(serial) "
432 "where node = v.node and mtime < ?) "
434 "and node in (select node "
437 execute(q, (before, except_cluster, node))
442 mtime = max(mtime, r[2])
446 # All children (get size and mtime).
447 # XXX: This is why the full path is stored.
448 q = ("select count(serial), sum(size), max(mtime) "
450 "where serial = (select max(serial) "
452 "where node = v.node and mtime < ?) "
454 "and node in (select node "
456 "where path like ?)")
# NOTE(review): '%' and '_' inside `path` are not escaped, so they act as
# LIKE wildcards and can match unrelated nodes.
457 execute(q, (before, except_cluster, path + '%'))
# The prefix scan also matches the node's own path, so its own size is
# subtracted. props presumably holds the node's latest version row by now;
# that reassignment is not in this copy.
461 size = r[1] - props[SIZE]
462 mtime = max(mtime, r[2])
463 return (count, size, mtime)
# Insert a versions row and credit it to the ancestors' statistics
# (+1 population, +size).
465 def version_create(self, node, size, source, muser, cluster=0):
466 """Create a new version from the given properties.
467 Return the (serial, mtime) of the new version.
470 q = ("insert into versions (node, size, source, mtime, muser, cluster) "
471 "values (?, ?, ?, ?, ?, ?)")
473 props = (node, size, source, mtime, muser, cluster)
# The cursor's lastrowid is taken as the serial of the inserted row.
474 serial = self.execute(q, props).lastrowid
475 self.statistics_update_ancestors(node, 1, size, mtime, cluster)
# Resolve the highest-serial version of `node` with mtime < `before`, and
# return it only if it belongs to `cluster`.
478 def version_lookup(self, node, before=inf, cluster=0):
479 """Lookup the current version of the given node.
480 Return a list with its properties:
481 (serial, node, size, source, mtime, muser, cluster)
482 or None if the current version is not found in the given cluster.
485 q = ("select serial, node, size, source, mtime, muser, cluster "
487 "where serial = (select max(serial) "
489 "where node = ? and mtime < ?) "
491 self.execute(q, (node, before, cluster))
492 props = self.fetchone()
493 if props is not None:
# Fetch one versions row by serial, optionally projected onto `keys`.
497 def version_get_properties(self, serial, keys=(), propnames=_propnames):
498 """Return a sequence of values for the properties of
499 the version specified by serial and the keys, in the order given.
500 If keys is empty, return all properties in the order
501 (serial, node, size, source, mtime, muser, cluster).
504 q = ("select serial, node, size, source, mtime, muser, cluster "
507 self.execute(q, (serial,))
# Keys missing from `propnames` are skipped silently rather than raising.
514 return [r[propnames[k]] for k in keys if k in propnames]
# Move a version between clusters. Its population/size contribution is
# taken out of the old cluster's ancestor statistics and added to the new
# cluster's, then the row itself is updated.
516 def version_recluster(self, serial, cluster):
517 """Move the version into another cluster."""
519 props = self.version_get_properties(serial)
524 oldcluster = props[CLUSTER]
525 if cluster == oldcluster:
529 self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
530 self.statistics_update_ancestors(node, 1, size, mtime, cluster)
532 q = "update versions set cluster = ? where serial = ?"
533 self.execute(q, (cluster, serial))
535 def version_remove(self, serial):
536 """Remove the serial specified."""
538 props = self.node_get_properties(serial)
543 cluster = props[CLUSTER]
546 self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
548 q = "delete from versions where serial = ?"
549 self.execute(q, (serial,))
# Read the attributes of one version, optionally restricted to `keys`.
552 def attribute_get(self, serial, keys=()):
553 """Return a list of (key, value) pairs of the version specified by serial.
554 If keys is empty, return all attributes.
555 Otherwise, return only those specified.
558 execute = self.execute
560 marks = ','.join('?' for k in keys)
561 q = ("select key, value from attributes "
562 "where key in (%s) and serial = ?" % (marks,))
# Only '?' marks are formatted into the SQL; the key values are bound as
# parameters. `keys` is concatenated with a tuple, so it must be a tuple
# here (a list raises TypeError).
563 execute(q, keys + (serial,))
565 q = "select key, value from attributes where serial = ?"
566 execute(q, (serial,))
567 return self.fetchall()
# Upsert (key, value) pairs for a version with the SQLite-specific
# "insert or replace", one statement per pair via executemany.
569 def attribute_set(self, serial, items):
570 """Set the attributes of the version specified by serial.
571 Receive attributes as an iterable of (key, value) pairs.
574 q = ("insert or replace into attributes (serial, key, value) "
576 self.executemany(q, ((serial, k, v) for k, v in items))
# Delete the given attribute keys of a version, one statement per key, or
# every attribute of the version when `keys` is empty.
578 def attribute_del(self, serial, keys=()):
579 """Delete attributes of the version specified by serial.
580 If keys is empty, delete all attributes.
581 Otherwise delete those specified.
585 q = "delete from attributes where serial = ? and key = ?"
586 self.executemany(q, ((serial, key) for key in keys))
588 q = "delete from attributes where serial = ?"
589 self.execute(q, (serial,))
# Copy attribute rows onto version `dest` using SQLite "insert or replace".
# The select relabels each copied row with `dest`. `source` is bound as the
# second parameter, presumably in a "where serial = ?" clause that is not in
# this copy.
591 def attribute_copy(self, source, dest):
592 q = ("insert or replace into attributes "
593 "select ?, key, value from attributes "
595 self.execute(q, (dest, source))
# Build the SQL fragment and arguments for an attribute filter query.
597 def _construct_filters(self, filterq):
601 args = filterq.split(',')
602 subq = " and a.key in ("
# One '?' placeholder per comma-separated filter term.
603 subq += ','.join(('?' for x in args))
# Build an OR-clause of "n.path like ?" terms, plus its arguments, matching
# any of the prefixes in `pathq`.
608 def _construct_paths(self, pathq):
613 subq += ' or '.join(('n.path like ?' for x in pathq))
# NOTE(review): the prefixes are not LIKE-escaped, so '%' or '_' inside a
# prefix act as wildcards.
615 args = tuple([x + '%' for x in pathq])
# List the distinct attribute keys carried by the latest versions under
# `parent`, as of `before`, outside `except_cluster`. The results can be
# restricted further to the `pathq` prefixes.
619 def latest_attribute_keys(self, parent, before=inf, except_cluster=0, pathq=[]):
620 """Return a list with all keys pairs defined
621 for all latest versions under parent that
622 do not belong to the cluster.
625 # TODO: Use another table to store before=inf results.
626 q = ("select distinct a.key "
627 "from attributes a, versions v, nodes n "
628 "where v.serial = (select max(serial) "
630 "where node = v.node and mtime < ?) "
631 "and v.cluster != ? "
632 "and v.node in (select node "
635 "and a.serial = v.serial "
636 "and n.node = v.node")
637 args = (before, except_cluster, parent)
638 subq, subargs = self._construct_paths(pathq)
642 self.execute(q, args)
643 return [r[0] for r in self.fetchall()]
# List (path, serial) of the latest versions under `parent` within a path
# range derived from `prefix`/`start`. When a delimiter is given, matches
# are folded into "virtual directory" prefixes.
645 def latest_version_list(self, parent, prefix='', delimiter=None,
646 start='', limit=10000, before=inf,
647 except_cluster=0, pathq=[], filterq=None):
648 """Return a (list of (path, serial) tuples, list of common prefixes)
649 for the current versions of the paths with the given parent,
650 matching the following criteria.
652 The property tuple for a version is returned if all
653 of these conditions are true:
659 c. path starts with prefix (and paths in pathq)
661 d. version is the max up to before
663 e. version is not in cluster
665 f. the path does not have the delimiter occurring
666 after the prefix, or ends with the delimiter
668 g. serial matches the attribute filter query.
670 A filter query is a comma-separated list of
671 terms in one of these three forms:
674 an attribute with this key must exist
677 an attribute with this key must not exist
680 the attribute with this key satisfies the value
681 where ?op is one of ==, !=, <=, >=, <, >.
683 The list of common prefixes includes the prefixes
684 matching up to the first delimiter after prefix,
685 and are reported only once, as "virtual directories".
686 The delimiter is included in the prefixes.
688 If arguments are None, then the corresponding matching rule
691 Limit applies to the first list of tuples returned.
694 execute = self.execute
# Paths are scanned strictly between `start` and strnextling(prefix). With
# no start marker, or one sorting before `prefix`, start just below `prefix`.
696 if not start or start < prefix:
697 start = strprevling(prefix)
698 nextling = strnextling(prefix)
700 q = ("select distinct n.path, v.serial "
701 "from attributes a, versions v, nodes n "
702 "where v.serial = (select max(serial) "
704 "where node = v.node and mtime < ?) "
705 "and v.cluster != ? "
706 "and v.node in (select node "
709 "and a.serial = v.serial "
710 "and n.node = v.node "
711 "and n.path > ? and n.path < ?")
# args[3] is the lower path bound (start) and args[4] the upper bound.
712 args = [before, except_cluster, parent, start, nextling]
714 subq, subargs = self._construct_paths(pathq)
718 subq, subargs = self._construct_filters(filterq)
# Drop the attributes join, presumably when no attribute filter applies;
# the governing condition is in lines not shown here.
723 q = q.replace("attributes a, ", "")
724 q = q.replace("and a.serial = v.serial ", "")
725 q += " order by n.path"
731 return self.fetchall(), ()
736 fetchone = self.fetchone
738 pappend = prefixes.append
740 mappend = matches.append
748 idx = path.find(delimiter, pfz)
759 if idx + dz == len(path):
# Move the lower bound past this common prefix, so the remaining paths
# under it are skipped by the next scan.
765 args[3] = strnextling(pf) # New start.
768 return matches, prefixes