1 # Copyright 2011 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
36 from dbworker import DBWorker
# Positions of the properties inside a version row, in the order
# (serial, node, size, source, mtime, muser, cluster).
SERIAL, NODE, SIZE, SOURCE, MTIME, MUSER, CLUSTER = range(7)
46 def strnextling(prefix):
47 """Return the first unicode string that is
48 greater than, but does not start with, the given prefix.
49 strnextling('hello') -> 'hellp'
52 ## All strings start with the null string,
53 ## therefore we have to approximate strnextling('')
54 ## with the last unicode character supported by python:
55 ## 0x10ffff for wide (32-bit unicode) python builds
56 ## 0x00ffff for narrow (16-bit unicode) python builds
57 ## We will not autodetect. 0xffff is safe enough.
66 def strprevling(prefix):
67 """Return an approximation of the last unicode string that is
68 less than, but does not start with, the given prefix.
69 strprevling(u'hello') -> u'helln\\uffff'
72 ## There is no prevling for the null string
# Step the last code point down by one and pad with U+FFFF, so the result
# sorts just below prefix (c is presumably ord() of the last character).
77 s += unichr(c-1) + unichr(0xffff)
# Splits one filter term into (negation, key, operator, value), where the
# operator is one of =, !=, <=, >=, <, > and may be absent.
# Written as a raw string so that \s and \w reach the regex engine instead
# of being parsed as (invalid) Python string escapes.
_regexfilter = re.compile(r'(!?)\s*([\w-]+)\s*(=|!=|<=|>=|<|>)?\s*(.*)$', re.UNICODE)
96 """Nodes store path organization.
97 Versions store object history.
98 Attributes store metadata.
101 # TODO: Keep size of object in one place.
103 def __init__(self, **params):
104 execute = self.execute
# Turn on foreign key enforcement; the tables below declare
# "on delete cascade" references.
106 execute(""" pragma foreign_keys = on """)
# nodes: one row per path, with a reference back into nodes(node).
108 execute(""" create table if not exists nodes
109 ( node integer primary key,
110 parent integer not null default 0,
111 path text not null default '',
113 references nodes(node)
115 on delete cascade )""")
116 execute(""" create unique index if not exists idx_nodes_path
119 execute(""" create table if not exists statistics
120 ( node integer not null,
121 population integer not null default 0,
122 size integer not null default 0,
124 muser text not null default '',
125 cluster integer not null default 0,
126 primary key (node, cluster)
128 references nodes(node)
130 on delete cascade )""")
# versions: one row per version, keyed by serial.
# NOTE(review): no muser column is visible in this table although the
# version queries select muser -- confirm against the full schema.
132 execute(""" create table if not exists versions
133 ( serial integer primary key,
134 node integer not null,
135 size integer not null default 0,
138 cluster integer not null default 0,
140 references nodes(node)
142 on delete cascade ) """)
143 # execute(""" create index if not exists idx_versions_path
144 # on nodes(cluster, node, path) """)
145 # execute(""" create index if not exists idx_versions_mtime
146 # on nodes(mtime) """)
# attributes: rows keyed by (serial, key), removed together with their version.
148 execute(""" create table if not exists attributes
152 primary key (serial, key)
154 references versions(serial)
156 on delete cascade ) """)
# Make sure the root node exists; it is inserted as its own parent.
158 q = "insert or ignore into nodes(node, parent) values (?, ?)"
159 execute(q, (ROOTNODE, ROOTNODE))
161 def node_create(self, parent, path):
162 """Create a new node from the given properties.
163 Return the node identifier of the new node.
# The identifier returned is the lastrowid reported for the insert.
166 q = ("insert into nodes (parent, path) "
168 props = (parent, path)
169 return self.execute(q, props).lastrowid
171 def node_lookup(self, path):
172 """Look up the current node of the given path.
173 Return None if the path is not found.
# The path must match exactly.
176 q = ("select node from nodes where path = ?")
177 self.execute(q, (path,))
def node_update_ancestors(self, node, population, size, mtime, cluster=0):
    """Update the population properties of the given node.

    Population properties keep track of the population and total
    size of objects in the node's namespace.

    population and size are deltas and may be zero, positive or
    negative. Both are added to the statistics row of (node, cluster).
    The size delta is then added to every ancestor up to the root,
    while the population delta is applied to the node itself only.
    mtime is stored on every row that is touched.
    """
    # The trailing space matters: the two literals are concatenated.
    qs = ("select population, size from statistics "
          "where node = ? and cluster = ?")
    qu = ("insert or replace into statistics "
          "(node, population, size, mtime, cluster) "
          "values (?, ?, ?, ?, ?)")
    # The nodes table is keyed by "node"; it has no "serial" column.
    qp = "select parent from nodes where node = ?"
    execute = self.execute
    fetchone = self.fetchone

    while True:
        execute(qs, (node, cluster))
        row = fetchone()
        prepopulation, presize = (0, 0) if row is None else row
        # Keep the deltas apart from the stored totals, so that the
        # amount carried to the parent is the delta and nothing else.
        execute(qu, (node, prepopulation + population,
                     presize + size, mtime, cluster))
        if node == 0:
            break
        population = 0  # Population isn't recursive
        execute(qp, (node,))
        row = fetchone()
        # Stop at a missing node or at a node that is its own parent.
        if row is None or row[0] == node:
            break
        node = row[0]
