1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
36 from dbworker import DBWorker
38 from pithos.lib.filter import parse_filters
43 ( SERIAL, NODE, HASH, SIZE, SOURCE, MTIME, MUSER, UUID, CLUSTER ) = range(9)
48 def strnextling(prefix):
49 """Return the first unicode string
50 greater than but not starting with given prefix.
51 strnextling('hello') -> 'hellp'
54 ## all strings start with the null string,
55 ## therefore we have to approximate strnextling('')
56 ## with the last unicode character supported by python
57 ## 0x10ffff for wide (32-bit unicode) python builds
58 ## 0x00ffff for narrow (16-bit unicode) python builds
59 ## We will not autodetect. 0xffff is safe enough.
68 def strprevling(prefix):
69 """Return an approximation of the last unicode string
70 less than but not starting with given prefix.
71 strprevling(u'hello') -> u'helln\\xffff'
74 ## There is no prevling for the null string
79 s += unichr(c-1) + unichr(0xffff)
97 """Nodes store path organization and have multiple versions.
98 Versions store object history and have multiple attributes.
99 Attributes store metadata.
102 # TODO: Provide an interface for included and excluded clusters.
104 def __init__(self, **params):
105 DBWorker.__init__(self, **params)
106 execute = self.execute
108 execute(""" pragma foreign_keys = on """)
110 execute(""" create table if not exists nodes
111 ( node integer primary key,
112 parent integer default 0,
113 path text not null default '',
115 references nodes(node)
117 on delete cascade ) """)
118 execute(""" create unique index if not exists idx_nodes_path
121 execute(""" create table if not exists policy
125 primary key (node, key)
127 references nodes(node)
129 on delete cascade ) """)
131 execute(""" create table if not exists statistics
133 population integer not null default 0,
134 size integer not null default 0,
136 cluster integer not null default 0,
137 primary key (node, cluster)
139 references nodes(node)
141 on delete cascade ) """)
143 execute(""" create table if not exists versions
144 ( serial integer primary key,
147 size integer not null default 0,
150 muser text not null default '',
151 uuid text not null default '',
152 cluster integer not null default 0,
154 references nodes(node)
156 on delete cascade ) """)
157 execute(""" create index if not exists idx_versions_node_mtime
158 on versions(node, mtime) """)
159 execute(""" create index if not exists idx_versions_node_uuid
160 on versions(uuid) """)
162 execute(""" create table if not exists attributes
167 primary key (serial, domain, key)
169 references versions(serial)
171 on delete cascade ) """)
173 q = "insert or ignore into nodes(node, parent) values (?, ?)"
174 execute(q, (ROOTNODE, ROOTNODE))
def node_create(self, parent, path):
    """Insert a node row for the given parent and path.
       Return the identifier of the new node, taken from
       the lastrowid of the executed insert statement.
    """

    statement = "insert into nodes (parent, path) values (?, ?)"
    cursor = self.execute(statement, (parent, path))
    return cursor.lastrowid
