1 # Copyright 2011 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
36 from dbworker import DBWorker
41 ( SERIAL, NODE, SIZE, SOURCE, MTIME, MUSER, CLUSTER ) = range(7)
46 def strnextling(prefix):
47 """Return the first unicode string
48 greater than but not starting with given prefix.
49 strnextling('hello') -> 'hellp'
52 ## all strings start with the null string,
53 ## therefore we have to approximate strnextling('')
54 ## with the last unicode character supported by python
55 ## 0x10ffff for wide (32-bit unicode) python builds
56 ## 0x00ffff for narrow (16-bit unicode) python builds
57 ## We will not autodetect. 0xffff is safe enough.
66 def strprevling(prefix):
67 """Return an approximation of the last unicode string
68 less than but not starting with given prefix.
69 strprevling(u'hello') -> u'helln\\xffff'
72 ## There is no prevling for the null string
77 s += unichr(c-1) + unichr(0xffff)
93 """Nodes store path organization and have multiple versions.
94 Versions store object history and have multiple attributes.
95 Attributes store metadata.
98 # TODO: Provide an interface for included and excluded clusters.
100 def __init__(self, **params):
101 DBWorker.__init__(self, **params)
102 execute = self.execute
104 execute(""" pragma foreign_keys = on """)
106 execute(""" create table if not exists nodes
107 ( node integer primary key,
108 parent integer default 0,
109 path text not null default '',
111 references nodes(node)
113 on delete cascade )""")
114 execute(""" create unique index if not exists idx_nodes_path
117 execute(""" create table if not exists statistics
119 population integer not null default 0,
120 size integer not null default 0,
122 cluster integer not null default 0,
123 primary key (node, cluster)
125 references nodes(node)
127 on delete cascade )""")
129 execute(""" create table if not exists versions
130 ( serial integer primary key,
132 size integer not null default 0,
135 muser text not null default '',
136 cluster integer not null default 0,
138 references nodes(node)
140 on delete cascade ) """)
141 execute(""" create index if not exists idx_versions_node
143 # TODO: Sort out if more indexes are needed.
144 # execute(""" create index if not exists idx_versions_mtime
145 # on nodes(mtime) """)
147 execute(""" create table if not exists attributes
151 primary key (serial, key)
153 references versions(serial)
155 on delete cascade ) """)
157 q = "insert or ignore into nodes(node, parent) values (?, ?)"
158 execute(q, (ROOTNODE, ROOTNODE))
160 def node_create(self, parent, path):
161 """Create a new node from the given properties.
162 Return the node identifier of the new node.
165 q = ("insert into nodes (parent, path) "
167 props = (parent, path)
168 return self.execute(q, props).lastrowid
170 def node_lookup(self, path):
171 """Lookup the current node of the given path.
172 Return None if the path is not found.
175 q = "select node from nodes where path = ?"
176 self.execute(q, (path,))
182 def node_get_properties(self, node):
183 """Return the node's (parent, path).
184 Return None if the node is not found.
187 q = "select parent, path from nodes where node = ?"
188 self.execute(q, (node,))
189 return self.fetchone()
191 def node_get_versions(self, node, keys=(), propnames=_propnames):
192 """Return the properties of all versions at node.
193 If keys is empty, return all properties in the order
194 (serial, node, size, source, mtime, muser, cluster).
197 q = ("select serial, node, size, source, mtime, muser, cluster "
200 self.execute(q, (node,))
207 return [[p[propnames[k]] for k in keys if k in propnames] for p in r]
209 def node_count_children(self, node):
210 """Return node's child count."""
212 q = "select count(node) from nodes where parent = ? and node != 0"
213 self.execute(q, (node,))
219 def node_purge_children(self, parent, before=inf, cluster=0):
220 """Delete all versions with the specified
221 parent and cluster, and return
222 the serials of versions deleted.
223 Clears out nodes with no remaining versions.
226 execute = self.execute
227 q = ("select count(serial), sum(size) from versions "
228 "where node in (select node "
233 args = (parent, cluster, before)
235 nr, size = self.fetchone()
239 self.statistics_update(parent, -nr, -size, mtime, cluster)
240 self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster)
242 q = ("select serial from versions "
243 "where node in (select node "
249 serials = [r[SERIAL] for r in self.fetchall()]
250 q = ("delete from versions "
251 "where node in (select node "
257 q = ("delete from nodes "
258 "where node in (select node from nodes n "
259 "where (select count(serial) "
261 "where node = n.node) = 0 "
263 execute(q, (parent,))
266 def node_purge(self, node, before=inf, cluster=0):
267 """Delete all versions with the specified
268 node and cluster, and return
269 the serials of versions deleted.
270 Clears out the node if it has no remaining versions.
273 execute = self.execute
274 q = ("select count(serial), sum(size) from versions "
278 args = (node, cluster, before)
280 nr, size = self.fetchone()
284 self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
286 q = ("select serial from versions "
291 serials = [r[SERIAL] for r in self.fetchall()]
292 q = ("delete from versions "
297 q = ("delete from nodes "
298 "where node in (select node from nodes n "
299 "where (select count(serial) "
301 "where node = n.node) = 0 "
306 def node_remove(self, node):
307 """Remove the node specified.
308 Return false if the node has children or is not found.
311 if self.node_count_children(node):
315 q = ("select count(serial), sum(size), cluster "
319 self.execute(q, (node,))
320 for population, size, cluster in self.fetchall():
321 self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
323 q = "delete from nodes where node = ?"
324 self.execute(q, (node,))
327 def statistics_get(self, node, cluster=0):
328 """Return population, total size and last mtime
329 for all versions under node that belong to the cluster.
332 q = ("select population, size, mtime from statistics "
333 "where node = ? and cluster = ?")
334 self.execute(q, (node, cluster))
335 return self.fetchone()
337 def statistics_update(self, node, population, size, mtime, cluster=0):
338 """Update the statistics of the given node.
339 Statistics keep track of the population, total
340 size of objects and mtime in the node's namespace.
341 The population and size given may be zero, positive or negative numbers.
344 qs = ("select population, size from statistics "
345 "where node = ? and cluster = ?")
346 qu = ("insert or replace into statistics (node, population, size, mtime, cluster) "
347 "values (?, ?, ?, ?, ?)")
348 self.execute(qs, (node, cluster))
351 prepopulation, presize = (0, 0)
353 prepopulation, presize = r
354 population += prepopulation  # the given population is added to the stored one
356 self.execute(qu, (node, population, size, mtime, cluster))
358 def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
359 """Update the statistics of the given node's parent.
360 Then recursively update all parents up to the root.
361 Population is not recursive.
367 props = self.node_get_properties(node)
371 self.statistics_update(parent, population, size, mtime, cluster)
373 population = 0 # Population isn't recursive
375 def statistics_latest(self, node, before=inf, except_cluster=0):
376 """Return population, total size and last mtime
377 for all latest versions under node that
378 do not belong to the cluster.
381 execute = self.execute
382 fetchone = self.fetchone
385 props = self.node_get_properties(node)
390 # The latest version.
391 q = ("select serial, node, size, source, mtime, muser, cluster "
393 "where serial = (select max(serial) "
395 "where node = ? and mtime < ?) "
397 execute(q, (node, before, except_cluster))
403 # First level, just under node (get population).
404 q = ("select count(serial), sum(size), max(mtime) "
406 "where serial = (select max(serial) "
408 "where node = v.node and mtime < ?) "
410 "and node in (select node "
413 execute(q, (before, except_cluster, node))
418 mtime = max(mtime, r[2])
422 # All children (get size and mtime).
423 # XXX: This is why the full path is stored.
424 q = ("select count(serial), sum(size), max(mtime) "
426 "where serial = (select max(serial) "
428 "where node = v.node and mtime < ?) "
430 "and node in (select node "
432 "where path like ?)")
433 execute(q, (before, except_cluster, path + '%'))
437 size = r[1] - props[SIZE]
438 mtime = max(mtime, r[2])
439 return (count, size, mtime)
441 def version_create(self, node, size, source, muser, cluster=0):
442 """Create a new version from the given properties.
443 Return the (serial, mtime) of the new version.
446 q = ("insert into versions (node, size, source, mtime, muser, cluster) "
447 "values (?, ?, ?, ?, ?, ?)")
449 props = (node, size, source, mtime, muser, cluster)
450 serial = self.execute(q, props).lastrowid
451 self.statistics_update_ancestors(node, 1, size, mtime, cluster)
454 def version_lookup(self, node, before=inf, cluster=0):
455 """Lookup the current version of the given node.
456 Return a list with its properties:
457 (serial, node, size, source, mtime, muser, cluster)
458 or None if the current version is not found in the given cluster.
461 q = ("select serial, node, size, source, mtime, muser, cluster "
463 "where serial = (select max(serial) "
465 "where node = ? and mtime < ?) "
467 self.execute(q, (node, before, cluster))
468 props = self.fetchone()
469 if props is not None:
473 def version_get_properties(self, serial, keys=(), propnames=_propnames):
474 """Return a sequence of values for the properties of
475 the version specified by serial and the keys, in the order given.
476 If keys is empty, return all properties in the order
477 (serial, node, size, source, mtime, muser, cluster).
480 q = ("select serial, node, size, source, mtime, muser, cluster "
483 self.execute(q, (serial,))
490 return [r[propnames[k]] for k in keys if k in propnames]
492 def version_recluster(self, serial, cluster):
493 """Move the version into another cluster."""
495 props = self.version_get_properties(serial)
500 oldcluster = props[CLUSTER]
501 if cluster == oldcluster:
505 self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
506 self.statistics_update_ancestors(node, 1, size, mtime, cluster)
508 q = "update versions set cluster = ? where serial = ?"
509 self.execute(q, (cluster, serial))
511 def version_remove(self, serial):
512 """Remove the version with the specified serial."""
514 props = self.node_get_properties(serial)  # NOTE(review): node_get_properties is documented as returning (parent, path) for a node, yet props[CLUSTER] below indexes a version tuple; version_get_properties(serial) looks intended -- confirm
519 cluster = props[CLUSTER]
522 self.statistics_update_ancestors(node, -1, -size, mtime, cluster)  # subtract this version from the ancestors' statistics
524 q = "delete from versions where serial = ?"
525 self.execute(q, (serial,))
528 def attribute_get(self, serial, keys=()):
529 """Return a list of (key, value) pairs of the version specified by serial.
530 If keys is empty, return all attributes.
531 Otherwise, return only those specified.
534 execute = self.execute
536 marks = ','.join('?' for k in keys)  # one '?' placeholder per requested key
537 q = ("select key, value from attributes "
538 "where key in (%s) and serial = ?" % (marks,))
539 execute(q, keys + (serial,))  # keys must be a tuple for this concatenation to work
541 q = "select key, value from attributes where serial = ?"
542 execute(q, (serial,))
543 return self.fetchall()
545 def attribute_set(self, serial, items):
546 """Set the attributes of the version specified by serial.
547 Receive attributes as an iterable of (key, value) pairs.
550 q = ("insert or replace into attributes (serial, key, value) "
552 self.executemany(q, ((serial, k, v) for k, v in items))
554 def attribute_del(self, serial, keys=()):
555 """Delete attributes of the version specified by serial.
556 If keys is empty, delete all attributes.
557 Otherwise delete those specified.
561 q = "delete from attributes where serial = ? and key = ?"
562 self.executemany(q, ((serial, key) for key in keys))
564 q = "delete from attributes where serial = ?"
565 self.execute(q, (serial,))
567 def attribute_copy(self, source, dest):
568 q = ("insert or replace into attributes "
569 "select ?, key, value from attributes "
571 self.execute(q, (dest, source))
573 def _construct_filters(self, filterq):
577 args = filterq.split(',')
578 subq = " and a.key in ("
579 subq += ','.join(('?' for x in args))
584 def _construct_paths(self, pathq):
589 subq += ' or '.join(('n.path like ?' for x in pathq))
591 args = tuple([x + '%' for x in pathq])
595 def latest_attribute_keys(self, parent, before=inf, except_cluster=0, pathq=[]):
596 """Return a list with all attribute keys defined
597 for all latest versions under parent that
598 do not belong to the cluster given by except_cluster.
601 # TODO: Use another table to store before=inf results.
602 q = ("select distinct a.key "
603 "from attributes a, versions v, nodes n "
604 "where v.serial = (select max(serial) "
606 "where node = v.node and mtime < ?) "
607 "and v.cluster != ? "
608 "and v.node in (select node "
611 "and a.serial = v.serial "
612 "and n.node = v.node")
613 args = (before, except_cluster, parent)
614 subq, subargs = self._construct_paths(pathq)
618 self.execute(q, args)
619 return [r[0] for r in self.fetchall()]  # each row holds a single column, the key
621 def latest_version_list(self, parent, prefix='', delimiter=None,
622 start='', limit=10000, before=inf,
623 except_cluster=0, pathq=[], filterq=None):
624 """Return a (list of (path, serial) tuples, list of common prefixes)
625 for the current versions of the paths with the given parent,
626 matching the following criteria.
628 The property tuple for a version is returned if all
629 of these conditions are true:
635 c. path starts with prefix (and paths in pathq)
637 d. version is the max up to before
639 e. version is not in the cluster given by except_cluster
641 f. the path does not have the delimiter occurring
642 after the prefix, or ends with the delimiter
644 g. serial matches the attribute filter query.
646 A filter query is a comma-separated list of
647 terms in one of these three forms:
650 an attribute with this key must exist
653 an attribute with this key must not exist
656 the attribute with this key satisfies the value
657 where ?op is one of ==, !=, <=, >=, <, >.
659 The list of common prefixes includes the prefixes
660 matching up to the first delimiter after prefix,
661 and are reported only once, as "virtual directories".
662 The delimiter is included in the prefixes.
664 If arguments are None, then the corresponding matching rule
667 Limit applies to the first list of tuples returned.
670 execute = self.execute
672 if not start or start < prefix:
673 start = strprevling(prefix)
674 nextling = strnextling(prefix)
676 q = ("select distinct n.path, v.serial "
677 "from attributes a, versions v, nodes n "
678 "where v.serial = (select max(serial) "
680 "where node = v.node and mtime < ?) "
681 "and v.cluster != ? "
682 "and v.node in (select node "
685 "and a.serial = v.serial "
686 "and n.node = v.node "
687 "and n.path > ? and n.path < ?")
688 args = [before, except_cluster, parent, start, nextling]
690 subq, subargs = self._construct_paths(pathq)
694 subq, subargs = self._construct_filters(filterq)
699 q = q.replace("attributes a, ", "")
700 q = q.replace("and a.serial = v.serial ", "")
701 q += " order by n.path"
707 return self.fetchall(), ()
712 fetchone = self.fetchone
714 pappend = prefixes.append
716 mappend = matches.append
724 idx = path.find(delimiter, pfz)
733 if idx + dz == len(path):
736 continue # Get one more, in case there is a path.
742 args[3] = strnextling(pf) # New start.
745 return matches, prefixes