def node_statistics(self, node, cluster=0):
    """Return population, total size and last mtime
    for all versions under node that belong to the cluster.

    The values come from the statistics row of (node, cluster) as a
    (population, size, mtime) row; None is returned when there is
    no such row.
    """
    # The trailing space matters: the two literals are concatenated.
    q = ("select population, size, mtime from statistics "
         "where node = ? and cluster = ?")
    self.execute(q, (node, cluster))
    return self.fetchone()
231 def node_count_children(self, node):
232 """Return node's child count."""
# "node != 0" keeps node 0 out of the count; presumably that is the root,
# which is inserted as its own parent -- confirm ROOTNODE is 0.
234 q = "select count(node) from nodes where parent = ? and node != 0"
235 self.execute(q, (node,))
def node_purge_children(self, parent, before=inf, cluster=0):
    """Delete all versions with the specified
    parent and cluster, and return
    the serials of versions deleted.
    Clears out nodes with no remaining versions.

    Only versions whose mtime is not later than before are affected.
    An empty tuple is returned when nothing matches.
    """
    # Imported locally so the block does not rely on a module-level name.
    from time import time

    execute = self.execute
    # The same selection is used to count, list and delete the versions.
    where = ("where node in (select node "
             "from nodes "
             "where parent = ?) "
             "and cluster = ? "
             "and mtime <= ?")
    args = (parent, cluster, before)

    execute("select count(serial), sum(size) from versions " + where, args)
    nr, size = self.fetchone()
    if not nr:
        # Nothing matched; sum(size) is NULL in this case.
        return ()
    # TODO: Statistics for nodes (children) will be wrong.
    # mtime has to be passed explicitly; otherwise cluster would be
    # taken as the mtime argument.
    self.node_update_ancestors(parent, -nr, -size, time(), cluster)

    execute("select serial from versions " + where, args)
    # The query selects a single column, the serial.
    serials = [r[0] for r in self.fetchall()]

    execute("delete from versions " + where, args)

    # No table alias here: SQLite rejects "delete from nodes n".
    q = ("delete from nodes "
         "where (select count(serial) "
         "from versions "
         "where versions.node = nodes.node) = 0 "
         "and parent = ?")
    execute(q, (parent,))
    return serials
def node_purge(self, node, before=inf, cluster=0):
    """Delete all versions with the specified
    node and cluster, and return
    the serials of versions deleted.
    Clears out the node if it has no remaining versions.

    Only versions whose mtime is not later than before are affected.
    An empty tuple is returned when nothing matches.
    """
    # Imported locally so the block does not rely on a module-level name.
    from time import time

    execute = self.execute
    # The same selection is used to count, list and delete the versions.
    where = ("where node = ? "
             "and cluster = ? "
             "and mtime <= ?")
    args = (node, cluster, before)

    execute("select count(serial), sum(size) from versions " + where, args)
    nr, size = self.fetchone()
    if not nr:
        # Nothing matched; sum(size) is NULL in this case.
        return ()
    # mtime has to be passed explicitly; otherwise cluster would be
    # taken as the mtime argument.
    self.node_update_ancestors(node, -nr, -size, time(), cluster)

    execute("select serial from versions " + where, args)
    # The query selects a single column, the serial.
    serials = [r[0] for r in self.fetchall()]

    execute("delete from versions " + where, args)

    # No table alias here: SQLite rejects "delete from nodes n".
    q = ("delete from nodes "
         "where (select count(serial) "
         "from versions "
         "where versions.node = nodes.node) = 0 "
         "and node = ?")
    execute(q, (node,))
    return serials
