1 # Copyright 2011 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
36 from dbworker import DBWorker
41 ( SERIAL, NODE, HASH, SIZE, SOURCE, MTIME, MUSER, CLUSTER ) = range(8)
46 def strnextling(prefix):
47 """Return the first unicode string
48 greater than but not starting with given prefix.
49 strnextling('hello') -> 'hellp'
52 ## all strings start with the null string,
53 ## therefore we have to approximate strnextling('')
54 ## with the last unicode character supported by python
55 ## 0x10ffff for wide (32-bit unicode) python builds
56 ## 0x00ffff for narrow (16-bit unicode) python builds
57 ## We will not autodetect. 0xffff is safe enough.
66 def strprevling(prefix):
67 """Return an approximation of the last unicode string
68 less than but not starting with given prefix.
69 strprevling(u'hello') -> u'helln\\xffff'
72 ## There is no prevling for the null string
77 s += unichr(c-1) + unichr(0xffff)
94 """Nodes store path organization and have multiple versions.
95 Versions store object history and have multiple attributes.
96 Attributes store metadata.
99 # TODO: Provide an interface for included and excluded clusters.
101 def __init__(self, **params):
102 DBWorker.__init__(self, **params)
103 execute = self.execute
105 execute(""" pragma foreign_keys = on """)
107 execute(""" create table if not exists nodes
108 ( node integer primary key,
109 parent integer default 0,
110 path text not null default '',
112 references nodes(node)
114 on delete cascade )""")
115 execute(""" create unique index if not exists idx_nodes_path
118 execute(""" create table if not exists statistics
120 population integer not null default 0,
121 size integer not null default 0,
123 cluster integer not null default 0,
124 primary key (node, cluster)
126 references nodes(node)
128 on delete cascade )""")
130 execute(""" create table if not exists versions
131 ( serial integer primary key,
134 size integer not null default 0,
137 muser text not null default '',
138 cluster integer not null default 0,
140 references nodes(node)
142 on delete cascade ) """)
143 execute(""" create index if not exists idx_versions_node_mtime
144 on versions(node, mtime) """)
146 execute(""" create table if not exists attributes
150 primary key (serial, key)
152 references versions(serial)
154 on delete cascade ) """)
156 q = "insert or ignore into nodes(node, parent) values (?, ?)"
157 execute(q, (ROOTNODE, ROOTNODE))
def node_create(self, parent, path):
    """Insert a new row into the nodes table.

    parent -- identifier of the parent node
    path   -- full path stored for the new node

    Return the identifier (rowid) assigned to the new node.
    """
    statement = "insert into nodes (parent, path) values (?, ?)"
    cursor = self.execute(statement, (parent, path))
    # The node column is the integer primary key, so the cursor's
    # lastrowid is the identifier of the row just inserted.
    return cursor.lastrowid
