1 # Copyright 2011 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
36 from dbworker import DBWorker
41 ( SERIAL, NODE, HASH, SIZE, SOURCE, MTIME, MUSER, CLUSTER ) = range(8)
46 def strnextling(prefix):
47 """Return the first unicode string
48 greater than but not starting with given prefix.
49 strnextling('hello') -> 'hellp'
52 ## all strings start with the null string,
53 ## therefore we have to approximate strnextling('')
54 ## with the last unicode character supported by python
55 ## 0x10ffff for wide (32-bit unicode) python builds
56 ## 0x00ffff for narrow (16-bit unicode) python builds
57 ## We will not autodetect. 0xffff is safe enough.
66 def strprevling(prefix):
67 """Return an approximation of the last unicode string
68 less than but not starting with given prefix.
69 strprevling(u'hello') -> u'helln\\xffff'
72 ## There is no prevling for the null string
77 s += unichr(c-1) + unichr(0xffff)
94 """Nodes store path organization and have multiple versions.
95 Versions store object history and have multiple attributes.
96 Attributes store metadata.
99 # TODO: Provide an interface for included and excluded clusters.
101 def __init__(self, **params):
102 DBWorker.__init__(self, **params)
103 execute = self.execute
105 execute(""" pragma foreign_keys = on """)
107 execute(""" create table if not exists nodes
108 ( node integer primary key,
109 parent integer default 0,
110 path text not null default '',
112 references nodes(node)
114 on delete cascade ) """)
115 execute(""" create unique index if not exists idx_nodes_path
118 execute(""" create table if not exists policy
122 primary key (node, key)
124 references nodes(node)
126 on delete cascade ) """)
128 execute(""" create table if not exists statistics
130 population integer not null default 0,
131 size integer not null default 0,
133 cluster integer not null default 0,
134 primary key (node, cluster)
136 references nodes(node)
138 on delete cascade ) """)
140 execute(""" create table if not exists versions
141 ( serial integer primary key,
144 size integer not null default 0,
147 muser text not null default '',
148 cluster integer not null default 0,
150 references nodes(node)
152 on delete cascade ) """)
153 execute(""" create index if not exists idx_versions_node_mtime
154 on versions(node, mtime) """)
156 execute(""" create table if not exists attributes
160 primary key (serial, key)
162 references versions(serial)
164 on delete cascade ) """)
166 q = "insert or ignore into nodes(node, parent) values (?, ?)"
167 execute(q, (ROOTNODE, ROOTNODE))
169 def node_create(self, parent, path):
170 """Create a new node from the given properties.
171 Return the node identifier of the new node.
174 q = ("insert into nodes (parent, path) "
176 props = (parent, path)
177 return self.execute(q, props).lastrowid
179 def node_lookup(self, path):
180 """Lookup the current node of the given path.
181 Return None if the path is not found.
184 q = "select node from nodes where path = ?"
185 self.execute(q, (path,))
def node_get_properties(self, node):
    """Fetch the (parent, path) pair stored for the given node.

    The result is whatever ``fetchone`` yields for the lookup, i.e.
    None when no row of the nodes table carries this identifier.
    """
    query = "select parent, path from nodes where node = ?"
    self.execute(query, (node,))
    properties = self.fetchone()
    return properties
