Escape catch-all characters in LIKE queries.
[pithos] / pithos / backends / lib / sqlite / node.py
1 # Copyright 2011 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from time import time
35
36 from dbworker import DBWorker
37
38
39 ROOTNODE  = 0
40
41 ( SERIAL, NODE, HASH, SIZE, SOURCE, MTIME, MUSER, CLUSTER ) = range(8)
42
43 inf = float('inf')
44
45
46 def strnextling(prefix):
47     """Return the first unicode string
48        greater than but not starting with given prefix.
49        strnextling('hello') -> 'hellp'
50     """
51     if not prefix:
52         ## all strings start with the null string,
53         ## therefore we have to approximate strnextling('')
54         ## with the last unicode character supported by python
55         ## 0x10ffff for wide (32-bit unicode) python builds
56         ## 0x00ffff for narrow (16-bit unicode) python builds
57         ## We will not autodetect. 0xffff is safe enough.
58         return unichr(0xffff)
59     s = prefix[:-1]
60     c = ord(prefix[-1])
61     if c >= 0xffff:
62         raise RuntimeError
63     s += unichr(c+1)
64     return s
65
66 def strprevling(prefix):
67     """Return an approximation of the last unicode string
68        less than but not starting with given prefix.
69        strprevling(u'hello') -> u'helln\\xffff'
70     """
71     if not prefix:
72         ## There is no prevling for the null string
73         return prefix
74     s = prefix[:-1]
75     c = ord(prefix[-1])
76     if c > 0:
77         s += unichr(c-1) + unichr(0xffff)
78     return s
79
80
81 _propnames = {
82     'serial'    : 0,
83     'node'      : 1,
84     'hash'      : 2,
85     'size'      : 3,
86     'source'    : 4,
87     'mtime'     : 5,
88     'muser'     : 6,
89     'cluster'   : 7,
90 }
91
92
93 class Node(DBWorker):
94     """Nodes store path organization and have multiple versions.
95        Versions store object history and have multiple attributes.
96        Attributes store metadata.
97     """
98     
99     # TODO: Provide an interface for included and excluded clusters.
100     
101     def __init__(self, **params):
102         DBWorker.__init__(self, **params)
103         execute = self.execute
104         
105         execute(""" pragma foreign_keys = on """)
106         
107         execute(""" create table if not exists nodes
108                           ( node       integer primary key,
109                             parent     integer default 0,
110                             path       text    not null default '',
111                             foreign key (parent)
112                             references nodes(node)
113                             on update cascade
114                             on delete cascade ) """)
115         execute(""" create unique index if not exists idx_nodes_path
116                     on nodes(path) """)
117         
118         execute(""" create table if not exists policy
119                           ( node   integer,
120                             key    text,
121                             value  text,
122                             primary key (node, key)
123                             foreign key (node)
124                             references nodes(node)
125                             on update cascade
126                             on delete cascade ) """)
127         
128         execute(""" create table if not exists statistics
129                           ( node       integer,
130                             population integer not null default 0,
131                             size       integer not null default 0,
132                             mtime      integer,
133                             cluster    integer not null default 0,
134                             primary key (node, cluster)
135                             foreign key (node)
136                             references nodes(node)
137                             on update cascade
138                             on delete cascade ) """)
139         
140         execute(""" create table if not exists versions
141                           ( serial     integer primary key,
142                             node       integer,
143                             hash       text,
144                             size       integer not null default 0,
145                             source     integer,
146                             mtime      integer,
147                             muser      text    not null default '',
148                             cluster    integer not null default 0,
149                             foreign key (node)
150                             references nodes(node)
151                             on update cascade
152                             on delete cascade ) """)
153         execute(""" create index if not exists idx_versions_node_mtime
154                     on versions(node, mtime) """)
155         
156         execute(""" create table if not exists attributes
157                           ( serial integer,
158                             key    text,
159                             value  text,
160                             primary key (serial, key)
161                             foreign key (serial)
162                             references versions(serial)
163                             on update cascade
164                             on delete cascade ) """)
165         
166         q = "insert or ignore into nodes(node, parent) values (?, ?)"
167         execute(q, (ROOTNODE, ROOTNODE))
168     
169     def node_create(self, parent, path):
170         """Create a new node from the given properties.
171            Return the node identifier of the new node.
172         """
173         
174         q = ("insert into nodes (parent, path) "
175              "values (?, ?)")
176         props = (parent, path)
177         return self.execute(q, props).lastrowid
178     
179     def node_lookup(self, path):
180         """Lookup the current node of the given path.
181            Return None if the path is not found.
182         """
183         
184         q = "select node from nodes where path = ?"
185         self.execute(q, (path,))
186         r = self.fetchone()
187         if r is not None:
188             return r[0]
189         return None
190     
191     def node_get_properties(self, node):
192         """Return the node's (parent, path).
193            Return None if the node is not found.
194         """
195         
196         q = "select parent, path from nodes where node = ?"
197         self.execute(q, (node,))
198         return self.fetchone()
199     
200     def node_get_versions(self, node, keys=(), propnames=_propnames):
201         """Return the properties of all versions at node.
202            If keys is empty, return all properties in the order
203            (serial, node, size, source, mtime, muser, cluster).
204         """
205         
206         q = ("select serial, node, hash, size, source, mtime, muser, cluster "
207              "from versions "
208              "where node = ?")
209         self.execute(q, (node,))
210         r = self.fetchall()
211         if r is None:
212             return r
213         
214         if not keys:
215             return r
216         return [[p[propnames[k]] for k in keys if k in propnames] for p in r]
217     
218     def node_count_children(self, node):
219         """Return node's child count."""
220         
221         q = "select count(node) from nodes where parent = ? and node != 0"
222         self.execute(q, (node,))
223         r = self.fetchone()
224         if r is None:
225             return 0
226         return r[0]
227     
228     def node_purge_children(self, parent, before=inf, cluster=0):
229         """Delete all versions with the specified
230            parent and cluster, and return
231            the hashes of versions deleted.
232            Clears out nodes with no remaining versions.
233         """
234         
235         execute = self.execute
236         q = ("select count(serial), sum(size) from versions "
237              "where node in (select node "
238                             "from nodes "
239                             "where parent = ?) "
240              "and cluster = ? "
241              "and mtime <= ?")
242         args = (parent, cluster, before)
243         execute(q, args)
244         nr, size = self.fetchone()
245         if not nr:
246             return ()
247         mtime = time()
248         self.statistics_update(parent, -nr, -size, mtime, cluster)
249         self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster)
250         
251         q = ("select hash from versions "
252              "where node in (select node "
253                             "from nodes "
254                             "where parent = ?) "
255              "and cluster = ? "
256              "and mtime <= ?")
257         execute(q, args)
258         hashes = [r[0] for r in self.fetchall()]
259         q = ("delete from versions "
260              "where node in (select node "
261                             "from nodes "
262                             "where parent = ?) "
263              "and cluster = ? "
264              "and mtime <= ?")
265         execute(q, args)
266         q = ("delete from nodes "
267              "where node in (select node from nodes n "
268                             "where (select count(serial) "
269                                    "from versions "
270                                    "where node = n.node) = 0 "
271                             "and parent = ?)")
272         execute(q, (parent,))
273         return hashes
274     
275     def node_purge(self, node, before=inf, cluster=0):
276         """Delete all versions with the specified
277            node and cluster, and return
278            the hashes of versions deleted.
279            Clears out the node if it has no remaining versions.
280         """
281         
282         execute = self.execute
283         q = ("select count(serial), sum(size) from versions "
284              "where node = ? "
285              "and cluster = ? "
286              "and mtime <= ?")
287         args = (node, cluster, before)
288         execute(q, args)
289         nr, size = self.fetchone()
290         if not nr:
291             return ()
292         mtime = time()
293         self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
294         
295         q = ("select hash from versions "
296              "where node = ? "
297              "and cluster = ? "
298              "and mtime <= ?")
299         execute(q, args)
300         hashes = [r[0] for r in self.fetchall()]
301         q = ("delete from versions "
302              "where node = ? "
303              "and cluster = ? "
304              "and mtime <= ?")
305         execute(q, args)
306         q = ("delete from nodes "
307              "where node in (select node from nodes n "
308                             "where (select count(serial) "
309                                    "from versions "
310                                    "where node = n.node) = 0 "
311                             "and node = ?)")
312         execute(q, (node,))
313         return hashes
314     
315     def node_remove(self, node):
316         """Remove the node specified.
317            Return false if the node has children or is not found.
318         """
319         
320         if self.node_count_children(node):
321             return False
322         
323         mtime = time()
324         q = ("select count(serial), sum(size), cluster "
325              "from versions "
326              "where node = ? "
327              "group by cluster")
328         self.execute(q, (node,))
329         for population, size, cluster in self.fetchall():
330             self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
331         
332         q = "delete from nodes where node = ?"
333         self.execute(q, (node,))
334         return True
335     
336     def policy_get(self, node):
337         q = "select key, value from policy where node = ?"
338         self.execute(q, (node,))
339         return dict(self.fetchall())
340     
341     def policy_set(self, node, policy):
342         q = "insert or replace into policy (node, key, value) values (?, ?, ?)"
343         self.executemany(q, ((node, k, v) for k, v in policy.iteritems()))
344     
345     def statistics_get(self, node, cluster=0):
346         """Return population, total size and last mtime
347            for all versions under node that belong to the cluster.
348         """
349         
350         q = ("select population, size, mtime from statistics "
351              "where node = ? and cluster = ?")
352         self.execute(q, (node, cluster))
353         return self.fetchone()
354     
355     def statistics_update(self, node, population, size, mtime, cluster=0):
356         """Update the statistics of the given node.
357            Statistics keep track the population, total
358            size of objects and mtime in the node's namespace.
359            May be zero or positive or negative numbers.
360         """
361         
362         qs = ("select population, size from statistics "
363               "where node = ? and cluster = ?")
364         qu = ("insert or replace into statistics (node, population, size, mtime, cluster) "
365               "values (?, ?, ?, ?, ?)")
366         self.execute(qs, (node, cluster))
367         r = self.fetchone()
368         if r is None:
369             prepopulation, presize = (0, 0)
370         else:
371             prepopulation, presize = r
372         population += prepopulation
373         size += presize
374         self.execute(qu, (node, population, size, mtime, cluster))
375     
376     def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
377         """Update the statistics of the given node's parent.
378            Then recursively update all parents up to the root.
379            Population is not recursive.
380         """
381         
382         while True:
383             if node == 0:
384                 break
385             props = self.node_get_properties(node)
386             if props is None:
387                 break
388             parent, path = props
389             self.statistics_update(parent, population, size, mtime, cluster)
390             node = parent
391             population = 0 # Population isn't recursive
392     
393     def statistics_latest(self, node, before=inf, except_cluster=0):
394         """Return population, total size and last mtime
395            for all latest versions under node that
396            do not belong to the cluster.
397         """
398         
399         execute = self.execute
400         fetchone = self.fetchone
401         
402         # The node.
403         props = self.node_get_properties(node)
404         if props is None:
405             return None
406         parent, path = props
407         
408         # The latest version.
409         q = ("select serial, node, hash, size, source, mtime, muser, cluster "
410              "from versions "
411              "where serial = (select max(serial) "
412                              "from versions "
413                              "where node = ? and mtime < ?) "
414              "and cluster != ?")
415         execute(q, (node, before, except_cluster))
416         props = fetchone()
417         if props is None:
418             return None
419         mtime = props[MTIME]
420         
421         # First level, just under node (get population).
422         q = ("select count(serial), sum(size), max(mtime) "
423              "from versions v "
424              "where serial = (select max(serial) "
425                              "from versions "
426                              "where node = v.node and mtime < ?) "
427              "and cluster != ? "
428              "and node in (select node "
429                           "from nodes "
430                           "where parent = ?)")
431         execute(q, (before, except_cluster, node))
432         r = fetchone()
433         if r is None:
434             return None
435         count = r[0]
436         mtime = max(mtime, r[2])
437         if count == 0:
438             return (0, 0, mtime)
439         
440         # All children (get size and mtime).
441         # XXX: This is why the full path is stored.
442         q = ("select count(serial), sum(size), max(mtime) "
443              "from versions v "
444              "where serial = (select max(serial) "
445                              "from versions "
446                              "where node = v.node and mtime < ?) "
447              "and cluster != ? "
448              "and node in (select node "
449                           "from nodes "
450                           "where path like ? escape '\\')")
451         execute(q, (before, except_cluster, self.escape_like(path) + '%'))
452         r = fetchone()
453         if r is None:
454             return None
455         size = r[1] - props[SIZE]
456         mtime = max(mtime, r[2])
457         return (count, size, mtime)
458     
459     def version_create(self, node, hash, size, source, muser, cluster=0):
460         """Create a new version from the given properties.
461            Return the (serial, mtime) of the new version.
462         """
463         
464         q = ("insert into versions (node, hash, size, source, mtime, muser, cluster) "
465              "values (?, ?, ?, ?, ?, ?, ?)")
466         mtime = time()
467         props = (node, hash, size, source, mtime, muser, cluster)
468         serial = self.execute(q, props).lastrowid
469         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
470         return serial, mtime
471     
472     def version_lookup(self, node, before=inf, cluster=0):
473         """Lookup the current version of the given node.
474            Return a list with its properties:
475            (serial, node, hash, size, source, mtime, muser, cluster)
476            or None if the current version is not found in the given cluster.
477         """
478         
479         q = ("select serial, node, hash, size, source, mtime, muser, cluster "
480              "from versions "
481              "where serial = (select max(serial) "
482                              "from versions "
483                              "where node = ? and mtime < ?) "
484              "and cluster = ?")
485         self.execute(q, (node, before, cluster))
486         props = self.fetchone()
487         if props is not None:
488             return props
489         return None
490     
491     def version_get_properties(self, serial, keys=(), propnames=_propnames):
492         """Return a sequence of values for the properties of
493            the version specified by serial and the keys, in the order given.
494            If keys is empty, return all properties in the order
495            (serial, node, hash, size, source, mtime, muser, cluster).
496         """
497         
498         q = ("select serial, node, hash, size, source, mtime, muser, cluster "
499              "from versions "
500              "where serial = ?")
501         self.execute(q, (serial,))
502         r = self.fetchone()
503         if r is None:
504             return r
505         
506         if not keys:
507             return r
508         return [r[propnames[k]] for k in keys if k in propnames]
509     
510     def version_recluster(self, serial, cluster):
511         """Move the version into another cluster."""
512         
513         props = self.version_get_properties(serial)
514         if not props:
515             return
516         node = props[NODE]
517         size = props[SIZE]
518         oldcluster = props[CLUSTER]
519         if cluster == oldcluster:
520             return
521         
522         mtime = time()
523         self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
524         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
525         
526         q = "update versions set cluster = ? where serial = ?"
527         self.execute(q, (cluster, serial))
528     
529     def version_remove(self, serial):
530         """Remove the serial specified."""
531         
532         props = self.version_get_properties(serial)
533         if not props:
534             return
535         node = props[NODE]
536         hash = props[HASH]
537         size = props[SIZE]
538         cluster = props[CLUSTER]
539         
540         mtime = time()
541         self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
542         
543         q = "delete from versions where serial = ?"
544         self.execute(q, (serial,))
545         return hash
546     
547     def attribute_get(self, serial, keys=()):
548         """Return a list of (key, value) pairs of the version specified by serial.
549            If keys is empty, return all attributes.
550            Othwerise, return only those specified.
551         """
552         
553         execute = self.execute
554         if keys:
555             marks = ','.join('?' for k in keys)
556             q = ("select key, value from attributes "
557                  "where key in (%s) and serial = ?" % (marks,))
558             execute(q, keys + (serial,))
559         else:
560             q = "select key, value from attributes where serial = ?"
561             execute(q, (serial,))
562         return self.fetchall()
563     
564     def attribute_set(self, serial, items):
565         """Set the attributes of the version specified by serial.
566            Receive attributes as an iterable of (key, value) pairs.
567         """
568         
569         q = ("insert or replace into attributes (serial, key, value) "
570              "values (?, ?, ?)")
571         self.executemany(q, ((serial, k, v) for k, v in items))
572     
573     def attribute_del(self, serial, keys=()):
574         """Delete attributes of the version specified by serial.
575            If keys is empty, delete all attributes.
576            Otherwise delete those specified.
577         """
578         
579         if keys:
580             q = "delete from attributes where serial = ? and key = ?"
581             self.executemany(q, ((serial, key) for key in keys))
582         else:
583             q = "delete from attributes where serial = ?"
584             self.execute(q, (serial,))
585     
586     def attribute_copy(self, source, dest):
587         q = ("insert or replace into attributes "
588              "select ?, key, value from attributes "
589              "where serial = ?")
590         self.execute(q, (dest, source))
591     
592     def _construct_filters(self, filterq):
593         if not filterq:
594             return None, None
595         
596         args = filterq.split(',')
597         subq = " and a.key in ("
598         subq += ','.join(('?' for x in args))
599         subq += ")"
600         
601         return subq, args
602     
603     def _construct_paths(self, pathq):
604         if not pathq:
605             return None, None
606         
607         subq = " and ("
608         subq += ' or '.join(("n.path like ? escape '\\'" for x in pathq))
609         subq += ")"
610         args = tuple([self.escape_like(x) + '%' for x in pathq])
611         
612         return subq, args
613     
614     def latest_attribute_keys(self, parent, before=inf, except_cluster=0, pathq=[]):
615         """Return a list with all keys pairs defined
616            for all latest versions under parent that
617            do not belong to the cluster.
618         """
619         
620         # TODO: Use another table to store before=inf results.
621         q = ("select distinct a.key "
622              "from attributes a, versions v, nodes n "
623              "where v.serial = (select max(serial) "
624                               "from versions "
625                               "where node = v.node and mtime < ?) "
626              "and v.cluster != ? "
627              "and v.node in (select node "
628                            "from nodes "
629                            "where parent = ?) "
630              "and a.serial = v.serial "
631              "and n.node = v.node")
632         args = (before, except_cluster, parent)
633         subq, subargs = self._construct_paths(pathq)
634         if subq is not None:
635             q += subq
636             args += subargs
637         self.execute(q, args)
638         return [r[0] for r in self.fetchall()]
639     
640     def latest_version_list(self, parent, prefix='', delimiter=None,
641                             start='', limit=10000, before=inf,
642                             except_cluster=0, pathq=[], filterq=None):
643         """Return a (list of (path, serial) tuples, list of common prefixes)
644            for the current versions of the paths with the given parent,
645            matching the following criteria.
646            
647            The property tuple for a version is returned if all
648            of these conditions are true:
649                 
650                 a. parent matches
651                 
652                 b. path > start
653                 
654                 c. path starts with prefix (and paths in pathq)
655                 
656                 d. version is the max up to before
657                 
658                 e. version is not in cluster
659                 
660                 f. the path does not have the delimiter occuring
661                    after the prefix, or ends with the delimiter
662                 
663                 g. serial matches the attribute filter query.
664                    
665                    A filter query is a comma-separated list of
666                    terms in one of these three forms:
667                    
668                    key
669                        an attribute with this key must exist
670                    
671                    !key
672                        an attribute with this key must not exist
673                    
674                    key ?op value
675                        the attribute with this key satisfies the value
676                        where ?op is one of ==, != <=, >=, <, >.
677            
678            The list of common prefixes includes the prefixes
679            matching up to the first delimiter after prefix,
680            and are reported only once, as "virtual directories".
681            The delimiter is included in the prefixes.
682            
683            If arguments are None, then the corresponding matching rule
684            will always match.
685            
686            Limit applies to the first list of tuples returned.
687         """
688         
689         execute = self.execute
690         
691         if not start or start < prefix:
692             start = strprevling(prefix)
693         nextling = strnextling(prefix)
694         
695         q = ("select distinct n.path, v.serial "
696              "from attributes a, versions v, nodes n "
697              "where v.serial = (select max(serial) "
698                               "from versions "
699                               "where node = v.node and mtime < ?) "
700              "and v.cluster != ? "
701              "and v.node in (select node "
702                            "from nodes "
703                            "where parent = ?) "
704              "and a.serial = v.serial "
705              "and n.node = v.node "
706              "and n.path > ? and n.path < ?")
707         args = [before, except_cluster, parent, start, nextling]
708         
709         subq, subargs = self._construct_paths(pathq)
710         if subq is not None:
711             q += subq
712             args += subargs
713         subq, subargs = self._construct_filters(filterq)
714         if subq is not None:
715             q += subq
716             args += subargs
717         else:
718             q = q.replace("attributes a, ", "")
719             q = q.replace("and a.serial = v.serial ", "")
720         q += " order by n.path"
721         
722         if not delimiter:
723             q += " limit ?"
724             args.append(limit)
725             execute(q, args)
726             return self.fetchall(), ()
727         
728         pfz = len(prefix)
729         dz = len(delimiter)
730         count = 0
731         fetchone = self.fetchone
732         prefixes = []
733         pappend = prefixes.append
734         matches = []
735         mappend = matches.append
736         
737         execute(q, args)
738         while True:
739             props = fetchone()
740             if props is None:
741                 break
742             path, serial = props
743             idx = path.find(delimiter, pfz)
744             
745             if idx < 0:
746                 mappend(props)
747                 count += 1
748                 if count >= limit:
749                     break
750                 continue
751             
752             if idx + dz == len(path):
753                 mappend(props)
754                 count += 1
755                 continue # Get one more, in case there is a path.
756             pf = path[:idx + dz]
757             pappend(pf)
758             if count >= limit: 
759                 break
760             
761             args[3] = strnextling(pf) # New start.
762             execute(q, args)
763         
764         return matches, prefixes