1 # Copyright 2011 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
36 from dbworker import DBWorker
# Positional indices into a version row, whose columns come back in the
# order (serial, node, size, source, mtime, muser, cluster).
SERIAL, NODE, SIZE, SOURCE, MTIME, MUSER, CLUSTER = range(7)
46 def strnextling(prefix):
47 """return the first unicode string
48 greater than but not starting with given prefix.
49 strnextling('hello') -> 'hellp'
52 ## all strings start with the null string,
53 ## therefore we have to approximate strnextling('')
54 ## with the last unicode character supported by python
55 ## 0x10ffff for wide (32-bit unicode) python builds
56 ## 0x00ffff for narrow (16-bit unicode) python builds
57 ## We will not autodetect. 0xffff is safe enough.
66 def strprevling(prefix):
67 """return an approximation of the last unicode string
68 less than but not starting with given prefix.
69 strprevling(u'hello') -> u'helln\\xffff'
72 ## There is no prevling for the null string
77 s += unichr(c-1) + unichr(0xffff)
# Matches one term of an attribute filter query: an optional leading "!",
# a key made of word characters and "-", then optionally an operator
# (=, !=, <=, >=, <, >) followed by a value.
# m.groups() yields (neg, key, op, value): neg is "!" or "", op is None
# when absent, and value is "" when absent.
# The two-character operators come before "<" and ">" in the alternation,
# so "<=" and ">=" are not split into "<" plus a value starting with "=".
# The pattern is a raw string so "\s" and "\w" reach the regex engine
# directly. It no longer relies on Python passing unknown string escapes
# through, which is deprecated and emits a warning on modern Pythons.
_regexfilter = re.compile(r'(!?)\s*([\w-]+)\s*(=|!=|<=|>=|<|>)?\s*(.*)$', re.UNICODE)
96 """Nodes store path organization.
97 Versions store object history.
98 Attributes store metadata.
101 # TODO: Keep size of object in one place.
103 def __init__(self, **params):
104 DBWorker.__init__(self, **params)
105 execute = self.execute
107 execute(""" pragma foreign_keys = on """)
109 execute(""" create table if not exists nodes
110 ( node integer primary key,
111 parent integer default 0,
112 path text not null default '',
114 references nodes(node)
116 on delete cascade )""")
117 execute(""" create unique index if not exists idx_nodes_path
120 execute(""" create table if not exists statistics
122 population integer not null default 0,
123 size integer not null default 0,
125 cluster integer not null default 0,
126 primary key (node, cluster)
128 references nodes(node)
130 on delete cascade )""")
132 execute(""" create table if not exists versions
133 ( serial integer primary key,
135 size integer not null default 0,
138 muser text not null default '',
139 cluster integer not null default 0,
141 references nodes(node)
143 on delete cascade ) """)
144 # execute(""" create index if not exists idx_versions_path
145 # on nodes(cluster, node, path) """)
146 # execute(""" create index if not exists idx_versions_mtime
147 # on nodes(mtime) """)
149 execute(""" create table if not exists attributes
153 primary key (serial, key)
155 references versions(serial)
157 on delete cascade ) """)
159 q = "insert or ignore into nodes(node, parent) values (?, ?)"
160 execute(q, (ROOTNODE, ROOTNODE))
162 def node_create(self, parent, path):
163 """Create a new node from the given properties.
164 Return the node identifier of the new node.
167 q = ("insert into nodes (parent, path) "
169 props = (parent, path)
170 return self.execute(q, props).lastrowid
172 def node_lookup(self, path):
173 """Lookup the current node of the given path.
174 Return None if the path is not found.
177 q = "select node from nodes where path = ?"
178 self.execute(q, (path,))
def node_get_properties(self, node):
    """Fetch the properties recorded for a node.

    Returns the (parent, path) row of the ``nodes`` table whose
    identifier is *node*, or None when no such node exists.
    """
    self.execute("select parent, path from nodes where node = ?", (node,))
    return self.fetchone()