186 def node_lookup(self, path):
187 """Lookup the current node of the given path.
188 Return None if the path is not found.
191 q = "select node from nodes where path = ?"
192 self.execute(q, (path,))
def node_get_properties(self, node):
    """Fetch the (parent, path) row of the node.
       The result is whatever fetchone() yields, so it is
       None when no node with that identifier exists.
    """

    self.execute("select parent, path from nodes where node = ?", (node,))
    row = self.fetchone()
    return row
207 def node_get_versions(self, node, keys=(), propnames=_propnames):
208 """Return the properties of all versions at node.
209 If keys is empty, return all properties in the order
210 (serial, node, hash, size, source, mtime, muser, uuid, cluster).
213 q = ("select serial, node, hash, size, source, mtime, muser, uuid, cluster "
216 self.execute(q, (node,))
223 return [[p[propnames[k]] for k in keys if k in propnames] for p in r]
225 def node_count_children(self, node):
226 """Return node's child count."""
228 q = "select count(node) from nodes where parent = ? and node != 0"
229 self.execute(q, (node,))
235 def node_purge_children(self, parent, before=inf, cluster=0):
236 """Delete all versions with the specified
237 parent and cluster, and return
238 the hashes of versions deleted.
239 Clears out nodes with no remaining versions.
242 execute = self.execute
243 q = ("select count(serial), sum(size) from versions "
244 "where node in (select node "
249 args = (parent, cluster, before)
251 nr, size = self.fetchone()
255 self.statistics_update(parent, -nr, -size, mtime, cluster)
256 self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster)
258 q = ("select hash from versions "
259 "where node in (select node "
265 hashes = [r[0] for r in self.fetchall()]
266 q = ("delete from versions "
267 "where node in (select node "
273 q = ("delete from nodes "
274 "where node in (select node from nodes n "
275 "where (select count(serial) "
277 "where node = n.node) = 0 "
279 execute(q, (parent,))
282 def node_purge(self, node, before=inf, cluster=0):
283 """Delete all versions with the specified
284 node and cluster, and return
285 the hashes of versions deleted.
286 Clears out the node if it has no remaining versions.
289 execute = self.execute
290 q = ("select count(serial), sum(size) from versions "
294 args = (node, cluster, before)
296 nr, size = self.fetchone()
300 self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
302 q = ("select hash from versions "
307 hashes = [r[0] for r in self.fetchall()]
308 q = ("delete from versions "
313 q = ("delete from nodes "
314 "where node in (select node from nodes n "
315 "where (select count(serial) "
317 "where node = n.node) = 0 "
322 def node_remove(self, node):
323 """Remove the node specified.
324 Return false if the node has children or is not found.
327 if self.node_count_children(node):
331 q = ("select count(serial), sum(size), cluster "
335 self.execute(q, (node,))
336 for population, size, cluster in self.fetchall():
337 self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
339 q = "delete from nodes where node = ?"
340 self.execute(q, (node,))
def policy_get(self, node):
    """Return the policy of the node as a {key: value} dictionary.
       The dictionary is empty when no policy rows exist for the node.
    """

    self.execute("select key, value from policy where node = ?", (node,))
    policy = {}
    for key, value in self.fetchall():
        policy[key] = value
    return policy
def policy_set(self, node, policy):
    """Store every (key, value) pair of the policy mapping for the node.
       Rows are written with 'insert or replace', so a key that is
       already stored for the node gets its value overwritten.
    """

    q = "insert or replace into policy (node, key, value) values (?, ?, ?)"
    # items() is available on mappings in both Python 2 and Python 3,
    # whereas iteritems() only exists on Python 2 dictionaries.
    self.executemany(q, ((node, k, v) for k, v in policy.items()))
def statistics_get(self, node, cluster=0):
    """Read the statistics row stored for (node, cluster).
       Return its (population, size, mtime) as produced by
       fetchone(), which is None when there is no such row.
    """

    query = ("select population, size, mtime from statistics "
             "where node = ? and cluster = ?")
    params = (node, cluster)
    self.execute(query, params)
    row = self.fetchone()
    return row
362 def statistics_update(self, node, population, size, mtime, cluster=0):
363 """Update the statistics of the given node.
364 Statistics keep track of the population, total
365 size of objects and mtime in the node's namespace.
366 May be zero or positive or negative numbers.
369 qs = ("select population, size from statistics "
370 "where node = ? and cluster = ?")
371 qu = ("insert or replace into statistics (node, population, size, mtime, cluster) "
372 "values (?, ?, ?, ?, ?)")
373 self.execute(qs, (node, cluster))
376 prepopulation, presize = (0, 0)
378 prepopulation, presize = r
379 population += prepopulation
381 self.execute(qu, (node, population, size, mtime, cluster))
383 def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
384 """Update the statistics of the given node's parent.
385 Then recursively update all parents up to the root.
386 Population is not recursive.
392 props = self.node_get_properties(node)
396 self.statistics_update(parent, population, size, mtime, cluster)
398 population = 0 # Population isn't recursive
400 def statistics_latest(self, node, before=inf, except_cluster=0):
401 """Return population, total size and last mtime
402 for all latest versions under node that
403 do not belong to the cluster.
406 execute = self.execute
407 fetchone = self.fetchone
410 props = self.node_get_properties(node)
415 # The latest version.
416 q = ("select serial, node, hash, size, source, mtime, muser, uuid, cluster "
418 "where serial = (select max(serial) "
420 "where node = ? and mtime < ?) "
422 execute(q, (node, before, except_cluster))
428 # First level, just under node (get population).
429 q = ("select count(serial), sum(size), max(mtime) "
431 "where serial = (select max(serial) "
433 "where node = v.node and mtime < ?) "
435 "and node in (select node "
438 execute(q, (before, except_cluster, node))
443 mtime = max(mtime, r[2])
447 # All children (get size and mtime).
448 # This is why the full path is stored.
449 q = ("select count(serial), sum(size), max(mtime) "
451 "where serial = (select max(serial) "
453 "where node = v.node and mtime < ?) "
455 "and node in (select node "
457 "where path like ? escape '\\')")
458 execute(q, (before, except_cluster, self.escape_like(path) + '%'))
462 size = r[1] - props[SIZE]
463 mtime = max(mtime, r[2])
464 return (count, size, mtime)
466 def version_create(self, node, hash, size, source, muser, uuid, cluster=0):
467 """Create a new version from the given properties.
468 Return the (serial, mtime) of the new version.
471 q = ("insert into versions (node, hash, size, source, mtime, muser, uuid, cluster) "
472 "values (?, ?, ?, ?, ?, ?, ?, ?)")
474 props = (node, hash, size, source, mtime, muser, uuid, cluster)
475 serial = self.execute(q, props).lastrowid
476 self.statistics_update_ancestors(node, 1, size, mtime, cluster)
479 def version_lookup(self, node, before=inf, cluster=0):
480 """Lookup the current version of the given node.
481 Return a list with its properties:
482 (serial, node, hash, size, source, mtime, muser, uuid, cluster)
483 or None if the current version is not found in the given cluster.
486 q = ("select serial, node, hash, size, source, mtime, muser, uuid, cluster "
488 "where serial = (select max(serial) "
490 "where node = ? and mtime < ?) "
492 self.execute(q, (node, before, cluster))
493 props = self.fetchone()
494 if props is not None:
498 def version_get_properties(self, serial, keys=(), propnames=_propnames):
499 """Return a sequence of values for the properties of
500 the version specified by serial and the keys, in the order given.
501 If keys is empty, return all properties in the order
502 (serial, node, hash, size, source, mtime, muser, uuid, cluster).
505 q = ("select serial, node, hash, size, source, mtime, muser, uuid, cluster "
508 self.execute(q, (serial,))
515 return [r[propnames[k]] for k in keys if k in propnames]
517 def version_recluster(self, serial, cluster):
518 """Move the version into another cluster."""
520 props = self.version_get_properties(serial)
525 oldcluster = props[CLUSTER]
526 if cluster == oldcluster:
530 self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
531 self.statistics_update_ancestors(node, 1, size, mtime, cluster)
533 q = "update versions set cluster = ? where serial = ?"
534 self.execute(q, (cluster, serial))
536 def version_remove(self, serial):
537 """Remove the serial specified."""
539 props = self.version_get_properties(serial)
545 cluster = props[CLUSTER]
548 self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
550 q = "delete from versions where serial = ?"
551 self.execute(q, (serial,))
def attribute_get(self, serial, domain, keys=()):
    """Return a list of (key, value) pairs of the version specified by serial.
       If keys is empty, return all attributes in the domain.
       Otherwise, return only those specified.
       Keys may be any iterable of key names (tuple, list, ...).
    """

    # Normalize once: a list cannot be concatenated with the
    # (serial, domain) tuple, and a one-shot iterable would be
    # exhausted after building the placeholders.
    keys = tuple(keys) if keys else ()
    if keys:
        # One '?' placeholder per requested key.
        marks = ','.join('?' for k in keys)
        q = ("select key, value from attributes "
             "where key in (%s) and serial = ? and domain = ?" % (marks,))
        args = keys + (serial, domain)
    else:
        q = "select key, value from attributes where serial = ? and domain = ?"
        args = (serial, domain)
    self.execute(q, args)
    return self.fetchall()
def attribute_set(self, serial, domain, items):
    """Write attributes for the version identified by serial.
       Items is an iterable of (key, value) pairs; each pair is
       stored under the given domain with 'insert or replace'.
    """

    statement = ("insert or replace into attributes (serial, domain, key, value) "
                 "values (?, ?, ?, ?)")
    rows = ((serial, domain, name, value) for name, value in items)
    self.executemany(statement, rows)
def attribute_del(self, serial, domain, keys=()):
    """Remove attributes of the version identified by serial.
       With no keys, every attribute of the domain is removed.
       With keys, only the attributes named by them are removed.
    """

    if not keys:
        # No selection given: clear the whole domain for this serial.
        statement = "delete from attributes where serial = ? and domain = ?"
        self.execute(statement, (serial, domain))
        return

    statement = "delete from attributes where serial = ? and domain = ? and key = ?"
    rows = ((serial, domain, name) for name in keys)
    self.executemany(statement, rows)
def attribute_copy(self, source, dest):
    """Copy all attributes of the source serial onto the dest serial.
       Rows are written with 'insert or replace', keeping each
       attribute's domain, key and value.
    """

    statement = "".join((
        "insert or replace into attributes ",
        "select ?, domain, key, value from attributes ",
        "where serial = ?",
    ))
    params = (dest, source)
    self.execute(statement, params)
599 def _construct_filters(self, domain, filterq):
600 if not domain or not filterq:
604 append = subqlist.append
605 included, excluded, opers = parse_filters(filterq)
609 subq = "exists (select 1 from attributes where serial = v.serial and domain = ? and "
610 subq += "(" + ' or '.join(('key = ?' for x in included)) + ")"
617 subq = "not exists (select 1 from attributes where serial = v.serial and domain = ? and "
618 subq += "(" + ' or '.join(('key = ?' for x in excluded)) + ")"
625 for k, o, v in opers:
626 subq = "exists (select 1 from attributes where serial = v.serial and domain = ? and "
627 subq += "key = ? and value %s ?" % (o,)
629 args += [domain, k, v]
635 subq = ' and ' + ' and '.join(subqlist)
639 def _construct_paths(self, pathq):
644 subq += ' or '.join(("n.path like ? escape '\\'" for x in pathq))
646 args = tuple([self.escape_like(x) + '%' for x in pathq])
650 def _construct_size(self, sizeq):
651 if not sizeq or len(sizeq) != 2:
657 subq += " and v.size >= ?"
660 subq += " and v.size < ?"
665 def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=[]):
666 """Return a list with all keys pairs defined
667 for all latest versions under parent that
668 do not belong to the cluster.
671 # TODO: Use another table to store before=inf results.
672 q = ("select distinct a.key "
673 "from attributes a, versions v, nodes n "
674 "where v.serial = (select max(serial) "
676 "where node = v.node and mtime < ?) "
677 "and v.cluster != ? "
678 "and v.node in (select node "
681 "and a.serial = v.serial "
683 "and n.node = v.node")
684 args = (before, except_cluster, parent, domain)
685 subq, subargs = self._construct_paths(pathq)
689 self.execute(q, args)
690 return [r[0] for r in self.fetchall()]
692 def latest_version_list(self, parent, prefix='', delimiter=None,
693 start='', limit=10000, before=inf,
694 except_cluster=0, pathq=[], domain=None, filterq=[], sizeq=None):
695 """Return a (list of (path, serial) tuples, list of common prefixes)
696 for the current versions of the paths with the given parent,
697 matching the following criteria.
699 The property tuple for a version is returned if all
700 of these conditions are true:
706 c. path starts with prefix (and paths in pathq)
708 d. version is the max up to before
710 e. version is not in cluster
712 f. the path does not have the delimiter occurring
713 after the prefix, or ends with the delimiter
715 g. serial matches the attribute filter query.
717 A filter query is a comma-separated list of
718 terms in one of these three forms:
721 an attribute with this key must exist
724 an attribute with this key must not exist
727 the attribute with this key satisfies the value
728 where ?op is one of =, !=, <=, >=, <, >.
730 h. the size is in the range set by sizeq
732 The list of common prefixes includes the prefixes
733 matching up to the first delimiter after prefix,
734 and are reported only once, as "virtual directories".
735 The delimiter is included in the prefixes.
737 If arguments are None, then the corresponding matching rule
740 Limit applies to the first list of tuples returned.
743 execute = self.execute
745 if not start or start < prefix:
746 start = strprevling(prefix)
747 nextling = strnextling(prefix)
749 q = ("select distinct n.path, v.serial "
750 "from versions v, nodes n "
751 "where v.serial = (select max(serial) "
753 "where node = v.node and mtime < ?) "
754 "and v.cluster != ? "
755 "and v.node in (select node "
758 "and n.node = v.node "
759 "and n.path > ? and n.path < ?")
760 args = [before, except_cluster, parent, start, nextling]
762 subq, subargs = self._construct_paths(pathq)
766 subq, subargs = self._construct_size(sizeq)
770 subq, subargs = self._construct_filters(domain, filterq)
775 q = q.replace("attributes a, ", "")
776 q = q.replace("and a.serial = v.serial ", "")
777 q += " order by n.path"
783 return self.fetchall(), ()
788 fetchone = self.fetchone
790 pappend = prefixes.append
792 mappend = matches.append
800 idx = path.find(delimiter, pfz)
809 if idx + dz == len(path):
812 continue # Get one more, in case there is a path.
818 args[3] = strnextling(pf) # New start.
821 return matches, prefixes
def latest_uuid(self, uuid):
    """Find the newest version carrying the given uuid.
       Return its (path, serial) as produced by fetchone(),
       which is None when no version has that uuid.
    """

    # The highest serial among the versions sharing the uuid is the latest.
    query = "".join((
        "select n.path, v.serial ",
        "from versions v, nodes n ",
        "where v.serial = (select max(serial) ",
        "from versions ",
        "where uuid = ?) ",
        "and n.node = v.node",
    ))
    self.execute(query, (uuid,))
    row = self.fetchone()
    return row