def node_remove(self, node):
    """Remove the node specified.
    Return false if the node has children or is not found.

    Before the node is deleted, each of its statistics rows is
    subtracted from the parent through node_update_ancestors.
    Return True once the node has been deleted.
    """
    # Imported locally so the block does not rely on a module-level name.
    from time import time

    # The child counter is node_count_children; there is no node_children.
    if self.node_count_children(node):
        return False

    # The table is called "nodes".
    q = "select parent from nodes where node = ?"
    self.execute(q, (node,))
    row = self.fetchone()
    if row is None:
        return False
    parent = row[0]

    mtime = time()
    q = "select population, size, cluster from statistics where node = ?"
    self.execute(q, (node,))
    # fetchall() materialises the rows before the cursor is reused.
    for population, size, cluster in self.fetchall():
        self.node_update_ancestors(parent, -population, -size, mtime, cluster)

    q = "delete from nodes where node = ?"
    self.execute(q, (node,))
    return True
349 # def node_remove(self, serial, recursive=0):
350 # """Remove the node specified by serial.
351 # Return false if the node is not found,
352 # or has ancestors and recursive is not set.
355 # props = self.node_get_properties(serial)
359 # parent = props[PARENT]
360 # pop = props[POPULATION]
361 # popsize = props[POPSIZE]
362 # if pop and not recursive:
365 # q = ("delete from nodes where serial = ?")
366 # self.execute(q, (serial,))
367 # self.node_update_ancestors(parent, -pop-1, -size-popsize)
def version_create(self, node, size, source, muser, cluster=0):
    """Create a new version from the given properties.
    Return the (serial, mtime) of the new version.

    The version is stamped with the current time, and the statistics
    of the node and its ancestors are updated by one object of the
    given size in the given cluster.
    """
    # Imported locally so the block does not rely on a module-level name.
    from time import time

    # Versions live in the "versions" table; six columns, six values.
    q = ("insert into versions (node, size, source, mtime, muser, cluster) "
         "values (?, ?, ?, ?, ?, ?)")
    mtime = time()
    props = (node, size, source, mtime, muser, cluster)
    serial = self.execute(q, props).lastrowid
    self.node_update_ancestors(node, 1, size, mtime, cluster)
    return serial, mtime
383 def version_lookup(self, node, before=inf, cluster=0):
384 """Look up the current version of the given node.
385 Return a list with its properties:
386 (serial, node, size, source, mtime, muser, cluster)
387 or None if the current version is not found in the given cluster.
# The current version is the one with the highest serial among the
# versions of the node whose mtime is earlier than before.
390 q = ("select serial, node, size, source, mtime, muser, cluster "
392 "where serial = (select max(serial) "
394 "where node = ? and mtime < ?) "
396 self.execute(q, (node, before, cluster))
397 props = self.fetchone()
398 if props is not None:
def version_get_properties(self, serial, keys=(), propnames=_propnames):
    """Return a sequence of values for the properties of
    the version specified by serial and the keys, in the order given.
    If keys is empty, return all properties in the order
    (serial, node, size, source, mtime, muser, cluster).

    Return None when no version has the given serial. Keys that are
    not in propnames are skipped.
    """
    # The column order must match the documented property order;
    # versions has no "path" column.
    q = ("select serial, node, size, source, mtime, muser, cluster "
         "from versions "
         "where serial = ?")
    self.execute(q, (serial,))
    row = self.fetchone()
    if row is None or not keys:
        return row
    return [row[propnames[k]] for k in keys if k in propnames]
421 # def node_set_properties(self, serial, items, propnames=_mutablepropnames):
422 # """Set the properties of a node specified by the node serial and
423 # the items iterable of (name, value) pairs.
424 # Mutable properties are %s.
425 # Invalid property names and 'serial' are not set.
431 # keys, vals = zip(*items)
432 # keystr = ','.join(("%s = ?" % k) for k in keys if k in propnames)
435 # q = "update nodes set %s where serial = ?" % keystr
437 # self.execute(q, vals)
def version_recluster(self, serial, cluster):
    """Move the version into another cluster.

    Nothing happens when the version does not exist or already
    belongs to the cluster. Otherwise the version is taken out of the
    statistics of its old cluster and added to those of the new one.
    """
    # Imported locally so the block does not rely on a module-level name.
    from time import time

    # Properties are looked up by the version's own serial.
    props = self.version_get_properties(serial)
    if not props:
        return
    node = props[NODE]
    size = props[SIZE]
    oldcluster = props[CLUSTER]
    if cluster == oldcluster:
        return

    mtime = time()
    self.node_update_ancestors(node, -1, -size, mtime, oldcluster)
    self.node_update_ancestors(node, 1, size, mtime, cluster)

    # The cluster is a column of "versions", keyed by serial.
    q = "update versions set cluster = ? where serial = ?"
    self.execute(q, (cluster, serial))