193 def node_count_children(self, node):
194 """Return node's child count."""
196 q = "select count(node) from nodes where parent = ? and node != 0"
197 self.execute(q, (node,))
203 def node_purge_children(self, parent, before=inf, cluster=0):
204 """Delete all versions with the specified
205 parent and cluster, and return
206 the serials of versions deleted.
207 Clears out nodes with no remaining versions.
210 execute = self.execute
211 q = ("select count(serial), sum(size) from versions "
212 "where node in (select node "
217 args = (parent, cluster, before)
219 nr, size = self.fetchone()
# NOTE(review): statistics_update() and statistics_update_ancestors() are
# declared as (node, population, size, mtime, cluster=0). Both calls below
# pass only four arguments, so `cluster` is received as mtime, and the
# statistics are adjusted for cluster 0 instead of the purged cluster.
# An mtime argument is missing from both calls.
222 self.statistics_update(parent, -nr, -size, cluster)
223 self.statistics_update_ancestors(parent, -nr, -size, cluster)
225 q = ("select serial from versions "
226 "where node in (select node "
232 serials = [r[SERIAL] for r in self.fetchall()]
233 q = ("delete from versions "
234 "where node in (select node "
# Remove child nodes of `parent` that no longer have any version.
240 q = ("delete from nodes "
241 "where node in (select node from nodes n "
242 "where (select count(serial) "
244 "where node = n.node) = 0 "
246 execute(q, (parent,))
249 def node_purge(self, node, before=inf, cluster=0):
250 """Delete all versions with the specified
251 node and cluster, and return
252 the serials of versions deleted.
253 Clears out the node if it has no remaining versions.
256 execute = self.execute
257 q = ("select count(serial), sum(size) from versions "
261 args = (node, cluster, before)
263 nr, size = self.fetchone()
# NOTE(review): statistics_update_ancestors() is declared as
# (node, population, size, mtime, cluster=0). This call passes only four
# arguments, so `cluster` is received as mtime, and the ancestors'
# statistics are adjusted for cluster 0 instead of the purged cluster.
# An mtime argument is missing.
266 self.statistics_update_ancestors(node, -nr, -size, cluster)
268 q = ("select serial from versions "
273 serials = [r[SERIAL] for r in self.fetchall()]
274 q = ("delete from versions "
# Remove nodes left with no versions at all.
279 q = ("delete from nodes "
280 "where node in (select node from nodes n "
281 "where (select count(serial) "
283 "where node = n.node) = 0 "
288 def node_remove(self, node):
289 """Remove the node specified.
290 Return false if the node has children or is not found.
293 if self.node_count_children(node):
297 q = ("select count(serial), sum(size), cluster "
301 self.execute(q, (node,))
302 for population, size, cluster in self.fetchall():
303 self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
305 q = "delete from nodes where node = ?"
306 self.execute(q, (node,))
def statistics_get(self, node, cluster=0):
    """Fetch the statistics row kept for *node* in *cluster*.

    The row holds (population, size, mtime): the population, total size
    and last mtime for all versions under the node that belong to the
    cluster. When no row matches, whatever ``self.fetchone()`` yields for
    an empty result is returned (None for a DB-API cursor).
    """
    params = (node, cluster)
    self.execute("select population, size, mtime from statistics "
                 "where node = ? and cluster = ?", params)
    return self.fetchone()
319 def statistics_update(self, node, population, size, mtime, cluster=0):
320 """Update the statistics of the given node.
321 Statistics keep track the population, total
322 size of objects and mtime in the node's namespace.
323 May be zero or positive or negative numbers.
326 qs = ("select population, size from statistics "
327 "where node = ? and cluster = ?")
328 qu = ("insert or replace into statistics (node, population, size, mtime, cluster) "
329 "values (?, ?, ?, ?, ?)")
330 self.execute(qs, (node, cluster))
333 prepopulation, presize = (0, 0)
335 prepopulation, presize = r
336 population += prepopulation
338 self.execute(qu, (node, population, size, mtime, cluster))
340 def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
341 """Update the statistics of the given node's parent.
342 Then recursively update all parents up to the root.
343 Population is not recursive.
349 props = self.node_get_properties(node)
353 self.statistics_update(parent, population, size, mtime, cluster)
355 population = 0 # Population isn't recursive
357 def statistics_latest(self, node, before=inf, except_cluster=0):
358 """Return population, total size and last mtime
359 for all latest versions under node that
360 do not belong to the cluster.
364 props = self.node_get_properties(node)
369 # The latest version.
370 q = ("select serial, node, size, source, mtime, muser, cluster "
372 "where serial = (select max(serial) "
374 "where node = ? and mtime < ?) "
376 self.execute(q, (node, before, except_cluster))
377 props = self.fetchone()
382 # First level, just under node (get population).
383 q = ("select count(serial), sum(size), max(mtime) "
385 "where serial = (select max(serial) "
387 "where node = v.node and mtime < ?) "
389 "and node in (select node "
392 self.execute(q, (before, except_cluster, parent))
397 mtime = max(mtime, r[2])
401 # All children (get size and mtime).
402 q = ("select count(serial), sum(size), max(mtime) "
404 "where serial = (select max(serial) "
406 "where node = v.node and mtime < ?) "
408 "and node in (select node "
410 "where path like ?)")
411 self.execute(q, (before, except_cluster, path + '%'))
415 size = r[1] - props[SIZE]
416 mtime = max(mtime, r[2])
417 return (count, size, mtime)
419 def version_create(self, node, size, source, muser, cluster=0):
420 """Create a new version from the given properties.
421 Return the (serial, mtime) of the new version.
424 q = ("insert into versions (node, size, source, mtime, muser, cluster) "
425 "values (?, ?, ?, ?, ?, ?)")
427 props = (node, size, source, mtime, muser, cluster)
428 serial = self.execute(q, props).lastrowid
429 self.statistics_update_ancestors(node, 1, size, mtime, cluster)
432 def version_lookup(self, node, before=inf, cluster=0):
433 """Lookup the current version of the given node.
434 Return a list with its properties:
435 (serial, node, size, source, mtime, muser, cluster)
436 or None if the current version is not found in the given cluster.
439 q = ("select serial, node, size, source, mtime, muser, cluster "
441 "where serial = (select max(serial) "
443 "where node = ? and mtime < ?) "
445 self.execute(q, (node, before, cluster))
446 props = self.fetchone()
447 if props is not None:
451 def version_get_properties(self, serial, keys=(), propnames=_propnames):
452 """Return a sequence of values for the properties of
453 the version specified by serial and the keys, in the order given.
454 If keys is empty, return all properties in the order
455 (serial, node, size, source, mtime, muser, cluster).
458 q = ("select serial, node, size, source, mtime, muser, cluster "
461 self.execute(q, (serial,))
468 return [r[propnames[k]] for k in keys if k in propnames]
470 def version_recluster(self, serial, cluster):
471 """Move the version into another cluster."""
473 props = self.version_get_properties(serial)
478 oldcluster = props[CLUSTER]
479 if cluster == oldcluster:
483 self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
484 self.statistics_update_ancestors(node, 1, size, mtime, cluster)
486 q = "update versions set cluster = ? where serial = ?"
487 self.execute(q, (cluster, serial))
489 def version_remove(self, serial):
490 """Remove the serial specified."""
# NOTE(review): node_get_properties() looks up a *node* id and selects only
# (parent, path), so the props[CLUSTER] (index 6) lookup below cannot work,
# and the lookup uses the wrong key. This almost certainly should be
# self.version_get_properties(serial), which returns the
# (serial, node, size, source, mtime, muser, cluster) row. Confirm and fix.
492 props = self.node_get_properties(serial)
497 cluster = props[CLUSTER]
# Take this version out of the statistics: population -1 for the parent,
# and -size for the parent and every ancestor up to the root.
500 self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
502 q = "delete from versions where serial = ?"
503 self.execute(q, (serial,))
506 def parse_filters(self, filterq):
507 preterms = filterq.split(',')
511 match = _regexfilter.match
512 for term in preterms:
516 neg, key, op, value = m.groups()
522 opers.append((key, op, value))
524 return included, excluded, opers
526 def construct_filters(self, filterq):
# Builds an SQL fragment restricting serials by the attribute filter query
# `filterq`, combining the included / excluded / operator terms returned
# by parse_filters().
# NOTE(review): the commented-out node_list unpacks the result as
# `subq, subargs` and tests `subq is not None`; confirm the return shape.
528 append = subqlist.append
529 included, excluded, opers = self.parse_filters(filterq)
534 subq += ','.join(('?' for x in included)) + ")"
539 subq = "key not in ("
# NOTE(review): `exluded` is a typo for `excluded` (bound from
# parse_filters() above); this line raises NameError whenever it runs.
540 subq += ','.join(('?' for x in exluded)) + ")"
# NOTE(review): the key, operator and value are %-interpolated straight into
# the SQL text. The operator is whitelisted by _regexfilter, but the key and
# value are inserted unquoted, and the value is the free-form `(.*)` capture
# from the caller's query. A text key or value therefore produces invalid
# SQL, and an untrusted filterq allows SQL injection. Emit
# "(key = ? and value <op> ?)" and pass (k, v) as bound arguments instead.
545 t = (("(key = %s and value %s %s)" % (k, o, v)) for k, o, v in opers)
546 subq = "(" + ' or '.join(t) + ")"
552 subq = " and serial in (select serial from attributes where "
553 subq += ' and '.join(subqlist)
558 # def node_list(self, parent, prefix='',
559 # start='', delimiter=None,
560 # after=0.0, before=inf,
561 # filterq=None, versions=0,
562 # cluster=0, limit=10000):
563 # """Return (a list of property tuples, a list of common prefixes)
564 # for the current versions of the paths with the given parent,
565 # matching the following criteria.
567 # The property tuple for a version is returned if all
568 # of these conditions are true:
570 # a. parent (and cluster) matches
574 # c. path starts with prefix
576 # d. i [versions=true] version is in (after, before)
577 # ii [versions=false] version is the max in (after, before)
579 # e. the path does not have the delimiter occurring
582 # f. serial matches the attribute filter query.
584 # A filter query is a comma-separated list of
585 # terms in one of these three forms:
588 # an attribute with this key must exist
591 # an attribute with this key must not exist
594 # the attribute with this key satisfies the value
595 # where ?op is one of =, !=, <=, >=, <, >.
597 # matching up to the first delimiter after prefix,
598 # and are reported only once, as "virtual directories".
599 # The delimiter is included in the prefixes.
600 # Prefixes do appear from (e) even if no paths would match in (f).
602 # If arguments are None, then the corresponding matching rule
606 # execute = self.execute
609 # start = strprevling(prefix)
611 # nextling = strnextling(prefix)
613 # q = ("select serial, parent, path, size, "
614 # "population, popsize, source, mtime, cluster "
616 # "where parent = ? and path > ? and path < ? "
617 # "and mtime > ? and mtime < ? and cluster = ?")
618 # args = [parent, start, nextling, after, before, cluster]
621 # subq, subargs = self.construct_filters(filterq)
622 # if subq is not None:
625 # q += " order by path"
627 # if delimiter is None:
631 # return self.fetchall(), ()
634 # dz = len(delimiter)
636 # fetchone = self.fetchone
638 # pappend = prefixes.append
640 # mappend = matches.append
648 # idx = path.find(delimiter, pfz)
656 # pf = path[:idx + dz]
659 # ## XXX: if we break here due to limit,
660 # ## but a path would also be matched below,
661 # ## the path match would be lost since the
662 # ## next call with start=path would skip both of them.
663 # ## In this case, it is impossible to obey the limit,
664 # ## therefore we will break later, at limit + 1.
665 # if idx + dz == len(path):
672 # args[1] = strnextling(pf) # new start
675 # return matches, prefixes
677 def attribute_get(self, serial, keys=()):
678 """Return a list of (key, value) pairs of the version specified by serial.
679 If keys is empty, return all attributes.
680 Othwerise, return only those specified.
683 execute = self.execute
685 marks = ','.join('?' for k in keys)
686 q = ("select key, value from attributes "
687 "where key in (%s) and serial = ?" % (marks,))
688 execute(q, keys + (serial,))
690 q = "select key, value from attributes where serial = ?"
691 execute(q, (serial,))
692 return self.fetchall()
694 def attribute_set(self, serial, items):
695 """Set the attributes of the version specified by serial.
696 Receive attributes as an iterable of (key, value) pairs.
699 q = ("insert or replace into attributes (serial, key, value) "
701 self.executemany(q, ((serial, k, v) for k, v in items))
703 def attribute_del(self, serial, keys=()):
704 """Delete attributes of the version specified by serial.
705 If keys is empty, delete all attributes.
706 Otherwise delete those specified.
710 q = "delete from attributes where serial = ? and key = ?"
711 self.executemany(q, ((serial, key) for key in keys))
713 q = "delete from attributes where serial = ?"
714 self.execute(q, (serial,))
716 # def node_get_attribute_keys(self, parent):
717 # """Return a list with all keys defined
718 # for the namespace of the node specified.
721 # q = ("select distinct key from attributes a, versions v, nodes n "
722 # "where a.serial = v.serial and v.node = n.node and n.parent = ?")
723 # self.execute(q, (parent,))
724 # return [r[0] for r in self.fetchall()]
726 def attribute_copy(self, source, dest):
727 q = ("insert or replace into attributes "
728 "select ?, key, value from attributes "
730 self.execute(q, (dest, source))