1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
36 from dbworker import DBWorker
38 from pithos.backends.filter import parse_filters
# Positional indexes of the version properties, in the order the
# version queries in this file select their columns.
(SERIAL, NODE, HASH, SIZE, TYPE, SOURCE,
 MTIME, MUSER, UUID, CHECKSUM, CLUSTER) = range(11)

# Path matching modes recognised when building path sub-queries.
(MATCH_PREFIX, MATCH_EXACT) = range(2)
50 def strnextling(prefix):
51 """Return the first unicode string
52 greater than but not starting with given prefix.
53 strnextling('hello') -> 'hellp'
56 ## all strings start with the null string,
57 ## therefore we have to approximate strnextling('')
58 ## with the last unicode character supported by python
59 ## 0x10ffff for wide (32-bit unicode) python builds
60 ## 0x00ffff for narrow (16-bit unicode) python builds
61 ## We will not autodetect. 0xffff is safe enough.
70 def strprevling(prefix):
71 """Return an approximation of the last unicode string
72 less than but not starting with given prefix.
73 strprevling(u'hello') -> u'helln\\xffff'
76 ## There is no prevling for the null string
81 s += unichr(c-1) + unichr(0xffff)
100 class Node(DBWorker):
101 """Nodes store path organization and have multiple versions.
102 Versions store object history and have multiple attributes.
103 Attributes store metadata.
106 # TODO: Provide an interface for included and excluded clusters.
108 def __init__(self, **params):
109 DBWorker.__init__(self, **params)
110 execute = self.execute
112 execute(""" pragma foreign_keys = on """)
114 execute(""" create table if not exists nodes
115 ( node integer primary key,
116 parent integer default 0,
117 path text not null default '',
119 references nodes(node)
121 on delete cascade ) """)
122 execute(""" create unique index if not exists idx_nodes_path
125 execute(""" create table if not exists policy
129 primary key (node, key)
131 references nodes(node)
133 on delete cascade ) """)
135 execute(""" create table if not exists statistics
137 population integer not null default 0,
138 size integer not null default 0,
140 cluster integer not null default 0,
141 primary key (node, cluster)
143 references nodes(node)
145 on delete cascade ) """)
147 execute(""" create table if not exists versions
148 ( serial integer primary key,
151 size integer not null default 0,
152 type text not null default '',
155 muser text not null default '',
156 uuid text not null default '',
157 checksum text not null default '',
158 cluster integer not null default 0,
160 references nodes(node)
162 on delete cascade ) """)
163 execute(""" create index if not exists idx_versions_node_mtime
164 on versions(node, mtime) """)
165 execute(""" create index if not exists idx_versions_node_uuid
166 on versions(uuid) """)
168 execute(""" create table if not exists attributes
173 primary key (serial, domain, key)
175 references versions(serial)
177 on delete cascade ) """)
179 q = "insert or ignore into nodes(node, parent) values (?, ?)"
180 execute(q, (ROOTNODE, ROOTNODE))
182 def node_create(self, parent, path):
183 """Create a new node from the given properties.
184 Return the node identifier of the new node.
187 q = ("insert into nodes (parent, path) "
189 props = (parent, path)
190 return self.execute(q, props).lastrowid
192 def node_lookup(self, path):
193 """Lookup the current node of the given path.
194 Return None if the path is not found.
197 q = "select node from nodes where path = ?"
198 self.execute(q, (path,))
204 def node_lookup_bulk(self, paths):
205 """Lookup the current nodes for the given paths.
206 Return () if the path is not found.
209 placeholders = ','.join('?' for path in paths)
210 q = "select node from nodes where path in (%s)" % placeholders
211 self.execute(q, paths)
214 return [row[0] for row in r]
def node_get_properties(self, node):
    """Fetch the (parent, path) row stored for the given node.

    The result is whatever ``self.fetchone()`` yields for the query;
    per the existing contract that is None when the node is not found.
    """
    query = "select parent, path from nodes where node = ?"
    params = (node,)
    self.execute(query, params)
    row = self.fetchone()
    return row
226 def node_get_versions(self, node, keys=(), propnames=_propnames):
227 """Return the properties of all versions at node.
228 If keys is empty, return all properties in the order
229 (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
232 q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
235 self.execute(q, (node,))
242 return [[p[propnames[k]] for k in keys if k in propnames] for p in r]
244 def node_count_children(self, node):
245 """Return node's child count."""
247 q = "select count(node) from nodes where parent = ? and node != 0"
248 self.execute(q, (node,))
254 def node_purge_children(self, parent, before=inf, cluster=0):
255 """Delete all versions with the specified
256 parent and cluster, and return
257 the hashes and size of versions deleted.
258 Clears out nodes with no remaining versions.
261 execute = self.execute
262 q = ("select count(serial), sum(size) from versions "
263 "where node in (select node "
268 args = (parent, cluster, before)
270 nr, size = self.fetchone()
274 self.statistics_update(parent, -nr, -size, mtime, cluster)
275 self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster)
277 q = ("select hash from versions "
278 "where node in (select node "
284 hashes = [r[0] for r in self.fetchall()]
285 q = ("delete from versions "
286 "where node in (select node "
292 q = ("delete from nodes "
293 "where node in (select node from nodes n "
294 "where (select count(serial) "
296 "where node = n.node) = 0 "
298 execute(q, (parent,))
301 def node_purge(self, node, before=inf, cluster=0):
302 """Delete all versions with the specified
303 node and cluster, and return
304 the hashes and size of versions deleted.
305 Clears out the node if it has no remaining versions.
308 execute = self.execute
309 q = ("select count(serial), sum(size) from versions "
313 args = (node, cluster, before)
315 nr, size = self.fetchone()
319 self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
321 q = ("select hash from versions "
326 hashes = [r[0] for r in self.fetchall()]
327 q = ("delete from versions "
332 q = ("delete from nodes "
333 "where node in (select node from nodes n "
334 "where (select count(serial) "
336 "where node = n.node) = 0 "
341 def node_remove(self, node):
342 """Remove the node specified.
343 Return false if the node has children or is not found.
346 if self.node_count_children(node):
350 q = ("select count(serial), sum(size), cluster "
354 self.execute(q, (node,))
355 for population, size, cluster in self.fetchall():
356 self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
358 q = "delete from nodes where node = ?"
359 self.execute(q, (node,))
def policy_get(self, node):
    """Return the policy rows of the node as a {key: value} dict."""
    self.execute("select key, value from policy where node = ?", (node,))
    rows = self.fetchall()
    # Each fetched row is a (key, value) pair, so dict() maps key -> value.
    return dict(rows)
def policy_set(self, node, policy):
    """Insert or replace one policy row per (key, value) of ``policy``.

    ``policy`` must provide ``iteritems()``; the rows are handed to
    ``self.executemany`` lazily, as a generator of (node, key, value).
    """
    statement = "insert or replace into policy (node, key, value) values (?, ?, ?)"
    # Obtain the item iterator up front so that a bad ``policy`` object
    # fails before executemany is invoked.
    pairs = policy.iteritems()
    rows = ((node, key, value) for key, value in pairs)
    self.executemany(statement, rows)
def statistics_get(self, node, cluster=0):
    """Look up the statistics row of a node within a cluster.

    Returns the (population, size, mtime) row produced by
    ``self.fetchone()`` for the given node and cluster.
    """
    query = (
        "select population, size, mtime from statistics "
        "where node = ? and cluster = ?"
    )
    params = (node, cluster)
    self.execute(query, params)
    row = self.fetchone()
    return row
381 def statistics_update(self, node, population, size, mtime, cluster=0):
382 """Update the statistics of the given node.
383 Statistics keep track of the population, total
384 size of objects and mtime in the node's namespace.
385 May be zero or positive or negative numbers.
388 qs = ("select population, size from statistics "
389 "where node = ? and cluster = ?")
390 qu = ("insert or replace into statistics (node, population, size, mtime, cluster) "
391 "values (?, ?, ?, ?, ?)")
392 self.execute(qs, (node, cluster))
395 prepopulation, presize = (0, 0)
397 prepopulation, presize = r
398 population += prepopulation
400 self.execute(qu, (node, population, size, mtime, cluster))
402 def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
403 """Update the statistics of the given node's parent.
404 Then recursively update all parents up to the root.
405 Population is not recursive.
411 props = self.node_get_properties(node)
415 self.statistics_update(parent, population, size, mtime, cluster)
417 population = 0 # Population isn't recursive
419 def statistics_latest(self, node, before=inf, except_cluster=0):
420 """Return population, total size and last mtime
421 for all latest versions under node that
422 do not belong to the cluster.
425 execute = self.execute
426 fetchone = self.fetchone
429 props = self.node_get_properties(node)
434 # The latest version.
435 q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
437 "where serial = (select max(serial) "
439 "where node = ? and mtime < ?) "
441 execute(q, (node, before, except_cluster))
447 # First level, just under node (get population).
448 q = ("select count(serial), sum(size), max(mtime) "
450 "where serial = (select max(serial) "
452 "where node = v.node and mtime < ?) "
454 "and node in (select node "
457 execute(q, (before, except_cluster, node))
462 mtime = max(mtime, r[2])
466 # All children (get size and mtime).
467 # This is why the full path is stored.
468 q = ("select count(serial), sum(size), max(mtime) "
470 "where serial = (select max(serial) "
472 "where node = v.node and mtime < ?) "
474 "and node in (select node "
476 "where path like ? escape '\\')")
477 execute(q, (before, except_cluster, self.escape_like(path) + '%'))
481 size = r[1] - props[SIZE]
482 mtime = max(mtime, r[2])
483 return (count, size, mtime)
485 def version_create(self, node, hash, size, type, source, muser, uuid, checksum, cluster=0):
486 """Create a new version from the given properties.
487 Return the (serial, mtime) of the new version.
490 q = ("insert into versions (node, hash, size, type, source, mtime, muser, uuid, checksum, cluster) "
491 "values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
493 props = (node, hash, size, type, source, mtime, muser, uuid, checksum, cluster)
494 serial = self.execute(q, props).lastrowid
495 self.statistics_update_ancestors(node, 1, size, mtime, cluster)
498 def version_lookup(self, node, before=inf, cluster=0, all_props=True):
499 """Lookup the current version of the given node.
500 Return a list with its properties:
501 (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster)
502 or None if the current version is not found in the given cluster.
507 "where serial = (select max(serial) "
509 "where node = ? and mtime < ?) "
514 q = q % "serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster"
516 self.execute(q, (node, before, cluster))
517 props = self.fetchone()
518 if props is not None:
522 def version_lookup_bulk(self, nodes, before=inf, cluster=0, all_props=True):
523 """Lookup the current versions of the given nodes.
524 Return a list with their properties:
525 (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
530 "where serial in (select max(serial) "
532 "where node in (%s) and mtime < ? group by node) "
534 placeholders = ','.join('?' for node in nodes)
536 q = q % ("serial", placeholders)
538 q = q % ("serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster", placeholders)
541 args.extend((before, cluster))
542 self.execute(q, args)
543 return self.fetchall()
545 def version_get_properties(self, serial, keys=(), propnames=_propnames):
546 """Return a sequence of values for the properties of
547 the version specified by serial and the keys, in the order given.
548 If keys is empty, return all properties in the order
549 (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
552 q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
555 self.execute(q, (serial,))
562 return [r[propnames[k]] for k in keys if k in propnames]
564 def version_put_property(self, serial, key, value):
565 """Set value for the property of version specified by key."""
567 if key not in _propnames:
569 q = "update versions set %s = ? where serial = ?" % key
570 self.execute(q, (value, serial))
572 def version_recluster(self, serial, cluster):
573 """Move the version into another cluster."""
575 props = self.version_get_properties(serial)
580 oldcluster = props[CLUSTER]
581 if cluster == oldcluster:
585 self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
586 self.statistics_update_ancestors(node, 1, size, mtime, cluster)
588 q = "update versions set cluster = ? where serial = ?"
589 self.execute(q, (cluster, serial))
591 def version_remove(self, serial):
592 """Remove the serial specified."""
594 props = self.version_get_properties(serial)
600 cluster = props[CLUSTER]
603 self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
605 q = "delete from versions where serial = ?"
606 self.execute(q, (serial,))
609 def attribute_get(self, serial, domain, keys=()):
610 """Return a list of (key, value) pairs of the version specified by serial.
611 If keys is empty, return all attributes.
612 Otherwise, return only those specified.
615 execute = self.execute
617 marks = ','.join('?' for k in keys)
618 q = ("select key, value from attributes "
619 "where key in (%s) and serial = ? and domain = ?" % (marks,))
620 execute(q, keys + (serial, domain))
622 q = "select key, value from attributes where serial = ? and domain = ?"
623 execute(q, (serial, domain))
624 return self.fetchall()
def attribute_set(self, serial, domain, items):
    """Store attributes for the version identified by serial.

    ``items`` is an iterable of (key, value) pairs; each pair is
    inserted, or replaces an existing row, under the given domain.
    """
    statement = (
        "insert or replace into attributes (serial, domain, key, value) "
        "values (?, ?, ?, ?)"
    )
    # Rows are produced lazily; executemany consumes the generator.
    rows = ((serial, domain, key, value) for key, value in items)
    self.executemany(statement, rows)
635 def attribute_del(self, serial, domain, keys=()):
636 """Delete attributes of the version specified by serial.
637 If keys is empty, delete all attributes.
638 Otherwise delete those specified.
642 q = "delete from attributes where serial = ? and domain = ? and key = ?"
643 self.executemany(q, ((serial, domain, key) for key in keys))
645 q = "delete from attributes where serial = ? and domain = ?"
646 self.execute(q, (serial, domain))
648 def attribute_copy(self, source, dest):
649 q = ("insert or replace into attributes "
650 "select ?, domain, key, value from attributes "
652 self.execute(q, (dest, source))
654 def _construct_filters(self, domain, filterq):
655 if not domain or not filterq:
659 append = subqlist.append
660 included, excluded, opers = parse_filters(filterq)
664 subq = "exists (select 1 from attributes where serial = v.serial and domain = ? and "
665 subq += "(" + ' or '.join(('key = ?' for x in included)) + ")"
672 subq = "not exists (select 1 from attributes where serial = v.serial and domain = ? and "
673 subq += "(" + ' or '.join(('key = ?' for x in excluded)) + ")"
680 for k, o, v in opers:
681 subq = "exists (select 1 from attributes where serial = v.serial and domain = ? and "
682 subq += "key = ? and value %s ?" % (o,)
684 args += [domain, k, v]
690 subq = ' and ' + ' and '.join(subqlist)
694 def _construct_paths(self, pathq):
700 for path, match in pathq:
701 if match == MATCH_PREFIX:
702 subqlist.append("n.path like ? escape '\\'")
703 args.append(self.escape_like(path) + '%')
704 elif match == MATCH_EXACT:
705 subqlist.append("n.path = ?")
708 subq = ' and (' + ' or '.join(subqlist) + ')'
713 def _construct_size(self, sizeq):
714 if not sizeq or len(sizeq) != 2:
720 subq += " and v.size >= ?"
723 subq += " and v.size < ?"
728 def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=[]):
729 Return a list with all keys defined
730 for all latest versions under parent that
731 do not belong to the cluster.
734 # TODO: Use another table to store before=inf results.
735 q = ("select distinct a.key "
736 "from attributes a, versions v, nodes n "
737 "where v.serial = (select max(serial) "
739 "where node = v.node and mtime < ?) "
740 "and v.cluster != ? "
741 "and v.node in (select node "
744 "and a.serial = v.serial "
746 "and n.node = v.node")
747 args = (before, except_cluster, parent, domain)
748 subq, subargs = self._construct_paths(pathq)
752 self.execute(q, args)
753 return [r[0] for r in self.fetchall()]
755 def latest_version_list(self, parent, prefix='', delimiter=None,
756 start='', limit=10000, before=inf,
757 except_cluster=0, pathq=[], domain=None,
758 filterq=[], sizeq=None, all_props=False):
759 """Return a (list of (path, serial) tuples, list of common prefixes)
760 for the current versions of the paths with the given parent,
761 matching the following criteria.
763 The property tuple for a version is returned if all
764 of these conditions are true:
770 c. path starts with prefix (and paths in pathq)
772 d. version is the max up to before
774 e. version is not in cluster
776 f. the path does not have the delimiter occurring
777 after the prefix, or ends with the delimiter
779 g. serial matches the attribute filter query.
781 A filter query is a comma-separated list of
782 terms in one of these three forms:
785 an attribute with this key must exist
788 an attribute with this key must not exist
791 the attribute with this key satisfies the value
792 where ?op is one of =, != <=, >=, <, >.
794 h. the size is in the range set by sizeq
796 The list of common prefixes includes the prefixes
797 matching up to the first delimiter after prefix,
798 and are reported only once, as "virtual directories".
799 The delimiter is included in the prefixes.
801 If arguments are None, then the corresponding matching rule
804 Limit applies to the first list of tuples returned.
806 If all_props is True, return all properties after path, not just serial.
809 execute = self.execute
811 if not start or start < prefix:
812 start = strprevling(prefix)
813 nextling = strnextling(prefix)
815 q = ("select distinct n.path, %s "
816 "from versions v, nodes n "
817 "where v.serial = (select max(serial) "
819 "where node = v.node and mtime < ?) "
820 "and v.cluster != ? "
821 "and v.node in (select node "
824 "and n.node = v.node "
825 "and n.path > ? and n.path < ?")
829 q = q % "v.serial, v.node, v.hash, v.size, v.type, v.source, v.mtime, v.muser, v.uuid, v.checksum, v.cluster"
830 args = [before, except_cluster, parent, start, nextling]
832 subq, subargs = self._construct_paths(pathq)
836 subq, subargs = self._construct_size(sizeq)
840 subq, subargs = self._construct_filters(domain, filterq)
845 q = q.replace("attributes a, ", "")
846 q = q.replace("and a.serial = v.serial ", "")
847 q += " order by n.path"
853 return self.fetchall(), ()
858 fetchone = self.fetchone
860 pappend = prefixes.append
862 mappend = matches.append
871 idx = path.find(delimiter, pfz)
880 if idx + dz == len(path):
883 continue # Get one more, in case there is a path.
889 args[3] = strnextling(pf) # New start.
892 return matches, prefixes
894 def latest_uuid(self, uuid):
895 """Return a (path, serial) tuple, for the latest version of the given uuid."""
897 q = ("select n.path, v.serial "
898 "from versions v, nodes n "
899 "where v.serial = (select max(serial) "
902 "and n.node = v.node")
903 self.execute(q, (uuid,))
904 return self.fetchone()