36dfb9a0ea8a44629045e3e5178ab00319191619
[pithos] / snf-pithos-backend / pithos / backends / lib / sqlite / node.py
1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from time import time
35
36 from dbworker import DBWorker
37
38 from pithos.backends.filter import parse_filters
39
40
41 ROOTNODE  = 0
42
43 ( SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM, CLUSTER ) = range(11)
44
45 ( MATCH_PREFIX, MATCH_EXACT ) = range(2)
46
47 inf = float('inf')
48
49
50 def strnextling(prefix):
51     """Return the first unicode string
52        greater than but not starting with given prefix.
53        strnextling('hello') -> 'hellp'
54     """
55     if not prefix:
56         ## all strings start with the null string,
57         ## therefore we have to approximate strnextling('')
58         ## with the last unicode character supported by python
59         ## 0x10ffff for wide (32-bit unicode) python builds
60         ## 0x00ffff for narrow (16-bit unicode) python builds
61         ## We will not autodetect. 0xffff is safe enough.
62         return unichr(0xffff)
63     s = prefix[:-1]
64     c = ord(prefix[-1])
65     if c >= 0xffff:
66         raise RuntimeError
67     s += unichr(c+1)
68     return s
69
70 def strprevling(prefix):
71     """Return an approximation of the last unicode string
72        less than but not starting with given prefix.
73        strprevling(u'hello') -> u'helln\\xffff'
74     """
75     if not prefix:
76         ## There is no prevling for the null string
77         return prefix
78     s = prefix[:-1]
79     c = ord(prefix[-1])
80     if c > 0:
81         s += unichr(c-1) + unichr(0xffff)
82     return s
83
84
85 _propnames = {
86     'serial'    : 0,
87     'node'      : 1,
88     'hash'      : 2,
89     'size'      : 3,
90     'type'      : 4,
91     'source'    : 5,
92     'mtime'     : 6,
93     'muser'     : 7,
94     'uuid'      : 8,
95     'checksum'  : 9,
96     'cluster'   : 10
97 }
98
99
100 class Node(DBWorker):
101     """Nodes store path organization and have multiple versions.
102        Versions store object history and have multiple attributes.
103        Attributes store metadata.
104     """
105     
106     # TODO: Provide an interface for included and excluded clusters.
107     
108     def __init__(self, **params):
109         DBWorker.__init__(self, **params)
110         execute = self.execute
111         
112         execute(""" pragma foreign_keys = on """)
113         
114         execute(""" create table if not exists nodes
115                           ( node       integer primary key,
116                             parent     integer default 0,
117                             path       text    not null default '',
118                             latest_version     integer,
119                             foreign key (parent)
120                             references nodes(node)
121                             on update cascade
122                             on delete cascade ) """)
123         execute(""" create unique index if not exists idx_nodes_path
124                     on nodes(path) """)
125         execute(""" create index if not exists idx_nodes_parent
126                     on nodes(parent) """)
127         
128         execute(""" create table if not exists policy
129                           ( node   integer,
130                             key    text,
131                             value  text,
132                             primary key (node, key)
133                             foreign key (node)
134                             references nodes(node)
135                             on update cascade
136                             on delete cascade ) """)
137         
138         execute(""" create table if not exists statistics
139                           ( node       integer,
140                             population integer not null default 0,
141                             size       integer not null default 0,
142                             mtime      integer,
143                             cluster    integer not null default 0,
144                             primary key (node, cluster)
145                             foreign key (node)
146                             references nodes(node)
147                             on update cascade
148                             on delete cascade ) """)
149         
150         execute(""" create table if not exists versions
151                           ( serial     integer primary key,
152                             node       integer,
153                             hash       text,
154                             size       integer not null default 0,
155                             type       text    not null default '',
156                             source     integer,
157                             mtime      integer,
158                             muser      text    not null default '',
159                             uuid       text    not null default '',
160                             checksum   text    not null default '',
161                             cluster    integer not null default 0,
162                             foreign key (node)
163                             references nodes(node)
164                             on update cascade
165                             on delete cascade ) """)
166         execute(""" create index if not exists idx_versions_node_mtime
167                     on versions(node, mtime) """)
168         execute(""" create index if not exists idx_versions_node_uuid
169                     on versions(uuid) """)
170         execute(""" create index if not exists idx_versions_serial_cluster
171                     on versions(serial, cluster) """)
172         
173         execute(""" create table if not exists attributes
174                           ( serial integer,
175                             domain text,
176                             key    text,
177                             value  text,
178                             primary key (serial, domain, key)
179                             foreign key (serial)
180                             references versions(serial)
181                             on update cascade
182                             on delete cascade ) """)
183         
184         q = "insert or ignore into nodes(node, parent) values (?, ?)"
185         execute(q, (ROOTNODE, ROOTNODE))
186     
187     def node_create(self, parent, path):
188         """Create a new node from the given properties.
189            Return the node identifier of the new node.
190         """
191         
192         q = ("insert into nodes (parent, path) "
193              "values (?, ?)")
194         props = (parent, path)
195         return self.execute(q, props).lastrowid
196     
197     def node_lookup(self, path):
198         """Lookup the current node of the given path.
199            Return None if the path is not found.
200         """
201         
202         q = "select node from nodes where path = ?"
203         self.execute(q, (path,))
204         r = self.fetchone()
205         if r is not None:
206             return r[0]
207         return None
208     
209     def node_lookup_bulk(self, paths):
210         """Lookup the current nodes for the given paths.
211            Return () if the path is not found.
212         """
213         
214         placeholders = ','.join('?' for path in paths)
215         q = "select node from nodes where path in (%s)" % placeholders
216         self.execute(q, paths)
217         r = self.fetchall()
218         if r is not None:
219                 return [row[0] for row in r]
220         return None
221     
222     def node_get_properties(self, node):
223         """Return the node's (parent, path).
224            Return None if the node is not found.
225         """
226         
227         q = "select parent, path from nodes where node = ?"
228         self.execute(q, (node,))
229         return self.fetchone()
230     
231     def node_get_versions(self, node, keys=(), propnames=_propnames):
232         """Return the properties of all versions at node.
233            If keys is empty, return all properties in the order
234            (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
235         """
236         
237         q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
238              "from versions "
239              "where node = ?")
240         self.execute(q, (node,))
241         r = self.fetchall()
242         if r is None:
243             return r
244         
245         if not keys:
246             return r
247         return [[p[propnames[k]] for k in keys if k in propnames] for p in r]
248     
249     def node_count_children(self, node):
250         """Return node's child count."""
251         
252         q = "select count(node) from nodes where parent = ? and node != 0"
253         self.execute(q, (node,))
254         r = self.fetchone()
255         if r is None:
256             return 0
257         return r[0]
258     
259     def node_purge_children(self, parent, before=inf, cluster=0):
260         """Delete all versions with the specified
261            parent and cluster, and return
262            the hashes and size of versions deleted.
263            Clears out nodes with no remaining versions.
264         """
265         
266         execute = self.execute
267         q = ("select count(serial), sum(size) from versions "
268              "where node in (select node "
269                             "from nodes "
270                             "where parent = ?) "
271              "and cluster = ? "
272              "and mtime <= ?")
273         args = (parent, cluster, before)
274         execute(q, args)
275         nr, size = self.fetchone()
276         if not nr:
277             return (), 0
278         mtime = time()
279         self.statistics_update(parent, -nr, -size, mtime, cluster)
280         self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster)
281         
282         q = ("select hash from versions "
283              "where node in (select node "
284                             "from nodes "
285                             "where parent = ?) "
286              "and cluster = ? "
287              "and mtime <= ?")
288         execute(q, args)
289         hashes = [r[0] for r in self.fetchall()]
290         q = ("delete from versions "
291              "where node in (select node "
292                             "from nodes "
293                             "where parent = ?) "
294              "and cluster = ? "
295              "and mtime <= ?")
296         execute(q, args)
297         q = ("delete from nodes "
298              "where node in (select node from nodes n "
299                             "where (select count(serial) "
300                                    "from versions "
301                                    "where node = n.node) = 0 "
302                             "and parent = ?)")
303         execute(q, (parent,))
304         return hashes, size
305     
306     def node_purge(self, node, before=inf, cluster=0):
307         """Delete all versions with the specified
308            node and cluster, and return
309            the hashes and size of versions deleted.
310            Clears out the node if it has no remaining versions.
311         """
312         
313         execute = self.execute
314         q = ("select count(serial), sum(size) from versions "
315              "where node = ? "
316              "and cluster = ? "
317              "and mtime <= ?")
318         args = (node, cluster, before)
319         execute(q, args)
320         nr, size = self.fetchone()
321         if not nr:
322             return (), 0
323         mtime = time()
324         self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
325         
326         q = ("select hash from versions "
327              "where node = ? "
328              "and cluster = ? "
329              "and mtime <= ?")
330         execute(q, args)
331         hashes = [r[0] for r in self.fetchall()]
332         q = ("delete from versions "
333              "where node = ? "
334              "and cluster = ? "
335              "and mtime <= ?")
336         execute(q, args)
337         q = ("delete from nodes "
338              "where node in (select node from nodes n "
339                             "where (select count(serial) "
340                                    "from versions "
341                                    "where node = n.node) = 0 "
342                             "and node = ?)")
343         execute(q, (node,))
344         return hashes, size
345     
346     def node_remove(self, node):
347         """Remove the node specified.
348            Return false if the node has children or is not found.
349         """
350         
351         if self.node_count_children(node):
352             return False
353         
354         mtime = time()
355         q = ("select count(serial), sum(size), cluster "
356              "from versions "
357              "where node = ? "
358              "group by cluster")
359         self.execute(q, (node,))
360         for population, size, cluster in self.fetchall():
361             self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
362         
363         q = "delete from nodes where node = ?"
364         self.execute(q, (node,))
365         return True
366     
367     def policy_get(self, node):
368         q = "select key, value from policy where node = ?"
369         self.execute(q, (node,))
370         return dict(self.fetchall())
371     
372     def policy_set(self, node, policy):
373         q = "insert or replace into policy (node, key, value) values (?, ?, ?)"
374         self.executemany(q, ((node, k, v) for k, v in policy.iteritems()))
375     
376     def statistics_get(self, node, cluster=0):
377         """Return population, total size and last mtime
378            for all versions under node that belong to the cluster.
379         """
380         
381         q = ("select population, size, mtime from statistics "
382              "where node = ? and cluster = ?")
383         self.execute(q, (node, cluster))
384         return self.fetchone()
385     
386     def statistics_update(self, node, population, size, mtime, cluster=0):
387         """Update the statistics of the given node.
388            Statistics keep track the population, total
389            size of objects and mtime in the node's namespace.
390            May be zero or positive or negative numbers.
391         """
392         
393         qs = ("select population, size from statistics "
394               "where node = ? and cluster = ?")
395         qu = ("insert or replace into statistics (node, population, size, mtime, cluster) "
396               "values (?, ?, ?, ?, ?)")
397         self.execute(qs, (node, cluster))
398         r = self.fetchone()
399         if r is None:
400             prepopulation, presize = (0, 0)
401         else:
402             prepopulation, presize = r
403         population += prepopulation
404         size += presize
405         self.execute(qu, (node, population, size, mtime, cluster))
406     
407     def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
408         """Update the statistics of the given node's parent.
409            Then recursively update all parents up to the root.
410            Population is not recursive.
411         """
412         
413         while True:
414             if node == 0:
415                 break
416             props = self.node_get_properties(node)
417             if props is None:
418                 break
419             parent, path = props
420             self.statistics_update(parent, population, size, mtime, cluster)
421             node = parent
422             population = 0 # Population isn't recursive
423     
424     def statistics_latest(self, node, before=inf, except_cluster=0):
425         """Return population, total size and last mtime
426            for all latest versions under node that
427            do not belong to the cluster.
428         """
429         
430         execute = self.execute
431         fetchone = self.fetchone
432         
433         # The node.
434         props = self.node_get_properties(node)
435         if props is None:
436             return None
437         parent, path = props
438         
439         # The latest version.
440         q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
441              "from versions v "
442              "where serial = %s "
443              "and cluster != ?")
444         subq, args = self._construct_latest_version_subquery(node=node, before=before)
445         execute(q % subq, args + [except_cluster])
446         props = fetchone()
447         if props is None:
448             return None
449         mtime = props[MTIME]
450         
451         # First level, just under node (get population).
452         q = ("select count(serial), sum(size), max(mtime) "
453              "from versions v "
454              "where serial = %s "
455              "and cluster != ? "
456              "and node in (select node "
457                           "from nodes "
458                           "where parent = ?)")
459         subq, args = self._construct_latest_version_subquery(node=None, before=before)
460         execute(q % subq, args + [except_cluster, node])
461         r = fetchone()
462         if r is None:
463             return None
464         count = r[0]
465         mtime = max(mtime, r[2])
466         if count == 0:
467             return (0, 0, mtime)
468         
469         # All children (get size and mtime).
470         # This is why the full path is stored.
471         q = ("select count(serial), sum(size), max(mtime) "
472              "from versions v "
473              "where serial = %s "
474              "and cluster != ? "
475              "and node in (select node "
476                           "from nodes "
477                           "where path like ? escape '\\')")
478         subq, args = self._construct_latest_version_subquery(node=None, before=before)
479         execute(q % subq, args + [except_cluster, self.escape_like(path) + '%'])
480         r = fetchone()
481         if r is None:
482             return None
483         size = r[1] - props[SIZE]
484         mtime = max(mtime, r[2])
485         return (count, size, mtime)
486     
487     def nodes_set_latest_version(self, node, serial):
488         q = ("update nodes set latest_version = ? where node = ?")
489         props = (serial, node)
490         self.execute(q, props)
491     
492     def version_create(self, node, hash, size, type, source, muser, uuid, checksum, cluster=0):
493         """Create a new version from the given properties.
494            Return the (serial, mtime) of the new version.
495         """
496         
497         q = ("insert into versions (node, hash, size, type, source, mtime, muser, uuid, checksum, cluster) "
498              "values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
499         mtime = time()
500         props = (node, hash, size, type, source, mtime, muser, uuid, checksum, cluster)
501         serial = self.execute(q, props).lastrowid
502         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
503         
504         self.nodes_set_latest_version(node, serial)
505         
506         return serial, mtime
507     
508     def version_lookup(self, node, before=inf, cluster=0, all_props=True):
509         """Lookup the current version of the given node.
510            Return a list with its properties:
511            (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster)
512            or None if the current version is not found in the given cluster.
513         """
514         
515         q = ("select %s "
516              "from versions v "
517              "where serial = %s "
518              "and cluster = ?")
519         subq, args = self._construct_latest_version_subquery(node=node, before=before)
520         if not all_props:
521             q = q % ("serial", subq)
522         else:
523             q = q % ("serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster", subq)
524         
525         self.execute(q, args + [cluster])
526         props = self.fetchone()
527         if props is not None:
528             return props
529         return None
530
531     def version_lookup_bulk(self, nodes, before=inf, cluster=0, all_props=True):
532         """Lookup the current versions of the given nodes.
533            Return a list with their properties:
534            (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
535         """
536         
537         if not nodes:
538                 return ()
539         q = ("select %s "
540              "from versions "
541              "where serial in %s "
542              "and cluster = ? %s")
543         subq, args = self._construct_latest_versions_subquery(nodes=nodes, before = before)
544         if not all_props:
545             q = q % ("serial", subq, '')
546         else:
547             q = q % ("serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster",  subq, 'order by node')
548         
549         args += [cluster]
550         self.execute(q, args)
551         return self.fetchall()
552     
553     def version_get_properties(self, serial, keys=(), propnames=_propnames):
554         """Return a sequence of values for the properties of
555            the version specified by serial and the keys, in the order given.
556            If keys is empty, return all properties in the order
557            (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
558         """
559         
560         q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
561              "from versions "
562              "where serial = ?")
563         self.execute(q, (serial,))
564         r = self.fetchone()
565         if r is None:
566             return r
567         
568         if not keys:
569             return r
570         return [r[propnames[k]] for k in keys if k in propnames]
571     
572     def version_put_property(self, serial, key, value):
573         """Set value for the property of version specified by key."""
574         
575         if key not in _propnames:
576             return
577         q = "update versions set %s = ? where serial = ?" % key
578         self.execute(q, (value, serial))
579     
580     def version_recluster(self, serial, cluster):
581         """Move the version into another cluster."""
582         
583         props = self.version_get_properties(serial)
584         if not props:
585             return
586         node = props[NODE]
587         size = props[SIZE]
588         oldcluster = props[CLUSTER]
589         if cluster == oldcluster:
590             return
591         
592         mtime = time()
593         self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
594         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
595         
596         q = "update versions set cluster = ? where serial = ?"
597         self.execute(q, (cluster, serial))
598     
599     def version_remove(self, serial):
600         """Remove the serial specified."""
601         
602         props = self.version_get_properties(serial)
603         if not props:
604             return
605         node = props[NODE]
606         hash = props[HASH]
607         size = props[SIZE]
608         cluster = props[CLUSTER]
609         
610         mtime = time()
611         self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
612         
613         q = "delete from versions where serial = ?"
614         self.execute(q, (serial,))
615         
616         props = self.version_lookup(node, cluster=cluster, all_props=False)
617         if props:
618                 self.nodes_set_latest_version(node, props[0])
619         return hash, size
620     
621     def attribute_get(self, serial, domain, keys=()):
622         """Return a list of (key, value) pairs of the version specified by serial.
623            If keys is empty, return all attributes.
624            Othwerise, return only those specified.
625         """
626         
627         execute = self.execute
628         if keys:
629             marks = ','.join('?' for k in keys)
630             q = ("select key, value from attributes "
631                  "where key in (%s) and serial = ? and domain = ?" % (marks,))
632             execute(q, keys + (serial, domain))
633         else:
634             q = "select key, value from attributes where serial = ? and domain = ?"
635             execute(q, (serial, domain))
636         return self.fetchall()
637     
638     def attribute_set(self, serial, domain, items):
639         """Set the attributes of the version specified by serial.
640            Receive attributes as an iterable of (key, value) pairs.
641         """
642         
643         q = ("insert or replace into attributes (serial, domain, key, value) "
644              "values (?, ?, ?, ?)")
645         self.executemany(q, ((serial, domain, k, v) for k, v in items))
646     
647     def attribute_del(self, serial, domain, keys=()):
648         """Delete attributes of the version specified by serial.
649            If keys is empty, delete all attributes.
650            Otherwise delete those specified.
651         """
652         
653         if keys:
654             q = "delete from attributes where serial = ? and domain = ? and key = ?"
655             self.executemany(q, ((serial, domain, key) for key in keys))
656         else:
657             q = "delete from attributes where serial = ? and domain = ?"
658             self.execute(q, (serial, domain))
659     
660     def attribute_copy(self, source, dest):
661         q = ("insert or replace into attributes "
662              "select ?, domain, key, value from attributes "
663              "where serial = ?")
664         self.execute(q, (dest, source))
665     
666     def _construct_filters(self, domain, filterq):
667         if not domain or not filterq:
668             return None, None
669         
670         subqlist = []
671         append = subqlist.append
672         included, excluded, opers = parse_filters(filterq)
673         args = []
674         
675         if included:
676             subq = "exists (select 1 from attributes where serial = v.serial and domain = ? and "
677             subq += "(" + ' or '.join(('key = ?' for x in included)) + ")"
678             subq += ")"
679             args += [domain]
680             args += included
681             append(subq)
682         
683         if excluded:
684             subq = "not exists (select 1 from attributes where serial = v.serial and domain = ? and "
685             subq += "(" + ' or '.join(('key = ?' for x in excluded)) + ")"
686             subq += ")"
687             args += [domain]
688             args += excluded
689             append(subq)
690         
691         if opers:
692             for k, o, v in opers:
693                 subq = "exists (select 1 from attributes where serial = v.serial and domain = ? and "
694                 subq += "key = ? and value %s ?" % (o,)
695                 subq += ")"
696                 args += [domain, k, v]
697                 append(subq)
698         
699         if not subqlist:
700             return None, None
701         
702         subq = ' and ' + ' and '.join(subqlist)
703         
704         return subq, args
705     
706     def _construct_paths(self, pathq):
707         if not pathq:
708             return None, None
709         
710         subqlist = []
711         args = []
712         for path, match in pathq:
713             if match == MATCH_PREFIX:
714                 subqlist.append("n.path like ? escape '\\'")
715                 args.append(self.escape_like(path) + '%')
716             elif match == MATCH_EXACT:
717                 subqlist.append("n.path = ?")
718                 args.append(path)
719         
720         subq = ' and (' + ' or '.join(subqlist) + ')'
721         args = tuple(args)
722         
723         return subq, args
724     
725     def _construct_size(self, sizeq):
726         if not sizeq or len(sizeq) != 2:
727             return None, None
728         
729         subq = ''
730         args = []
731         if sizeq[0]:
732             subq += " and v.size >= ?"
733             args += [sizeq[0]]
734         if sizeq[1]:
735             subq += " and v.size < ?"
736             args += [sizeq[1]]
737         
738         return subq, args
739     
740     def _construct_versions_nodes_latest_version_subquery(self, before=inf):
741         if before == inf:
742             q = ("n.latest_version ")
743             args = []
744         else:
745             q = ("(select max(serial) "
746                                    "from versions "
747                                    "where node = v.node and mtime < ?) ")
748             args = [before]
749         return q, args
750     
751     def _construct_latest_version_subquery(self, node=None, before=inf):
752         where_cond = "node = v.node"
753         args = []
754         if node:
755             where_cond = "node = ? "
756             args = [node]
757         
758         if before == inf:
759             q = ("(select latest_version "
760                    "from nodes "
761                    "where %s) ")
762         else:
763             q = ("(select max(serial) "
764                    "from versions "
765                    "where %s and mtime < ?) ")
766             args += [before]
767         return q % where_cond, args
768     
769     def _construct_latest_versions_subquery(self, nodes=(), before=inf):
770         where_cond = ""
771         args = []
772         if nodes:
773             where_cond = "node in (%s) " % ','.join('?' for node in nodes)
774             args = nodes
775         
776         if before == inf:
777             q = ("(select latest_version "
778                    "from nodes "
779                    "where %s ) ")
780         else:
781             q = ("(select max(serial) "
782                                  "from versions "
783                                  "where %s and mtime < ? group by node) ")
784             args += [before]
785         return q % where_cond, args
786     
787     def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=[]):
788         """Return a list with all keys pairs defined
789            for all latest versions under parent that
790            do not belong to the cluster.
791         """
792         
793         # TODO: Use another table to store before=inf results.
794         q = ("select distinct a.key "
795              "from attributes a, versions v, nodes n "
796              "where v.serial = %s "
797              "and v.cluster != ? "
798              "and v.node in (select node "
799                             "from nodes "
800                             "where parent = ?) "
801              "and a.serial = v.serial "
802              "and a.domain = ? "
803              "and n.node = v.node")
804         subq, subargs = self._construct_latest_version_subquery(node=None, before=before)
805         args = subargs + [except_cluster, parent, domain]
806         q = q % subq
807         subq, subargs = self._construct_paths(pathq)
808         if subq is not None:
809             q += subq
810             args += subargs
811         self.execute(q, args)
812         return [r[0] for r in self.fetchall()]
813     
814     def latest_version_list(self, parent, prefix='', delimiter=None,
815                             start='', limit=10000, before=inf,
816                             except_cluster=0, pathq=[], domain=None,
817                             filterq=[], sizeq=None, all_props=False):
818         """Return a (list of (path, serial) tuples, list of common prefixes)
819            for the current versions of the paths with the given parent,
820            matching the following criteria.
821            
822            The property tuple for a version is returned if all
823            of these conditions are true:
824                 
825                 a. parent matches
826                 
827                 b. path > start
828                 
829                 c. path starts with prefix (and paths in pathq)
830                 
831                 d. version is the max up to before
832                 
833                 e. version is not in cluster
834                 
835                 f. the path does not have the delimiter occuring
836                    after the prefix, or ends with the delimiter
837                 
838                 g. serial matches the attribute filter query.
839                    
840                    A filter query is a comma-separated list of
841                    terms in one of these three forms:
842                    
843                    key
844                        an attribute with this key must exist
845                    
846                    !key
847                        an attribute with this key must not exist
848                    
849                    key ?op value
850                        the attribute with this key satisfies the value
851                        where ?op is one of =, != <=, >=, <, >.
852                 
853                 h. the size is in the range set by sizeq
854            
855            The list of common prefixes includes the prefixes
856            matching up to the first delimiter after prefix,
857            and are reported only once, as "virtual directories".
858            The delimiter is included in the prefixes.
859            
860            If arguments are None, then the corresponding matching rule
861            will always match.
862            
863            Limit applies to the first list of tuples returned.
864            
865            If all_props is True, return all properties after path, not just serial.
866         """
867         
868         execute = self.execute
869         
870         if not start or start < prefix:
871             start = strprevling(prefix)
872         nextling = strnextling(prefix)
873         
874         q = ("select distinct n.path, %s "
875              "from versions v, nodes n "
876              "where v.serial = %s "
877              "and v.cluster != ? "
878              "and v.node in (select node "
879                             "from nodes "
880                             "where parent = ?) "
881              "and n.node = v.node "
882              "and n.path > ? and n.path < ?")
883         subq, args = self._construct_versions_nodes_latest_version_subquery(before)
884         if not all_props:
885             q = q % ("v.serial", subq)
886         else:
887             q = q % ("v.serial, v.node, v.hash, v.size, v.type, v.source, v.mtime, v.muser, v.uuid, v.checksum, v.cluster", subq)
888         args += [except_cluster, parent, start, nextling]
889         start_index = len(args) - 2
890         
891         subq, subargs = self._construct_paths(pathq)
892         if subq is not None:
893             q += subq
894             args += subargs
895         subq, subargs = self._construct_size(sizeq)
896         if subq is not None:
897             q += subq
898             args += subargs
899         subq, subargs = self._construct_filters(domain, filterq)
900         if subq is not None:
901             q += subq
902             args += subargs
903         else:
904             q = q.replace("attributes a, ", "")
905             q = q.replace("and a.serial = v.serial ", "")
906         q += " order by n.path"
907         
908         if not delimiter:
909             q += " limit ?"
910             args.append(limit)
911             execute(q, args)
912             return self.fetchall(), ()
913         
914         pfz = len(prefix)
915         dz = len(delimiter)
916         count = 0
917         fetchone = self.fetchone
918         prefixes = []
919         pappend = prefixes.append
920         matches = []
921         mappend = matches.append
922         
923         execute(q, args)
924         while True:
925             props = fetchone()
926             if props is None:
927                 break
928             path = props[0]
929             serial = props[1]
930             idx = path.find(delimiter, pfz)
931             
932             if idx < 0:
933                 mappend(props)
934                 count += 1
935                 if count >= limit:
936                     break
937                 continue
938             
939             if idx + dz == len(path):
940                 mappend(props)
941                 count += 1
942                 continue # Get one more, in case there is a path.
943             pf = path[:idx + dz]
944             pappend(pf)
945             if count >= limit: 
946                 break
947             
948             args[start_index] = strnextling(pf) # New start.
949             execute(q, args)
950         
951         return matches, prefixes
952     
953     def latest_uuid(self, uuid):
954         """Return a (path, serial) tuple, for the latest version of the given uuid."""
955         
956         q = ("select n.path, v.serial "
957              "from versions v, nodes n "
958              "where v.serial = (select max(serial) "
959                                "from versions "
960                                "where uuid = ?) "
961              "and n.node = v.node")
962         self.execute(q, (uuid,))
963         return self.fetchone()