1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
36 from dbworker import DBWorker
38 from pithos.backends.filter import parse_filters
43 ( SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM, CLUSTER ) = range(11)
45 ( MATCH_PREFIX, MATCH_EXACT ) = range(2)
50 def strnextling(prefix):
51 """Return the first unicode string
52 greater than but not starting with given prefix.
53 strnextling('hello') -> 'hellp'
56 ## all strings start with the null string,
57 ## therefore we have to approximate strnextling('')
58 ## with the last unicode character supported by python
59 ## 0x10ffff for wide (32-bit unicode) python builds
60 ## 0x00ffff for narrow (16-bit unicode) python builds
61 ## We will not autodetect. 0xffff is safe enough.
70 def strprevling(prefix):
71 """Return an approximation of the last unicode string
72 less than but not starting with given prefix.
73 strprevling(u'hello') -> u'helln\\xffff'
76 ## There is no prevling for the null string
81 s += unichr(c-1) + unichr(0xffff)
100 class Node(DBWorker):
101 """Nodes store path organization and have multiple versions.
102 Versions store object history and have multiple attributes.
103 Attributes store metadata.
106 # TODO: Provide an interface for included and excluded clusters.
108 def __init__(self, **params):
109 DBWorker.__init__(self, **params)
110 execute = self.execute
112 execute(""" pragma foreign_keys = on """)
114 execute(""" create table if not exists nodes
115 ( node integer primary key,
116 parent integer default 0,
117 path text not null default '',
118 latest_version integer,
120 references nodes(node)
122 on delete cascade ) """)
123 execute(""" create unique index if not exists idx_nodes_path
125 execute(""" create index if not exists idx_nodes_parent
126 on nodes(parent) """)
128 execute(""" create table if not exists policy
132 primary key (node, key)
134 references nodes(node)
136 on delete cascade ) """)
138 execute(""" create table if not exists statistics
140 population integer not null default 0,
141 size integer not null default 0,
143 cluster integer not null default 0,
144 primary key (node, cluster)
146 references nodes(node)
148 on delete cascade ) """)
150 execute(""" create table if not exists versions
151 ( serial integer primary key,
154 size integer not null default 0,
155 type text not null default '',
158 muser text not null default '',
159 uuid text not null default '',
160 checksum text not null default '',
161 cluster integer not null default 0,
163 references nodes(node)
165 on delete cascade ) """)
166 execute(""" create index if not exists idx_versions_node_mtime
167 on versions(node, mtime) """)
168 execute(""" create index if not exists idx_versions_node_uuid
169 on versions(uuid) """)
170 execute(""" create index if not exists idx_versions_serial_cluster
171 on versions(serial, cluster) """)
173 execute(""" create table if not exists attributes
178 primary key (serial, domain, key)
180 references versions(serial)
182 on delete cascade ) """)
184 q = "insert or ignore into nodes(node, parent) values (?, ?)"
185 execute(q, (ROOTNODE, ROOTNODE))
187 def node_create(self, parent, path):
188 """Create a new node from the given properties.
189 Return the node identifier of the new node.
192 q = ("insert into nodes (parent, path) "
194 props = (parent, path)
195 return self.execute(q, props).lastrowid
197 def node_lookup(self, path):
198 """Lookup the current node of the given path.
199 Return None if the path is not found.
202 q = "select node from nodes where path = ?"
203 self.execute(q, (path,))
209 def node_lookup_bulk(self, paths):
210 """Lookup the current nodes for the given paths.
211 Return () if the path is not found.
214 placeholders = ','.join('?' for path in paths)
215 q = "select node from nodes where path in (%s)" % placeholders
216 self.execute(q, paths)
219 return [row[0] for row in r]
222 def node_get_properties(self, node):
223 """Return the node's (parent, path).
224 Return None if the node is not found.
227 q = "select parent, path from nodes where node = ?"
228 self.execute(q, (node,))
229 return self.fetchone()
231 def node_get_versions(self, node, keys=(), propnames=_propnames):
232 """Return the properties of all versions at node.
233 If keys is empty, return all properties in the order
234 (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
237 q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
240 self.execute(q, (node,))
247 return [[p[propnames[k]] for k in keys if k in propnames] for p in r]
249 def node_count_children(self, node):
250 """Return node's child count."""
252 q = "select count(node) from nodes where parent = ? and node != 0"
253 self.execute(q, (node,))
259 def node_purge_children(self, parent, before=inf, cluster=0):
260 """Delete all versions with the specified
261 parent and cluster, and return
262 the hashes and size of versions deleted.
263 Clears out nodes with no remaining versions.
266 execute = self.execute
267 q = ("select count(serial), sum(size) from versions "
268 "where node in (select node "
273 args = (parent, cluster, before)
275 nr, size = self.fetchone()
279 self.statistics_update(parent, -nr, -size, mtime, cluster)
280 self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster)
282 q = ("select hash from versions "
283 "where node in (select node "
289 hashes = [r[0] for r in self.fetchall()]
290 q = ("delete from versions "
291 "where node in (select node "
297 q = ("delete from nodes "
298 "where node in (select node from nodes n "
299 "where (select count(serial) "
301 "where node = n.node) = 0 "
303 execute(q, (parent,))
306 def node_purge(self, node, before=inf, cluster=0):
307 """Delete all versions with the specified
308 node and cluster, and return
309 the hashes and size of versions deleted.
310 Clears out the node if it has no remaining versions.
313 execute = self.execute
314 q = ("select count(serial), sum(size) from versions "
318 args = (node, cluster, before)
320 nr, size = self.fetchone()
324 self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
326 q = ("select hash from versions "
331 hashes = [r[0] for r in self.fetchall()]
332 q = ("delete from versions "
337 q = ("delete from nodes "
338 "where node in (select node from nodes n "
339 "where (select count(serial) "
341 "where node = n.node) = 0 "
346 def node_remove(self, node):
347 """Remove the node specified.
348 Return false if the node has children or is not found.
351 if self.node_count_children(node):
355 q = ("select count(serial), sum(size), cluster "
359 self.execute(q, (node,))
360 for population, size, cluster in self.fetchall():
361 self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
363 q = "delete from nodes where node = ?"
364 self.execute(q, (node,))
367 def policy_get(self, node):
368 q = "select key, value from policy where node = ?"
369 self.execute(q, (node,))
370 return dict(self.fetchall())
372 def policy_set(self, node, policy):
373 q = "insert or replace into policy (node, key, value) values (?, ?, ?)"
374 self.executemany(q, ((node, k, v) for k, v in policy.iteritems()))
376 def statistics_get(self, node, cluster=0):
377 """Return population, total size and last mtime
378 for all versions under node that belong to the cluster.
381 q = ("select population, size, mtime from statistics "
382 "where node = ? and cluster = ?")
383 self.execute(q, (node, cluster))
384 return self.fetchone()
386 def statistics_update(self, node, population, size, mtime, cluster=0):
387 """Update the statistics of the given node.
388 Statistics keep track the population, total
389 size of objects and mtime in the node's namespace.
390 May be zero or positive or negative numbers.
393 qs = ("select population, size from statistics "
394 "where node = ? and cluster = ?")
395 qu = ("insert or replace into statistics (node, population, size, mtime, cluster) "
396 "values (?, ?, ?, ?, ?)")
397 self.execute(qs, (node, cluster))
400 prepopulation, presize = (0, 0)
402 prepopulation, presize = r
403 population += prepopulation
405 self.execute(qu, (node, population, size, mtime, cluster))
407 def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
408 """Update the statistics of the given node's parent.
409 Then recursively update all parents up to the root.
410 Population is not recursive.
416 props = self.node_get_properties(node)
420 self.statistics_update(parent, population, size, mtime, cluster)
422 population = 0 # Population isn't recursive
424 def statistics_latest(self, node, before=inf, except_cluster=0):
425 """Return population, total size and last mtime
426 for all latest versions under node that
427 do not belong to the cluster.
430 execute = self.execute
431 fetchone = self.fetchone
434 props = self.node_get_properties(node)
439 # The latest version.
440 q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
444 subq, args = self._construct_latest_version_subquery(node=node, before=before)
445 execute(q % subq, args + [except_cluster])
451 # First level, just under node (get population).
452 q = ("select count(serial), sum(size), max(mtime) "
456 "and node in (select node "
459 subq, args = self._construct_latest_version_subquery(node=None, before=before)
460 execute(q % subq, args + [except_cluster, node])
465 mtime = max(mtime, r[2])
469 # All children (get size and mtime).
470 # This is why the full path is stored.
471 q = ("select count(serial), sum(size), max(mtime) "
475 "and node in (select node "
477 "where path like ? escape '\\')")
478 subq, args = self._construct_latest_version_subquery(node=None, before=before)
479 execute(q % subq, args + [except_cluster, self.escape_like(path) + '%'])
483 size = r[1] - props[SIZE]
484 mtime = max(mtime, r[2])
485 return (count, size, mtime)
487 def nodes_set_latest_version(self, node, serial):
488 q = ("update nodes set latest_version = ? where node = ?")
489 props = (serial, node)
490 self.execute(q, props)
492 def version_create(self, node, hash, size, type, source, muser, uuid, checksum, cluster=0):
493 """Create a new version from the given properties.
494 Return the (serial, mtime) of the new version.
497 q = ("insert into versions (node, hash, size, type, source, mtime, muser, uuid, checksum, cluster) "
498 "values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
500 props = (node, hash, size, type, source, mtime, muser, uuid, checksum, cluster)
501 serial = self.execute(q, props).lastrowid
502 self.statistics_update_ancestors(node, 1, size, mtime, cluster)
504 self.nodes_set_latest_version(node, serial)
508 def version_lookup(self, node, before=inf, cluster=0, all_props=True):
509 """Lookup the current version of the given node.
510 Return a list with its properties:
511 (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster)
512 or None if the current version is not found in the given cluster.
519 subq, args = self._construct_latest_version_subquery(node=node, before=before)
521 q = q % ("serial", subq)
523 q = q % ("serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster", subq)
525 self.execute(q, args + [cluster])
526 props = self.fetchone()
527 if props is not None:
531 def version_lookup_bulk(self, nodes, before=inf, cluster=0, all_props=True):
532 """Lookup the current versions of the given nodes.
533 Return a list with their properties:
534 (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
541 "where serial in %s "
542 "and cluster = ? %s")
543 subq, args = self._construct_latest_versions_subquery(nodes=nodes, before = before)
545 q = q % ("serial", subq, '')
547 q = q % ("serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster", subq, 'order by node')
550 self.execute(q, args)
551 return self.fetchall()
553 def version_get_properties(self, serial, keys=(), propnames=_propnames):
554 """Return a sequence of values for the properties of
555 the version specified by serial and the keys, in the order given.
556 If keys is empty, return all properties in the order
557 (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
560 q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
563 self.execute(q, (serial,))
570 return [r[propnames[k]] for k in keys if k in propnames]
572 def version_put_property(self, serial, key, value):
573 """Set value for the property of version specified by key."""
575 if key not in _propnames:
577 q = "update versions set %s = ? where serial = ?" % key
578 self.execute(q, (value, serial))
580 def version_recluster(self, serial, cluster):
581 """Move the version into another cluster."""
583 props = self.version_get_properties(serial)
588 oldcluster = props[CLUSTER]
589 if cluster == oldcluster:
593 self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
594 self.statistics_update_ancestors(node, 1, size, mtime, cluster)
596 q = "update versions set cluster = ? where serial = ?"
597 self.execute(q, (cluster, serial))
599 def version_remove(self, serial):
600 """Remove the serial specified."""
602 props = self.version_get_properties(serial)
608 cluster = props[CLUSTER]
611 self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
613 q = "delete from versions where serial = ?"
614 self.execute(q, (serial,))
616 props = self.version_lookup(node, cluster=cluster, all_props=False)
618 self.nodes_set_latest_version(node, props[0])
621 def attribute_get(self, serial, domain, keys=()):
622 """Return a list of (key, value) pairs of the version specified by serial.
623 If keys is empty, return all attributes.
624 Othwerise, return only those specified.
627 execute = self.execute
629 marks = ','.join('?' for k in keys)
630 q = ("select key, value from attributes "
631 "where key in (%s) and serial = ? and domain = ?" % (marks,))
632 execute(q, keys + (serial, domain))
634 q = "select key, value from attributes where serial = ? and domain = ?"
635 execute(q, (serial, domain))
636 return self.fetchall()
638 def attribute_set(self, serial, domain, items):
639 """Set the attributes of the version specified by serial.
640 Receive attributes as an iterable of (key, value) pairs.
643 q = ("insert or replace into attributes (serial, domain, key, value) "
644 "values (?, ?, ?, ?)")
645 self.executemany(q, ((serial, domain, k, v) for k, v in items))
647 def attribute_del(self, serial, domain, keys=()):
648 """Delete attributes of the version specified by serial.
649 If keys is empty, delete all attributes.
650 Otherwise delete those specified.
654 q = "delete from attributes where serial = ? and domain = ? and key = ?"
655 self.executemany(q, ((serial, domain, key) for key in keys))
657 q = "delete from attributes where serial = ? and domain = ?"
658 self.execute(q, (serial, domain))
660 def attribute_copy(self, source, dest):
661 q = ("insert or replace into attributes "
662 "select ?, domain, key, value from attributes "
664 self.execute(q, (dest, source))
666 def _construct_filters(self, domain, filterq):
667 if not domain or not filterq:
671 append = subqlist.append
672 included, excluded, opers = parse_filters(filterq)
676 subq = "exists (select 1 from attributes where serial = v.serial and domain = ? and "
677 subq += "(" + ' or '.join(('key = ?' for x in included)) + ")"
684 subq = "not exists (select 1 from attributes where serial = v.serial and domain = ? and "
685 subq += "(" + ' or '.join(('key = ?' for x in excluded)) + ")"
692 for k, o, v in opers:
693 subq = "exists (select 1 from attributes where serial = v.serial and domain = ? and "
694 subq += "key = ? and value %s ?" % (o,)
696 args += [domain, k, v]
702 subq = ' and ' + ' and '.join(subqlist)
706 def _construct_paths(self, pathq):
712 for path, match in pathq:
713 if match == MATCH_PREFIX:
714 subqlist.append("n.path like ? escape '\\'")
715 args.append(self.escape_like(path) + '%')
716 elif match == MATCH_EXACT:
717 subqlist.append("n.path = ?")
720 subq = ' and (' + ' or '.join(subqlist) + ')'
725 def _construct_size(self, sizeq):
726 if not sizeq or len(sizeq) != 2:
732 subq += " and v.size >= ?"
735 subq += " and v.size < ?"
740 def _construct_versions_nodes_latest_version_subquery(self, before=inf):
742 q = ("n.latest_version ")
745 q = ("(select max(serial) "
747 "where node = v.node and mtime < ?) ")
751 def _construct_latest_version_subquery(self, node=None, before=inf):
752 where_cond = "node = v.node"
755 where_cond = "node = ? "
759 q = ("(select latest_version "
763 q = ("(select max(serial) "
765 "where %s and mtime < ?) ")
767 return q % where_cond, args
769 def _construct_latest_versions_subquery(self, nodes=(), before=inf):
773 where_cond = "node in (%s) " % ','.join('?' for node in nodes)
777 q = ("(select latest_version "
781 q = ("(select max(serial) "
783 "where %s and mtime < ? group by node) ")
785 return q % where_cond, args
787 def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=[]):
788 """Return a list with all keys pairs defined
789 for all latest versions under parent that
790 do not belong to the cluster.
793 # TODO: Use another table to store before=inf results.
794 q = ("select distinct a.key "
795 "from attributes a, versions v, nodes n "
796 "where v.serial = %s "
797 "and v.cluster != ? "
798 "and v.node in (select node "
801 "and a.serial = v.serial "
803 "and n.node = v.node")
804 subq, subargs = self._construct_latest_version_subquery(node=None, before=before)
805 args = subargs + [except_cluster, parent, domain]
807 subq, subargs = self._construct_paths(pathq)
811 self.execute(q, args)
812 return [r[0] for r in self.fetchall()]
814 def latest_version_list(self, parent, prefix='', delimiter=None,
815 start='', limit=10000, before=inf,
816 except_cluster=0, pathq=[], domain=None,
817 filterq=[], sizeq=None, all_props=False):
818 """Return a (list of (path, serial) tuples, list of common prefixes)
819 for the current versions of the paths with the given parent,
820 matching the following criteria.
822 The property tuple for a version is returned if all
823 of these conditions are true:
829 c. path starts with prefix (and paths in pathq)
831 d. version is the max up to before
833 e. version is not in cluster
835 f. the path does not have the delimiter occuring
836 after the prefix, or ends with the delimiter
838 g. serial matches the attribute filter query.
840 A filter query is a comma-separated list of
841 terms in one of these three forms:
844 an attribute with this key must exist
847 an attribute with this key must not exist
850 the attribute with this key satisfies the value
851 where ?op is one of =, != <=, >=, <, >.
853 h. the size is in the range set by sizeq
855 The list of common prefixes includes the prefixes
856 matching up to the first delimiter after prefix,
857 and are reported only once, as "virtual directories".
858 The delimiter is included in the prefixes.
860 If arguments are None, then the corresponding matching rule
863 Limit applies to the first list of tuples returned.
865 If all_props is True, return all properties after path, not just serial.
868 execute = self.execute
870 if not start or start < prefix:
871 start = strprevling(prefix)
872 nextling = strnextling(prefix)
874 q = ("select distinct n.path, %s "
875 "from versions v, nodes n "
876 "where v.serial = %s "
877 "and v.cluster != ? "
878 "and v.node in (select node "
881 "and n.node = v.node "
882 "and n.path > ? and n.path < ?")
883 subq, args = self._construct_versions_nodes_latest_version_subquery(before)
885 q = q % ("v.serial", subq)
887 q = q % ("v.serial, v.node, v.hash, v.size, v.type, v.source, v.mtime, v.muser, v.uuid, v.checksum, v.cluster", subq)
888 args += [except_cluster, parent, start, nextling]
889 start_index = len(args) - 2
891 subq, subargs = self._construct_paths(pathq)
895 subq, subargs = self._construct_size(sizeq)
899 subq, subargs = self._construct_filters(domain, filterq)
904 q = q.replace("attributes a, ", "")
905 q = q.replace("and a.serial = v.serial ", "")
906 q += " order by n.path"
912 return self.fetchall(), ()
917 fetchone = self.fetchone
919 pappend = prefixes.append
921 mappend = matches.append
930 idx = path.find(delimiter, pfz)
939 if idx + dz == len(path):
942 continue # Get one more, in case there is a path.
948 args[start_index] = strnextling(pf) # New start.
951 return matches, prefixes
953 def latest_uuid(self, uuid):
954 """Return a (path, serial) tuple, for the latest version of the given uuid."""
956 q = ("select n.path, v.serial "
957 "from versions v, nodes n "
958 "where v.serial = (select max(serial) "
961 "and n.node = v.node")
962 self.execute(q, (uuid,))
963 return self.fetchone()