169 def node_lookup(self, path):
170 """Lookup the current node of the given path.
171 Return None if the path is not found.
174 q = "select node from nodes where path = ?"
175 self.execute(q, (path,))
def node_get_properties(self, node):
    """Fetch the (parent, path) pair stored for a node.

    node -- identifier of the node to look up

    Return the row as fetched, or None when no such node exists.
    """
    query = "select parent, path from nodes where node = ?"
    self.execute(query, (node,))
    row = self.fetchone()
    return row
190 def node_get_versions(self, node, keys=(), propnames=_propnames):
191 """Return the properties of all versions at node.
192 If keys is empty, return all properties in the order
193 (serial, node, hash, size, source, mtime, muser, cluster).
196 q = ("select serial, node, hash, size, source, mtime, muser, cluster "
199 self.execute(q, (node,))
206 return [[p[propnames[k]] for k in keys if k in propnames] for p in r]
208 def node_count_children(self, node):
209 """Return node's child count."""
211 q = "select count(node) from nodes where parent = ? and node != 0"
212 self.execute(q, (node,))
218 def node_purge_children(self, parent, before=inf, cluster=0):
219 """Delete all versions with the specified
220 parent and cluster, and return
221 the serials of versions deleted.
222 Clears out nodes with no remaining versions.
225 execute = self.execute
226 q = ("select count(serial), sum(size) from versions "
227 "where node in (select node "
232 args = (parent, cluster, before)
234 nr, size = self.fetchone()
238 self.statistics_update(parent, -nr, -size, mtime, cluster)
239 self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster)
241 q = ("select serial from versions "
242 "where node in (select node "
248 serials = [r[SERIAL] for r in self.fetchall()]
249 q = ("delete from versions "
250 "where node in (select node "
256 q = ("delete from nodes "
257 "where node in (select node from nodes n "
258 "where (select count(serial) "
260 "where node = n.node) = 0 "
262 execute(q, (parent,))
265 def node_purge(self, node, before=inf, cluster=0):
266 """Delete all versions with the specified
267 node and cluster, and return
268 the serials of versions deleted.
269 Clears out the node if it has no remaining versions.
272 execute = self.execute
273 q = ("select count(serial), sum(size) from versions "
277 args = (node, cluster, before)
279 nr, size = self.fetchone()
283 self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
285 q = ("select serial from versions "
290 serials = [r[SERIAL] for r in self.fetchall()]
291 q = ("delete from versions "
296 q = ("delete from nodes "
297 "where node in (select node from nodes n "
298 "where (select count(serial) "
300 "where node = n.node) = 0 "
305 def node_remove(self, node):
306 """Remove the node specified.
307 Return false if the node has children or is not found.
310 if self.node_count_children(node):
314 q = ("select count(serial), sum(size), cluster "
318 self.execute(q, (node,))
319 for population, size, cluster in self.fetchall():
320 self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
322 q = "delete from nodes where node = ?"
323 self.execute(q, (node,))
def statistics_get(self, node, cluster=0):
    """Read the stored statistics row for a node within a cluster.

    node    -- identifier of the node
    cluster -- cluster whose statistics are wanted (default 0)

    Return the (population, size, mtime) row, or None when no
    statistics have been recorded for that node and cluster.
    """
    query = ("select population, size, mtime from statistics "
             "where node = ? and cluster = ?")
    params = (node, cluster)
    self.execute(query, params)
    row = self.fetchone()
    return row
def statistics_update(self, node, population, size, mtime, cluster=0):
    """Apply a delta to the statistics row of a node within a cluster.

    Statistics keep track of the population, the total size of
    objects and the mtime in the node's namespace.

    population, size -- amounts to add to the stored values; they may
                        be zero, positive or negative
    mtime            -- value stored as the row's new mtime
    cluster          -- cluster the statistics belong to (default 0)

    A missing row is treated as (population, size) == (0, 0).
    """
    self.execute("select population, size from statistics "
                 "where node = ? and cluster = ?", (node, cluster))
    existing = self.fetchone()
    if existing is not None:
        base_population, base_size = existing
    else:
        base_population, base_size = (0, 0)

    # Read-modify-write: the row is replaced wholesale with the new totals.
    upsert = ("insert or replace into statistics (node, population, size, mtime, cluster) "
              "values (?, ?, ?, ?, ?)")
    new_row = (node, population + base_population, size + base_size,
               mtime, cluster)
    self.execute(upsert, new_row)
357 def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
358 """Update the statistics of the given node's parent.
359 Then recursively update all parents up to the root.
360 Population is not recursive.
366 props = self.node_get_properties(node)
370 self.statistics_update(parent, population, size, mtime, cluster)
372 population = 0 # Population isn't recursive
374 def statistics_latest(self, node, before=inf, except_cluster=0):
375 """Return population, total size and last mtime
376 for all latest versions under node that
377 do not belong to the cluster.
380 execute = self.execute
381 fetchone = self.fetchone
384 props = self.node_get_properties(node)
389 # The latest version.
390 q = ("select serial, node, hash, size, source, mtime, muser, cluster "
392 "where serial = (select max(serial) "
394 "where node = ? and mtime < ?) "
396 execute(q, (node, before, except_cluster))
402 # First level, just under node (get population).
403 q = ("select count(serial), sum(size), max(mtime) "
405 "where serial = (select max(serial) "
407 "where node = v.node and mtime < ?) "
409 "and node in (select node "
412 execute(q, (before, except_cluster, node))
417 mtime = max(mtime, r[2])
421 # All children (get size and mtime).
422 # XXX: This is why the full path is stored.
423 q = ("select count(serial), sum(size), max(mtime) "
425 "where serial = (select max(serial) "
427 "where node = v.node and mtime < ?) "
429 "and node in (select node "
431 "where path like ?)")
432 execute(q, (before, except_cluster, path + '%'))
436 size = r[1] - props[SIZE]
437 mtime = max(mtime, r[2])
438 return (count, size, mtime)
440 def version_create(self, node, hash, size, source, muser, cluster=0):
441 """Create a new version from the given properties.
442 Return the (serial, mtime) of the new version.
445 q = ("insert into versions (node, hash, size, source, mtime, muser, cluster) "
446 "values (?, ?, ?, ?, ?, ?, ?)")
448 props = (node, hash, size, source, mtime, muser, cluster)
449 serial = self.execute(q, props).lastrowid
450 self.statistics_update_ancestors(node, 1, size, mtime, cluster)
453 def version_lookup(self, node, before=inf, cluster=0):
454 """Lookup the current version of the given node.
455 Return a list with its properties:
456 (serial, node, hash, size, source, mtime, muser, cluster)
457 or None if the current version is not found in the given cluster.
460 q = ("select serial, node, hash, size, source, mtime, muser, cluster "
462 "where serial = (select max(serial) "
464 "where node = ? and mtime < ?) "
466 self.execute(q, (node, before, cluster))
467 props = self.fetchone()
468 if props is not None:
472 def version_get_properties(self, serial, keys=(), propnames=_propnames):
473 """Return a sequence of values for the properties of
474 the version specified by serial and the keys, in the order given.
475 If keys is empty, return all properties in the order
476 (serial, node, hash, size, source, mtime, muser, cluster).
479 q = ("select serial, node, hash, size, source, mtime, muser, cluster "
482 self.execute(q, (serial,))
489 return [r[propnames[k]] for k in keys if k in propnames]
491 def version_recluster(self, serial, cluster):
492 """Move the version into another cluster."""
494 props = self.version_get_properties(serial)
499 oldcluster = props[CLUSTER]
500 if cluster == oldcluster:
504 self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
505 self.statistics_update_ancestors(node, 1, size, mtime, cluster)
507 q = "update versions set cluster = ? where serial = ?"
508 self.execute(q, (cluster, serial))
def version_remove(self, serial):
    """Remove the version specified by serial.

    Subtract the version's population (1) and size from the statistics
    of its node's ancestors, then delete the version row.

    Return True once the version is deleted; return None when no
    version with that serial exists.
    """
    # Version properties (node, size, cluster) are needed here.  A node
    # lookup only yields (parent, path), so indexing its result with
    # CLUSTER would raise IndexError for every existing row.
    props = self.version_get_properties(serial)
    if not props:
        return
    node = props[NODE]
    size = props[SIZE]
    cluster = props[CLUSTER]

    # NOTE(review): the statistics timestamp is presumed to be the
    # current time; the file's own `time` import is not visible from
    # this block, hence the local import -- confirm.
    from time import time
    mtime = time()
    self.statistics_update_ancestors(node, -1, -size, mtime, cluster)

    q = "delete from versions where serial = ?"
    self.execute(q, (serial,))
    # NOTE(review): a truthy result is presumed to be what callers
    # expect on success -- confirm.
    return True
def attribute_get(self, serial, keys=()):
    """Return a list of (key, value) pairs of the version specified by serial.

    If keys is empty, return all attributes.
    Otherwise, return only those specified.

    keys may be any iterable of attribute names (tuple, list, ...).
    """
    execute = self.execute
    # Normalise once: the keys are iterated to build the placeholders and
    # then concatenated with the serial, which requires a real tuple.
    keys = tuple(keys)
    if keys:
        marks = ','.join('?' for k in keys)
        # Only the placeholder list is interpolated; values stay bound.
        q = ("select key, value from attributes "
             "where key in (%s) and serial = ?" % (marks,))
        execute(q, keys + (serial,))
    else:
        q = "select key, value from attributes where serial = ?"
        execute(q, (serial,))
    return self.fetchall()
def attribute_set(self, serial, items):
    """Store attributes on the version identified by serial.

    items -- iterable of (key, value) pairs

    A pair whose key already exists for that serial replaces the
    stored value.
    """
    statement = ("insert or replace into attributes (serial, key, value) "
                 "values (?, ?, ?)")
    # Rows are produced lazily; executemany consumes the generator.
    rows = ((serial, key, value) for key, value in items)
    self.executemany(statement, rows)
def attribute_del(self, serial, keys=()):
    """Delete attributes of the version identified by serial.

    keys -- iterable of attribute names to delete; when empty,
            every attribute of the version is deleted
    """
    if not keys:
        # No keys given: wipe all attributes of this version.
        self.execute("delete from attributes where serial = ?", (serial,))
        return

    statement = "delete from attributes where serial = ? and key = ?"
    self.executemany(statement, ((serial, name) for name in keys))
def attribute_copy(self, source, dest):
    """Copy every attribute of version source onto version dest.

    source -- serial of the version whose attributes are read
    dest   -- serial of the version that receives them

    Attributes already present on dest under the same key are replaced.
    """
    statement = ("insert or replace into attributes "
                 "select ?, key, value from attributes "
                 "where serial = ?")
    # Placeholder order: dest fills the selected serial column,
    # source filters the rows being copied.
    params = (dest, source)
    self.execute(statement, params)
572 def _construct_filters(self, filterq):
576 args = filterq.split(',')
577 subq = " and a.key in ("
578 subq += ','.join(('?' for x in args))
583 def _construct_paths(self, pathq):
588 subq += ' or '.join(('n.path like ?' for x in pathq))
590 args = tuple([x + '%' for x in pathq])
594 def latest_attribute_keys(self, parent, before=inf, except_cluster=0, pathq=[]):
595 """Return a list with all attribute keys defined
596 for all latest versions under parent that
597 do not belong to the cluster.
600 # TODO: Use another table to store before=inf results.
601 q = ("select distinct a.key "
602 "from attributes a, versions v, nodes n "
603 "where v.serial = (select max(serial) "
605 "where node = v.node and mtime < ?) "
606 "and v.cluster != ? "
607 "and v.node in (select node "
610 "and a.serial = v.serial "
611 "and n.node = v.node")
612 args = (before, except_cluster, parent)
613 subq, subargs = self._construct_paths(pathq)
617 self.execute(q, args)
618 return [r[0] for r in self.fetchall()]
620 def latest_version_list(self, parent, prefix='', delimiter=None,
621 start='', limit=10000, before=inf,
622 except_cluster=0, pathq=[], filterq=None):
623 """Return a (list of (path, serial) tuples, list of common prefixes)
624 for the current versions of the paths with the given parent,
625 matching the following criteria.
627 The property tuple for a version is returned if all
628 of these conditions are true:
634 c. path starts with prefix (and paths in pathq)
636 d. version is the max up to before
638 e. version is not in cluster
640 f. the path does not have the delimiter occurring
641 after the prefix, or ends with the delimiter
643 g. serial matches the attribute filter query.
645 A filter query is a comma-separated list of
646 terms in one of these three forms:
649 an attribute with this key must exist
652 an attribute with this key must not exist
655 the attribute with this key satisfies the value
656 where ?op is one of ==, != <=, >=, <, >.
658 The list of common prefixes includes the prefixes
659 matching up to the first delimiter after prefix,
660 and are reported only once, as "virtual directories".
661 The delimiter is included in the prefixes.
663 If arguments are None, then the corresponding matching rule
666 Limit applies to the first list of tuples returned.
669 execute = self.execute
671 if not start or start < prefix:
672 start = strprevling(prefix)
673 nextling = strnextling(prefix)
675 q = ("select distinct n.path, v.serial "
676 "from attributes a, versions v, nodes n "
677 "where v.serial = (select max(serial) "
679 "where node = v.node and mtime < ?) "
680 "and v.cluster != ? "
681 "and v.node in (select node "
684 "and a.serial = v.serial "
685 "and n.node = v.node "
686 "and n.path > ? and n.path < ?")
687 args = [before, except_cluster, parent, start, nextling]
689 subq, subargs = self._construct_paths(pathq)
693 subq, subargs = self._construct_filters(filterq)
698 q = q.replace("attributes a, ", "")
699 q = q.replace("and a.serial = v.serial ", "")
700 q += " order by n.path"
706 return self.fetchall(), ()
711 fetchone = self.fetchone
713 pappend = prefixes.append
715 mappend = matches.append
723 idx = path.find(delimiter, pfz)
732 if idx + dz == len(path):
735 continue # Get one more, in case there is a path.
741 args[3] = strnextling(pf) # New start.
744 return matches, prefixes