1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
36 from dbworker import DBWorker
38 from pithos.backends.filter import parse_filters
43 ( SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM, CLUSTER ) = range(11)
45 ( MATCH_PREFIX, MATCH_EXACT ) = range(2)
50 def strnextling(prefix):
51 """Return the first unicode string
52 greater than but not starting with given prefix.
53 strnextling('hello') -> 'hellp'
56 ## all strings start with the null string,
57 ## therefore we have to approximate strnextling('')
58 ## with the last unicode character supported by python
59 ## 0x10ffff for wide (32-bit unicode) python builds
60 ## 0x00ffff for narrow (16-bit unicode) python builds
61 ## We will not autodetect. 0xffff is safe enough.
70 def strprevling(prefix):
71 """Return an approximation of the last unicode string
72 less than but not starting with given prefix.
73 strprevling(u'hello') -> u'helln\\xffff'
76 ## There is no prevling for the null string
81 s += unichr(c-1) + unichr(0xffff)
100 class Node(DBWorker):
101 """Nodes store path organization and have multiple versions.
102 Versions store object history and have multiple attributes.
103 Attributes store metadata.
106 # TODO: Provide an interface for included and excluded clusters.
108 def __init__(self, **params):
109 DBWorker.__init__(self, **params)
110 execute = self.execute
112 execute(""" pragma foreign_keys = on """)
114 execute(""" create table if not exists nodes
115 ( node integer primary key,
116 parent integer default 0,
117 path text not null default '',
119 references nodes(node)
121 on delete cascade ) """)
122 execute(""" create unique index if not exists idx_nodes_path
125 execute(""" create table if not exists policy
129 primary key (node, key)
131 references nodes(node)
133 on delete cascade ) """)
135 execute(""" create table if not exists statistics
137 population integer not null default 0,
138 size integer not null default 0,
140 cluster integer not null default 0,
141 primary key (node, cluster)
143 references nodes(node)
145 on delete cascade ) """)
147 execute(""" create table if not exists versions
148 ( serial integer primary key,
151 size integer not null default 0,
152 type text not null default '',
155 muser text not null default '',
156 uuid text not null default '',
157 checksum text not null default '',
158 cluster integer not null default 0,
160 references nodes(node)
162 on delete cascade ) """)
163 execute(""" create index if not exists idx_versions_node_mtime
164 on versions(node, mtime) """)
165 execute(""" create index if not exists idx_versions_node_uuid
166 on versions(uuid) """)
168 execute(""" create table if not exists attributes
173 primary key (serial, domain, key)
175 references versions(serial)
177 on delete cascade ) """)
179 q = "insert or ignore into nodes(node, parent) values (?, ?)"
180 execute(q, (ROOTNODE, ROOTNODE))
182 def node_create(self, parent, path):
183 """Create a new node from the given properties.
184 Return the node identifier of the new node.
187 q = ("insert into nodes (parent, path) "
189 props = (parent, path)
190 return self.execute(q, props).lastrowid
192 def node_lookup(self, path):
193 """Lookup the current node of the given path.
194 Return None if the path is not found.
197 q = "select node from nodes where path = ?"
198 self.execute(q, (path,))
204 def node_get_properties(self, node):
205 """Return the node's (parent, path).
206 Return None if the node is not found.
209 q = "select parent, path from nodes where node = ?"
210 self.execute(q, (node,))
211 return self.fetchone()
213 def node_get_versions(self, node, keys=(), propnames=_propnames):
214 """Return the properties of all versions at node.
215 If keys is empty, return all properties in the order
216 (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
219 q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
222 self.execute(q, (node,))
229 return [[p[propnames[k]] for k in keys if k in propnames] for p in r]
231 def node_count_children(self, node):
232 """Return node's child count."""
234 q = "select count(node) from nodes where parent = ? and node != 0"
235 self.execute(q, (node,))
241 def node_purge_children(self, parent, before=inf, cluster=0):
242 """Delete all versions with the specified
243 parent and cluster, and return
244 the hashes and size of versions deleted.
245 Clears out nodes with no remaining versions.
248 execute = self.execute
249 q = ("select count(serial), sum(size) from versions "
250 "where node in (select node "
255 args = (parent, cluster, before)
257 nr, size = self.fetchone()
261 self.statistics_update(parent, -nr, -size, mtime, cluster)
262 self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster)
264 q = ("select hash from versions "
265 "where node in (select node "
271 hashes = [r[0] for r in self.fetchall()]
272 q = ("delete from versions "
273 "where node in (select node "
279 q = ("delete from nodes "
280 "where node in (select node from nodes n "
281 "where (select count(serial) "
283 "where node = n.node) = 0 "
285 execute(q, (parent,))
288 def node_purge(self, node, before=inf, cluster=0):
289 """Delete all versions with the specified
290 node and cluster, and return
291 the hashes and size of versions deleted.
292 Clears out the node if it has no remaining versions.
295 execute = self.execute
296 q = ("select count(serial), sum(size) from versions "
300 args = (node, cluster, before)
302 nr, size = self.fetchone()
306 self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
308 q = ("select hash from versions "
313 hashes = [r[0] for r in self.fetchall()]
314 q = ("delete from versions "
319 q = ("delete from nodes "
320 "where node in (select node from nodes n "
321 "where (select count(serial) "
323 "where node = n.node) = 0 "
328 def node_remove(self, node):
329 """Remove the node specified.
330 Return false if the node has children or is not found.
333 if self.node_count_children(node):
337 q = ("select count(serial), sum(size), cluster "
341 self.execute(q, (node,))
342 for population, size, cluster in self.fetchall():
343 self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
345 q = "delete from nodes where node = ?"
346 self.execute(q, (node,))
349 def policy_get(self, node):
350 q = "select key, value from policy where node = ?"
351 self.execute(q, (node,))
352 return dict(self.fetchall())
354 def policy_set(self, node, policy):
355 q = "insert or replace into policy (node, key, value) values (?, ?, ?)"
356 self.executemany(q, ((node, k, v) for k, v in policy.iteritems()))
358 def statistics_get(self, node, cluster=0):
359 """Return population, total size and last mtime
360 for all versions under node that belong to the cluster.
363 q = ("select population, size, mtime from statistics "
364 "where node = ? and cluster = ?")
365 self.execute(q, (node, cluster))
366 return self.fetchone()
368 def statistics_update(self, node, population, size, mtime, cluster=0):
369 """Update the statistics of the given node.
370 Statistics keep track of the population, total
371 size of objects and mtime in the node's namespace.
372 May be zero or positive or negative numbers.
375 qs = ("select population, size from statistics "
376 "where node = ? and cluster = ?")
377 qu = ("insert or replace into statistics (node, population, size, mtime, cluster) "
378 "values (?, ?, ?, ?, ?)")
379 self.execute(qs, (node, cluster))
382 prepopulation, presize = (0, 0)
384 prepopulation, presize = r
385 population += prepopulation
387 self.execute(qu, (node, population, size, mtime, cluster))
389 def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
390 """Update the statistics of the given node's parent.
391 Then recursively update all parents up to the root.
392 Population is not recursive.
398 props = self.node_get_properties(node)
402 self.statistics_update(parent, population, size, mtime, cluster)
404 population = 0 # Population isn't recursive
406 def statistics_latest(self, node, before=inf, except_cluster=0):
407 """Return population, total size and last mtime
408 for all latest versions under node that
409 do not belong to the cluster.
412 execute = self.execute
413 fetchone = self.fetchone
416 props = self.node_get_properties(node)
421 # The latest version.
422 q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
424 "where serial = (select max(serial) "
426 "where node = ? and mtime < ?) "
428 execute(q, (node, before, except_cluster))
434 # First level, just under node (get population).
435 q = ("select count(serial), sum(size), max(mtime) "
437 "where serial = (select max(serial) "
439 "where node = v.node and mtime < ?) "
441 "and node in (select node "
444 execute(q, (before, except_cluster, node))
449 mtime = max(mtime, r[2])
453 # All children (get size and mtime).
454 # This is why the full path is stored.
455 q = ("select count(serial), sum(size), max(mtime) "
457 "where serial = (select max(serial) "
459 "where node = v.node and mtime < ?) "
461 "and node in (select node "
463 "where path like ? escape '\\')")
464 execute(q, (before, except_cluster, self.escape_like(path) + '%'))
468 size = r[1] - props[SIZE]
469 mtime = max(mtime, r[2])
470 return (count, size, mtime)
472 def version_create(self, node, hash, size, type, source, muser, uuid, checksum, cluster=0):
473 """Create a new version from the given properties.
474 Return the (serial, mtime) of the new version.
477 q = ("insert into versions (node, hash, size, type, source, mtime, muser, uuid, checksum, cluster) "
478 "values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
480 props = (node, hash, size, type, source, mtime, muser, uuid, checksum, cluster)
481 serial = self.execute(q, props).lastrowid
482 self.statistics_update_ancestors(node, 1, size, mtime, cluster)
485 def version_lookup(self, node, before=inf, cluster=0):
486 """Lookup the current version of the given node.
487 Return a list with its properties:
488 (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster)
489 or None if the current version is not found in the given cluster.
492 q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
494 "where serial = (select max(serial) "
496 "where node = ? and mtime < ?) "
498 self.execute(q, (node, before, cluster))
499 props = self.fetchone()
500 if props is not None:
504 def version_get_properties(self, serial, keys=(), propnames=_propnames):
505 """Return a sequence of values for the properties of
506 the version specified by serial and the keys, in the order given.
507 If keys is empty, return all properties in the order
508 (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
511 q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
514 self.execute(q, (serial,))
521 return [r[propnames[k]] for k in keys if k in propnames]
523 def version_put_property(self, serial, key, value):
524 """Set value for the property of version specified by key."""
526 if key not in _propnames:
528 q = "update versions set %s = ? where serial = ?" % key
529 self.execute(q, (value, serial))
531 def version_recluster(self, serial, cluster):
532 """Move the version into another cluster."""
534 props = self.version_get_properties(serial)
539 oldcluster = props[CLUSTER]
540 if cluster == oldcluster:
544 self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
545 self.statistics_update_ancestors(node, 1, size, mtime, cluster)
547 q = "update versions set cluster = ? where serial = ?"
548 self.execute(q, (cluster, serial))
550 def version_remove(self, serial):
551 """Remove the serial specified."""
553 props = self.version_get_properties(serial)
559 cluster = props[CLUSTER]
562 self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
564 q = "delete from versions where serial = ?"
565 self.execute(q, (serial,))
568 def attribute_get(self, serial, domain, keys=()):
569 """Return a list of (key, value) pairs of the version specified by serial.
570 If keys is empty, return all attributes.
571 Otherwise, return only those specified.
574 execute = self.execute
576 marks = ','.join('?' for k in keys)
577 q = ("select key, value from attributes "
578 "where key in (%s) and serial = ? and domain = ?" % (marks,))
579 execute(q, keys + (serial, domain))
581 q = "select key, value from attributes where serial = ? and domain = ?"
582 execute(q, (serial, domain))
583 return self.fetchall()
585 def attribute_set(self, serial, domain, items):
586 """Set the attributes of the version specified by serial.
587 Receive attributes as an iterable of (key, value) pairs.
590 q = ("insert or replace into attributes (serial, domain, key, value) "
591 "values (?, ?, ?, ?)")
592 self.executemany(q, ((serial, domain, k, v) for k, v in items))
594 def attribute_del(self, serial, domain, keys=()):
595 """Delete attributes of the version specified by serial.
596 If keys is empty, delete all attributes.
597 Otherwise delete those specified.
601 q = "delete from attributes where serial = ? and domain = ? and key = ?"
602 self.executemany(q, ((serial, domain, key) for key in keys))
604 q = "delete from attributes where serial = ? and domain = ?"
605 self.execute(q, (serial, domain))
607 def attribute_copy(self, source, dest):
608 q = ("insert or replace into attributes "
609 "select ?, domain, key, value from attributes "
611 self.execute(q, (dest, source))
613 def _construct_filters(self, domain, filterq):
614 if not domain or not filterq:
618 append = subqlist.append
619 included, excluded, opers = parse_filters(filterq)
623 subq = "exists (select 1 from attributes where serial = v.serial and domain = ? and "
624 subq += "(" + ' or '.join(('key = ?' for x in included)) + ")"
631 subq = "not exists (select 1 from attributes where serial = v.serial and domain = ? and "
632 subq += "(" + ' or '.join(('key = ?' for x in excluded)) + ")"
639 for k, o, v in opers:
640 subq = "exists (select 1 from attributes where serial = v.serial and domain = ? and "
641 subq += "key = ? and value %s ?" % (o,)
643 args += [domain, k, v]
649 subq = ' and ' + ' and '.join(subqlist)
653 def _construct_paths(self, pathq):
660 for path, match in pathq:
661 if match == MATCH_PREFIX:
662 subqlist.append("n.path like ? escape '\\'")
663 args.append(self.escape_like(path) + '%')
664 elif match == MATCH_EXACT:
665 subqlist.append("n.path = ?")
668 subq = ' and (' + ' or '.join(subqlist) + ')'
673 def _construct_size(self, sizeq):
674 if not sizeq or len(sizeq) != 2:
680 subq += " and v.size >= ?"
683 subq += " and v.size < ?"
688 def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=[]):
689 """Return a list with all keys defined
690 for all latest versions under parent that
691 do not belong to the cluster.
694 # TODO: Use another table to store before=inf results.
695 q = ("select distinct a.key "
696 "from attributes a, versions v, nodes n "
697 "where v.serial = (select max(serial) "
699 "where node = v.node and mtime < ?) "
700 "and v.cluster != ? "
701 "and v.node in (select node "
704 "and a.serial = v.serial "
706 "and n.node = v.node")
707 args = (before, except_cluster, parent, domain)
708 subq, subargs = self._construct_paths(pathq)
712 self.execute(q, args)
713 return [r[0] for r in self.fetchall()]
715 def latest_version_list(self, parent, prefix='', delimiter=None,
716 start='', limit=10000, before=inf,
717 except_cluster=0, pathq=[], domain=None,
718 filterq=[], sizeq=None, all_props=False):
719 """Return a (list of (path, serial) tuples, list of common prefixes)
720 for the current versions of the paths with the given parent,
721 matching the following criteria.
723 The property tuple for a version is returned if all
724 of these conditions are true:
730 c. path starts with prefix (and paths in pathq)
732 d. version is the max up to before
734 e. version is not in cluster
736 f. the path does not have the delimiter occurring
737 after the prefix, or ends with the delimiter
739 g. serial matches the attribute filter query.
741 A filter query is a comma-separated list of
742 terms in one of these three forms:
745 an attribute with this key must exist
748 an attribute with this key must not exist
751 the attribute with this key satisfies the value
752 where ?op is one of =, !=, <=, >=, <, >.
754 h. the size is in the range set by sizeq
756 The list of common prefixes includes the prefixes
757 matching up to the first delimiter after prefix,
758 and are reported only once, as "virtual directories".
759 The delimiter is included in the prefixes.
761 If arguments are None, then the corresponding matching rule
764 Limit applies to the first list of tuples returned.
766 If all_props is True, return all properties after path, not just serial.
769 execute = self.execute
771 if not start or start < prefix:
772 start = strprevling(prefix)
773 nextling = strnextling(prefix)
775 q = ("select distinct n.path, %s "
776 "from versions v, nodes n "
777 "where v.serial = (select max(serial) "
779 "where node = v.node and mtime < ?) "
780 "and v.cluster != ? "
781 "and v.node in (select node "
784 "and n.node = v.node "
785 "and n.path > ? and n.path < ?")
789 q = q % "v.serial, v.node, v.hash, v.size, v.type, v.source, v.mtime, v.muser, v.uuid, v.checksum, v.cluster"
790 args = [before, except_cluster, parent, start, nextling]
792 subq, subargs = self._construct_paths(pathq)
796 subq, subargs = self._construct_size(sizeq)
800 subq, subargs = self._construct_filters(domain, filterq)
805 q = q.replace("attributes a, ", "")
806 q = q.replace("and a.serial = v.serial ", "")
807 q += " order by n.path"
813 return self.fetchall(), ()
818 fetchone = self.fetchone
820 pappend = prefixes.append
822 mappend = matches.append
831 idx = path.find(delimiter, pfz)
840 if idx + dz == len(path):
843 continue # Get one more, in case there is a path.
849 args[3] = strnextling(pf) # New start.
852 return matches, prefixes
854 def latest_uuid(self, uuid):
855 """Return a (path, serial) tuple, for the latest version of the given uuid."""
857 q = ("select n.path, v.serial "
858 "from versions v, nodes n "
859 "where v.serial = (select max(serial) "
862 "and n.node = v.node")
863 self.execute(q, (uuid,))
864 return self.fetchone()