456 # def version_copy(self, serial, node, muser, copy_attr=True):
457 # """Copy the version specified by serial into
458 # a new version of node. Optionally copy attributes.
459 # Return the (serial, mtime) of the new version.
462 # props = self.version_get_properties(serial)
466 # cluster = props[CLUSTER]
467 # new_serial, mtime = self.version_create(node, size, serial, muser, cluster)
469 # self.attribute_copy(serial, new_serial)
470 # return (new_serial, mtime)
472 def path_statistics(self, prefix, before=inf, except_cluster=0):
473 """Return population, total size and last mtime
474 for all latest versions under prefix that
475 do not belong to the cluster.
# For each node, the latest version is the one with the highest serial
# whose mtime is earlier than before.
478 q = ("select count(serial), sum(size), max(mtime) "
480 "where serial = (select max(serial) "
482 "where node = v.node and mtime < ?) "
484 "and node in (select node "
486 "where path like ?)")
# NOTE(review): prefix goes into LIKE unescaped, so a '%' or '_' inside it
# acts as a wildcard -- confirm callers never pass such prefixes.
487 self.execute(q, (before, except_cluster, prefix + '%'))
493 def parse_filters(self, filterq):
# Split the comma-separated filter query into terms, run _regexfilter on
# each term, and return the three collections (included, excluded, opers).
# opers holds (key, op, value) tuples.
494 preterms = filterq.split(',')
498 match = _regexfilter.match
499 for term in preterms:
503 neg, key, op, value = m.groups()
509 opers.append((key, op, value))
511 return included, excluded, opers
def construct_filters(self, filterq):
    """Build the SQL fragment that applies an attribute filter query.

    Return (subq, args): subq is a string starting with " and serial in
    (...)" meant to be appended to a versions query, and args is the
    list of values for its placeholders, in order. Return (None, None)
    when the filter query yields no conditions.

    Raise ValueError for an operator other than =, !=, <=, >=, <, >.
    """
    included, excluded, opers = self.parse_filters(filterq)
    subqlist = []
    args = []

    if included:
        subqlist.append("key in (" + ','.join('?' for x in included) + ")")
        args.extend(included)

    if excluded:
        subqlist.append("key not in (" + ','.join('?' for x in excluded) + ")")
        args.extend(excluded)

    if opers:
        terms = []
        for k, o, v in opers:
            # Only the operator goes into the SQL text, and only if it
            # is a known one; key and value are bound as parameters.
            if o not in ('=', '!=', '<=', '>=', '<', '>'):
                raise ValueError("invalid filter operator: %r" % (o,))
            terms.append("(key = ? and value %s ?)" % (o,))
            args.extend((k, v))
        subqlist.append("(" + ' or '.join(terms) + ")")

    if not subqlist:
        return None, None

    subq = " and serial in (select serial from attributes where "
    subq += ' and '.join(subqlist)
    subq += ")"
    return subq, args
545 # def node_list(self, parent, prefix='',
546 # start='', delimiter=None,
547 # after=0.0, before=inf,
548 # filterq=None, versions=0,
549 # cluster=0, limit=10000):
550 # """Return (a list of property tuples, a list of common prefixes)
551 # for the current versions of the paths with the given parent,
552 # matching the following criteria.
554 # The property tuple for a version is returned if all
555 # of these conditions are true:
557 # a. parent (and cluster) matches
561 # c. path starts with prefix
563 # d. i [versions=true] version is in (after, before)
564 # ii [versions=false] version is the max in (after, before)
566 # e. the path does not have the delimiter occurring
569 # f. serial matches the attribute filter query.
571 # A filter query is a comma-separated list of
572 # terms in one of these three forms:
575 # an attribute with this key must exist
578 # an attribute with this key must not exist
581 # the attribute with this key satisfies the value
582 # where ?op is one of =, !=, <=, >=, <, >.
584 # matching up to the first delimiter after prefix,
585 # and are reported only once, as "virtual directories".
586 # The delimiter is included in the prefixes.
587 # Prefixes do appear from (e) even if no paths would match in (f).
589 # If arguments are None, then the corresponding matching rule
593 # execute = self.execute
596 # start = strprevling(prefix)
598 # nextling = strnextling(prefix)
600 # q = ("select serial, parent, path, size, "
601 # "population, popsize, source, mtime, cluster "
603 # "where parent = ? and path > ? and path < ? "
604 # "and mtime > ? and mtime < ? and cluster = ?")
605 # args = [parent, start, nextling, after, before, cluster]
608 # subq, subargs = self.construct_filters(filterq)
609 # if subq is not None:
612 # q += " order by path"
614 # if delimiter is None:
618 # return self.fetchall(), ()
621 # dz = len(delimiter)
623 # fetchone = self.fetchone
625 # pappend = prefixes.append
627 # mappend = matches.append
635 # idx = path.find(delimiter, pfz)
643 # pf = path[:idx + dz]
646 # ## XXX: if we break here due to limit,
647 # ## but a path would also be matched below,
648 # ## the path match would be lost since the
649 # ## next call with start=path would skip both of them.
650 # ## In this case, it is impossible to obey the limit,
651 # ## therefore we will break later, at limit + 1.
652 # if idx + dz == len(path):
659 # args[1] = strnextling(pf) # new start
662 # return matches, prefixes
def attribute_get(self, serial, keys=()):
    """Return a list of (key, value) pairs of the version specified by serial.
    If keys is empty, return all attributes.
    Otherwise, return only those specified.

    keys may be any iterable of attribute keys.
    """
    execute = self.execute
    # Normalise first: a list could not be concatenated with the
    # (serial,) tuple below.
    keys = tuple(keys)
    if keys:
        marks = ','.join('?' for k in keys)
        q = ("select key, value from attributes "
             "where key in (%s) and serial = ?" % (marks,))
        execute(q, keys + (serial,))
    else:
        q = "select key, value from attributes where serial = ?"
        execute(q, (serial,))
    return self.fetchall()