200 def node_get_versions(self, node, keys=(), propnames=_propnames):
201 """Return the properties of all versions at node.
202 If keys is empty, return all properties in the order
203 (serial, node, hash, size, source, mtime, muser, cluster).
206 q = ("select serial, node, hash, size, source, mtime, muser, cluster "
209 self.execute(q, (node,))
216 return [[p[propnames[k]] for k in keys if k in propnames] for p in r]
218 def node_count_children(self, node):
219 """Return node's child count."""
221 q = "select count(node) from nodes where parent = ? and node != 0"
222 self.execute(q, (node,))
228 def node_purge_children(self, parent, before=inf, cluster=0):
229 """Delete all versions with the specified
230 parent and cluster, and return
231 the hashes of versions deleted.
232 Clears out nodes with no remaining versions.
235 execute = self.execute
236 q = ("select count(serial), sum(size) from versions "
237 "where node in (select node "
242 args = (parent, cluster, before)
244 nr, size = self.fetchone()
248 self.statistics_update(parent, -nr, -size, mtime, cluster)
249 self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster)
251 q = ("select hash from versions "
252 "where node in (select node "
258 hashes = [r[0] for r in self.fetchall()]
259 q = ("delete from versions "
260 "where node in (select node "
266 q = ("delete from nodes "
267 "where node in (select node from nodes n "
268 "where (select count(serial) "
270 "where node = n.node) = 0 "
272 execute(q, (parent,))
275 def node_purge(self, node, before=inf, cluster=0):
276 """Delete all versions with the specified
277 node and cluster, and return
278 the hashes of versions deleted.
279 Clears out the node if it has no remaining versions.
282 execute = self.execute
283 q = ("select count(serial), sum(size) from versions "
287 args = (node, cluster, before)
289 nr, size = self.fetchone()
293 self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
295 q = ("select hash from versions "
300 hashes = [r[0] for r in self.fetchall()]
301 q = ("delete from versions "
306 q = ("delete from nodes "
307 "where node in (select node from nodes n "
308 "where (select count(serial) "
310 "where node = n.node) = 0 "
315 def node_remove(self, node):
316 """Remove the node specified.
317 Return false if the node has children or is not found.
320 if self.node_count_children(node):
324 q = ("select count(serial), sum(size), cluster "
328 self.execute(q, (node,))
329 for population, size, cluster in self.fetchall():
330 self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
332 q = "delete from nodes where node = ?"
333 self.execute(q, (node,))
def policy_get(self, node):
    """Return the policy of the given node as a {key: value} dictionary.

    The dictionary is empty when the policy table holds no rows
    for the node.
    """
    self.execute("select key, value from policy where node = ?", (node,))
    rows = self.fetchall()
    return dict(rows)
def policy_set(self, node, policy):
    """Store the given policy for a node.

    ``policy`` is a mapping of policy keys to values. Each pair is
    written with "insert or replace", so keys already present for the
    node are overwritten and keys not mentioned are left untouched.
    """
    q = "insert or replace into policy (node, key, value) values (?, ?, ?)"
    # items() is available on both Python 2 and Python 3 mappings,
    # whereas iteritems() only exists on Python 2 dictionaries.
    self.executemany(q, ((node, k, v) for k, v in policy.items()))
def statistics_get(self, node, cluster=0):
    """Read the statistics row recorded for a node within a cluster.

    Returns the (population, size, mtime) row of the statistics table
    for the given node and cluster, or None when no such row exists.
    """
    query = ("select population, size, mtime from statistics "
             "where node = ? and cluster = ?")
    params = (node, cluster)
    self.execute(query, params)
    row = self.fetchone()
    return row
355 def statistics_update(self, node, population, size, mtime, cluster=0):
356 """Update the statistics of the given node.
357 Statistics keep track of the population, total
358 size of objects and mtime in the node's namespace.
359 May be zero or positive or negative numbers.
362 qs = ("select population, size from statistics "
363 "where node = ? and cluster = ?")
364 qu = ("insert or replace into statistics (node, population, size, mtime, cluster) "
365 "values (?, ?, ?, ?, ?)")
366 self.execute(qs, (node, cluster))
369 prepopulation, presize = (0, 0)
371 prepopulation, presize = r
372 population += prepopulation
374 self.execute(qu, (node, population, size, mtime, cluster))
376 def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
377 """Update the statistics of the given node's parent.
378 Then recursively update all parents up to the root.
379 Population is not recursive.
385 props = self.node_get_properties(node)
389 self.statistics_update(parent, population, size, mtime, cluster)
391 population = 0 # Population isn't recursive
393 def statistics_latest(self, node, before=inf, except_cluster=0):
394 """Return population, total size and last mtime
395 for all latest versions under node that
396 do not belong to the cluster.
399 execute = self.execute
400 fetchone = self.fetchone
403 props = self.node_get_properties(node)
408 # The latest version.
409 q = ("select serial, node, hash, size, source, mtime, muser, cluster "
411 "where serial = (select max(serial) "
413 "where node = ? and mtime < ?) "
415 execute(q, (node, before, except_cluster))
421 # First level, just under node (get population).
422 q = ("select count(serial), sum(size), max(mtime) "
424 "where serial = (select max(serial) "
426 "where node = v.node and mtime < ?) "
428 "and node in (select node "
431 execute(q, (before, except_cluster, node))
436 mtime = max(mtime, r[2])
440 # All children (get size and mtime).
441 # XXX: This is why the full path is stored.
442 q = ("select count(serial), sum(size), max(mtime) "
444 "where serial = (select max(serial) "
446 "where node = v.node and mtime < ?) "
448 "and node in (select node "
450 "where path like ? escape '\\')")
451 execute(q, (before, except_cluster, self.escape_like(path) + '%'))
455 size = r[1] - props[SIZE]
456 mtime = max(mtime, r[2])
457 return (count, size, mtime)
459 def version_create(self, node, hash, size, source, muser, cluster=0):
460 """Create a new version from the given properties.
461 Return the (serial, mtime) of the new version.
464 q = ("insert into versions (node, hash, size, source, mtime, muser, cluster) "
465 "values (?, ?, ?, ?, ?, ?, ?)")
467 props = (node, hash, size, source, mtime, muser, cluster)
468 serial = self.execute(q, props).lastrowid
469 self.statistics_update_ancestors(node, 1, size, mtime, cluster)
472 def version_lookup(self, node, before=inf, cluster=0):
473 """Lookup the current version of the given node.
474 Return a list with its properties:
475 (serial, node, hash, size, source, mtime, muser, cluster)
476 or None if the current version is not found in the given cluster.
479 q = ("select serial, node, hash, size, source, mtime, muser, cluster "
481 "where serial = (select max(serial) "
483 "where node = ? and mtime < ?) "
485 self.execute(q, (node, before, cluster))
486 props = self.fetchone()
487 if props is not None:
491 def version_get_properties(self, serial, keys=(), propnames=_propnames):
492 """Return a sequence of values for the properties of
493 the version specified by serial and the keys, in the order given.
494 If keys is empty, return all properties in the order
495 (serial, node, hash, size, source, mtime, muser, cluster).
498 q = ("select serial, node, hash, size, source, mtime, muser, cluster "
501 self.execute(q, (serial,))
508 return [r[propnames[k]] for k in keys if k in propnames]
510 def version_recluster(self, serial, cluster):
511 """Move the version into another cluster."""
513 props = self.version_get_properties(serial)
518 oldcluster = props[CLUSTER]
519 if cluster == oldcluster:
523 self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
524 self.statistics_update_ancestors(node, 1, size, mtime, cluster)
526 q = "update versions set cluster = ? where serial = ?"
527 self.execute(q, (cluster, serial))
529 def version_remove(self, serial):
530 """Remove the serial specified."""
532 props = self.version_get_properties(serial)
538 cluster = props[CLUSTER]
541 self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
543 q = "delete from versions where serial = ?"
544 self.execute(q, (serial,))
def attribute_get(self, serial, keys=()):
    """Return a list of (key, value) pairs of the version specified by serial.

    If keys is empty, return all attributes.
    Otherwise, return only those specified.

    ``keys`` may be any iterable of attribute keys (tuple, list, ...).
    """
    execute = self.execute
    # Normalise to a tuple: the parameters are built by concatenating
    # with (serial,), which would raise TypeError for a list argument.
    keys = tuple(keys or ())
    if keys:
        # Only '?' placeholders are interpolated into the statement;
        # the key values themselves are always passed as parameters.
        marks = ','.join('?' for _ in keys)
        q = ("select key, value from attributes "
             "where key in (%s) and serial = ?" % (marks,))
        execute(q, keys + (serial,))
    else:
        q = "select key, value from attributes where serial = ?"
        execute(q, (serial,))
    return self.fetchall()
564 def attribute_set(self, serial, items):
565 """Set the attributes of the version specified by serial.
566 Receive attributes as an iterable of (key, value) pairs.
569 q = ("insert or replace into attributes (serial, key, value) "
571 self.executemany(q, ((serial, k, v) for k, v in items))
def attribute_del(self, serial, keys=()):
    """Remove attributes from the version identified by serial.

    With no keys given, every attribute of the version is removed;
    otherwise only the attributes whose key is listed are removed.
    """
    if not keys:
        self.execute("delete from attributes where serial = ?", (serial,))
        return

    q = "delete from attributes where serial = ? and key = ?"
    self.executemany(q, ((serial, key) for key in keys))
586 def attribute_copy(self, source, dest):
587 q = ("insert or replace into attributes "
588 "select ?, key, value from attributes "
590 self.execute(q, (dest, source))
592 def _construct_filters(self, filterq):
596 args = filterq.split(',')
597 subq = " and a.key in ("
598 subq += ','.join(('?' for x in args))
603 def _construct_paths(self, pathq):
608 subq += ' or '.join(("n.path like ? escape '\\'" for x in pathq))
610 args = tuple([self.escape_like(x) + '%' for x in pathq])
614 def latest_attribute_keys(self, parent, before=inf, except_cluster=0, pathq=[]):
615 """Return a list with all keys defined
616 for all latest versions under parent that
617 do not belong to the cluster.
620 # TODO: Use another table to store before=inf results.
621 q = ("select distinct a.key "
622 "from attributes a, versions v, nodes n "
623 "where v.serial = (select max(serial) "
625 "where node = v.node and mtime < ?) "
626 "and v.cluster != ? "
627 "and v.node in (select node "
630 "and a.serial = v.serial "
631 "and n.node = v.node")
632 args = (before, except_cluster, parent)
633 subq, subargs = self._construct_paths(pathq)
637 self.execute(q, args)
638 return [r[0] for r in self.fetchall()]
640 def latest_version_list(self, parent, prefix='', delimiter=None,
641 start='', limit=10000, before=inf,
642 except_cluster=0, pathq=[], filterq=None):
643 """Return a (list of (path, serial) tuples, list of common prefixes)
644 for the current versions of the paths with the given parent,
645 matching the following criteria.
647 The property tuple for a version is returned if all
648 of these conditions are true:
654 c. path starts with prefix (and paths in pathq)
656 d. version is the max up to before
658 e. version is not in cluster
660 f. the path does not have the delimiter occurring
661 after the prefix, or ends with the delimiter
663 g. serial matches the attribute filter query.
665 A filter query is a comma-separated list of
666 terms in one of these three forms:
669 an attribute with this key must exist
672 an attribute with this key must not exist
675 the attribute with this key satisfies the value
676 where ?op is one of ==, !=, <=, >=, <, >.
678 The list of common prefixes includes the prefixes
679 matching up to the first delimiter after prefix,
680 and are reported only once, as "virtual directories".
681 The delimiter is included in the prefixes.
683 If arguments are None, then the corresponding matching rule
686 Limit applies to the first list of tuples returned.
689 execute = self.execute
691 if not start or start < prefix:
692 start = strprevling(prefix)
693 nextling = strnextling(prefix)
695 q = ("select distinct n.path, v.serial "
696 "from attributes a, versions v, nodes n "
697 "where v.serial = (select max(serial) "
699 "where node = v.node and mtime < ?) "
700 "and v.cluster != ? "
701 "and v.node in (select node "
704 "and a.serial = v.serial "
705 "and n.node = v.node "
706 "and n.path > ? and n.path < ?")
707 args = [before, except_cluster, parent, start, nextling]
709 subq, subargs = self._construct_paths(pathq)
713 subq, subargs = self._construct_filters(filterq)
718 q = q.replace("attributes a, ", "")
719 q = q.replace("and a.serial = v.serial ", "")
720 q += " order by n.path"
726 return self.fetchall(), ()
731 fetchone = self.fetchone
733 pappend = prefixes.append
735 mappend = matches.append
743 idx = path.find(delimiter, pfz)
752 if idx + dz == len(path):
755 continue # Get one more, in case there is a path.
761 args[3] = strnextling(pf) # New start.
764 return matches, prefixes