681 def attribute_set(self, serial, items):
682 """Set the attributes of the version specified by serial.
683 Receive attributes as an iterable of (key, value) pairs.
# "insert or replace" overwrites the value of a key that is already set.
686 q = ("insert or replace into attributes (serial, key, value) "
688 self.executemany(q, ((serial, k, v) for k, v in items))
690 def attribute_del(self, serial, keys=()):
691 """Delete attributes of the version specified by serial.
692 If keys is empty, delete all attributes.
693 Otherwise delete those specified.
# One delete per requested key.
697 q = "delete from attributes where serial = ? and key = ?"
698 self.executemany(q, ((serial, key) for key in keys))
# No keys given: drop every attribute of the version.
700 q = "delete from attributes where serial = ?"
701 self.execute(q, (serial,))
703 # def node_get_attribute_keys(self, parent):
704 # """Return a list with all keys pairs defined
705 # for the namespace of the node specified.
708 # q = ("select distinct key from attributes a, versions v, nodes n "
709 # "where a.serial = v.serial and v.node = n.node and n.parent = ?")
710 # self.execute(q, (parent,))
711 # return [r[0] for r in self.fetchall()]
713 def attribute_copy(self, source, dest):
# Re-insert attribute rows under the serial "dest", selecting key and value
# from existing attribute rows; "source" is bound as the second parameter,
# presumably in a "where serial = ?" clause.
714 q = ("insert or replace into attributes "
715 "select ?, key, value from attributes "
717 self.execute(q, (dest, source))