0e5cfc046da34960a2d20b39ed6b2fd9f59cddd5
[pithos] / pithos / backends / lib / sqlite / node.py
1 # Copyright 2011 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from time import time
35
36 from dbworker import DBWorker
37
38
39 ROOTNODE  = 0
40
41 ( SERIAL, NODE, HASH, SIZE, SOURCE, MTIME, MUSER, CLUSTER ) = range(8)
42
43 inf = float('inf')
44
45
46 def strnextling(prefix):
47     """Return the first unicode string
48        greater than but not starting with given prefix.
49        strnextling('hello') -> 'hellp'
50     """
51     if not prefix:
52         ## all strings start with the null string,
53         ## therefore we have to approximate strnextling('')
54         ## with the last unicode character supported by python
55         ## 0x10ffff for wide (32-bit unicode) python builds
56         ## 0x00ffff for narrow (16-bit unicode) python builds
57         ## We will not autodetect. 0xffff is safe enough.
58         return unichr(0xffff)
59     s = prefix[:-1]
60     c = ord(prefix[-1])
61     if c >= 0xffff:
62         raise RuntimeError
63     s += unichr(c+1)
64     return s
65
66 def strprevling(prefix):
67     """Return an approximation of the last unicode string
68        less than but not starting with given prefix.
69        strprevling(u'hello') -> u'helln\\xffff'
70     """
71     if not prefix:
72         ## There is no prevling for the null string
73         return prefix
74     s = prefix[:-1]
75     c = ord(prefix[-1])
76     if c > 0:
77         s += unichr(c-1) + unichr(0xffff)
78     return s
79
80
81 _propnames = {
82     'serial'    : 0,
83     'node'      : 1,
84     'hash'      : 2,
85     'size'      : 3,
86     'source'    : 4,
87     'mtime'     : 5,
88     'muser'     : 6,
89     'cluster'   : 7,
90 }
91
92
93 class Node(DBWorker):
94     """Nodes store path organization and have multiple versions.
95        Versions store object history and have multiple attributes.
96        Attributes store metadata.
97     """
98     
99     # TODO: Provide an interface for included and excluded clusters.
100     
101     def __init__(self, **params):
102         DBWorker.__init__(self, **params)
103         execute = self.execute
104         
105         execute(""" pragma foreign_keys = on """)
106         
107         execute(""" create table if not exists nodes
108                           ( node       integer primary key,
109                             parent     integer default 0,
110                             path       text    not null default '',
111                             foreign key (parent)
112                             references nodes(node)
113                             on update cascade
114                             on delete cascade )""")
115         execute(""" create unique index if not exists idx_nodes_path
116                     on nodes(path) """)
117         
118         execute(""" create table if not exists statistics
119                           ( node       integer,
120                             population integer not null default 0,
121                             size       integer not null default 0,
122                             mtime      integer,
123                             cluster    integer not null default 0,
124                             primary key (node, cluster)
125                             foreign key (node)
126                             references nodes(node)
127                             on update cascade
128                             on delete cascade )""")
129         
130         execute(""" create table if not exists versions
131                           ( serial     integer primary key,
132                             node       integer,
133                             hash       text,
134                             size       integer not null default 0,
135                             source     integer,
136                             mtime      integer,
137                             muser      text    not null default '',
138                             cluster    integer not null default 0,
139                             foreign key (node)
140                             references nodes(node)
141                             on update cascade
142                             on delete cascade ) """)
143         execute(""" create index if not exists idx_versions_node_mtime
144                     on versions(node, mtime) """)
145         
146         execute(""" create table if not exists attributes
147                           ( serial integer,
148                             key    text,
149                             value  text,
150                             primary key (serial, key)
151                             foreign key (serial)
152                             references versions(serial)
153                             on update cascade
154                             on delete cascade ) """)
155         
156         q = "insert or ignore into nodes(node, parent) values (?, ?)"
157         execute(q, (ROOTNODE, ROOTNODE))
158     
159     def node_create(self, parent, path):
160         """Create a new node from the given properties.
161            Return the node identifier of the new node.
162         """
163         
164         q = ("insert into nodes (parent, path) "
165              "values (?, ?)")
166         props = (parent, path)
167         return self.execute(q, props).lastrowid
168     
169     def node_lookup(self, path):
170         """Lookup the current node of the given path.
171            Return None if the path is not found.
172         """
173         
174         q = "select node from nodes where path = ?"
175         self.execute(q, (path,))
176         r = self.fetchone()
177         if r is not None:
178             return r[0]
179         return None
180     
181     def node_get_properties(self, node):
182         """Return the node's (parent, path).
183            Return None if the node is not found.
184         """
185         
186         q = "select parent, path from nodes where node = ?"
187         self.execute(q, (node,))
188         return self.fetchone()
189     
190     def node_get_versions(self, node, keys=(), propnames=_propnames):
191         """Return the properties of all versions at node.
192            If keys is empty, return all properties in the order
193            (serial, node, size, source, mtime, muser, cluster).
194         """
195         
196         q = ("select serial, node, hash, size, source, mtime, muser, cluster "
197              "from versions "
198              "where node = ?")
199         self.execute(q, (node,))
200         r = self.fetchall()
201         if r is None:
202             return r
203         
204         if not keys:
205             return r
206         return [[p[propnames[k]] for k in keys if k in propnames] for p in r]
207     
208     def node_count_children(self, node):
209         """Return node's child count."""
210         
211         q = "select count(node) from nodes where parent = ? and node != 0"
212         self.execute(q, (node,))
213         r = self.fetchone()
214         if r is None:
215             return 0
216         return r[0]
217     
218     def node_purge_children(self, parent, before=inf, cluster=0):
219         """Delete all versions with the specified
220            parent and cluster, and return
221            the serials of versions deleted.
222            Clears out nodes with no remaining versions.
223         """
224         
225         execute = self.execute
226         q = ("select count(serial), sum(size) from versions "
227              "where node in (select node "
228                             "from nodes "
229                             "where parent = ?) "
230              "and cluster = ? "
231              "and mtime <= ?")
232         args = (parent, cluster, before)
233         execute(q, args)
234         nr, size = self.fetchone()
235         if not nr:
236             return ()
237         mtime = time()
238         self.statistics_update(parent, -nr, -size, mtime, cluster)
239         self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster)
240         
241         q = ("select serial from versions "
242              "where node in (select node "
243                             "from nodes "
244                             "where parent = ?) "
245              "and cluster = ? "
246              "and mtime <= ?")
247         execute(q, args)
248         serials = [r[SERIAL] for r in self.fetchall()]
249         q = ("delete from versions "
250              "where node in (select node "
251                             "from nodes "
252                             "where parent = ?) "
253              "and cluster = ? "
254              "and mtime <= ?")
255         execute(q, args)
256         q = ("delete from nodes "
257              "where node in (select node from nodes n "
258                             "where (select count(serial) "
259                                    "from versions "
260                                    "where node = n.node) = 0 "
261                             "and parent = ?)")
262         execute(q, (parent,))
263         return serials
264     
265     def node_purge(self, node, before=inf, cluster=0):
266         """Delete all versions with the specified
267            node and cluster, and return
268            the serials of versions deleted.
269            Clears out the node if it has no remaining versions.
270         """
271         
272         execute = self.execute
273         q = ("select count(serial), sum(size) from versions "
274              "where node = ? "
275              "and cluster = ? "
276              "and mtime <= ?")
277         args = (node, cluster, before)
278         execute(q, args)
279         nr, size = self.fetchone()
280         if not nr:
281             return ()
282         mtime = time()
283         self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
284         
285         q = ("select serial from versions "
286              "where node = ? "
287              "and cluster = ? "
288              "and mtime <= ?")
289         execute(q, args)
290         serials = [r[SERIAL] for r in self.fetchall()]
291         q = ("delete from versions "
292              "where node = ? "
293              "and cluster = ? "
294              "and mtime <= ?")
295         execute(q, args)
296         q = ("delete from nodes "
297              "where node in (select node from nodes n "
298                             "where (select count(serial) "
299                                    "from versions "
300                                    "where node = n.node) = 0 "
301                             "and node = ?)")
302         execute(q, (node,))
303         return serials
304     
305     def node_remove(self, node):
306         """Remove the node specified.
307            Return false if the node has children or is not found.
308         """
309         
310         if self.node_count_children(node):
311             return False
312         
313         mtime = time()
314         q = ("select count(serial), sum(size), cluster "
315              "from versions "
316              "where node = ? "
317              "group by cluster")
318         self.execute(q, (node,))
319         for population, size, cluster in self.fetchall():
320             self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
321         
322         q = "delete from nodes where node = ?"
323         self.execute(q, (node,))
324         return True
325     
326     def statistics_get(self, node, cluster=0):
327         """Return population, total size and last mtime
328            for all versions under node that belong to the cluster.
329         """
330         
331         q = ("select population, size, mtime from statistics "
332              "where node = ? and cluster = ?")
333         self.execute(q, (node, cluster))
334         return self.fetchone()
335     
336     def statistics_update(self, node, population, size, mtime, cluster=0):
337         """Update the statistics of the given node.
338            Statistics keep track the population, total
339            size of objects and mtime in the node's namespace.
340            May be zero or positive or negative numbers.
341         """
342         
343         qs = ("select population, size from statistics "
344               "where node = ? and cluster = ?")
345         qu = ("insert or replace into statistics (node, population, size, mtime, cluster) "
346               "values (?, ?, ?, ?, ?)")
347         self.execute(qs, (node, cluster))
348         r = self.fetchone()
349         if r is None:
350             prepopulation, presize = (0, 0)
351         else:
352             prepopulation, presize = r
353         population += prepopulation
354         size += presize
355         self.execute(qu, (node, population, size, mtime, cluster))
356     
357     def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
358         """Update the statistics of the given node's parent.
359            Then recursively update all parents up to the root.
360            Population is not recursive.
361         """
362         
363         while True:
364             if node == 0:
365                 break
366             props = self.node_get_properties(node)
367             if props is None:
368                 break
369             parent, path = props
370             self.statistics_update(parent, population, size, mtime, cluster)
371             node = parent
372             population = 0 # Population isn't recursive
373     
374     def statistics_latest(self, node, before=inf, except_cluster=0):
375         """Return population, total size and last mtime
376            for all latest versions under node that
377            do not belong to the cluster.
378         """
379         
380         execute = self.execute
381         fetchone = self.fetchone
382         
383         # The node.
384         props = self.node_get_properties(node)
385         if props is None:
386             return None
387         parent, path = props
388         
389         # The latest version.
390         q = ("select serial, node, hash, size, source, mtime, muser, cluster "
391              "from versions "
392              "where serial = (select max(serial) "
393                              "from versions "
394                              "where node = ? and mtime < ?) "
395              "and cluster != ?")
396         execute(q, (node, before, except_cluster))
397         props = fetchone()
398         if props is None:
399             return None
400         mtime = props[MTIME]
401         
402         # First level, just under node (get population).
403         q = ("select count(serial), sum(size), max(mtime) "
404              "from versions v "
405              "where serial = (select max(serial) "
406                              "from versions "
407                              "where node = v.node and mtime < ?) "
408              "and cluster != ? "
409              "and node in (select node "
410                           "from nodes "
411                           "where parent = ?)")
412         execute(q, (before, except_cluster, node))
413         r = fetchone()
414         if r is None:
415             return None
416         count = r[0]
417         mtime = max(mtime, r[2])
418         if count == 0:
419             return (0, 0, mtime)
420         
421         # All children (get size and mtime).
422         # XXX: This is why the full path is stored.
423         q = ("select count(serial), sum(size), max(mtime) "
424              "from versions v "
425              "where serial = (select max(serial) "
426                              "from versions "
427                              "where node = v.node and mtime < ?) "
428              "and cluster != ? "
429              "and node in (select node "
430                           "from nodes "
431                           "where path like ?)")
432         execute(q, (before, except_cluster, path + '%'))
433         r = fetchone()
434         if r is None:
435             return None
436         size = r[1] - props[SIZE]
437         mtime = max(mtime, r[2])
438         return (count, size, mtime)
439     
440     def version_create(self, node, hash, size, source, muser, cluster=0):
441         """Create a new version from the given properties.
442            Return the (serial, mtime) of the new version.
443         """
444         
445         q = ("insert into versions (node, hash, size, source, mtime, muser, cluster) "
446              "values (?, ?, ?, ?, ?, ?, ?)")
447         mtime = time()
448         props = (node, hash, size, source, mtime, muser, cluster)
449         serial = self.execute(q, props).lastrowid
450         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
451         return serial, mtime
452     
453     def version_lookup(self, node, before=inf, cluster=0):
454         """Lookup the current version of the given node.
455            Return a list with its properties:
456            (serial, node, hash, size, source, mtime, muser, cluster)
457            or None if the current version is not found in the given cluster.
458         """
459         
460         q = ("select serial, node, hash, size, source, mtime, muser, cluster "
461              "from versions "
462              "where serial = (select max(serial) "
463                              "from versions "
464                              "where node = ? and mtime < ?) "
465              "and cluster = ?")
466         self.execute(q, (node, before, cluster))
467         props = self.fetchone()
468         if props is not None:
469             return props
470         return None
471     
472     def version_get_properties(self, serial, keys=(), propnames=_propnames):
473         """Return a sequence of values for the properties of
474            the version specified by serial and the keys, in the order given.
475            If keys is empty, return all properties in the order
476            (serial, node, hash, size, source, mtime, muser, cluster).
477         """
478         
479         q = ("select serial, node, hash, size, source, mtime, muser, cluster "
480              "from versions "
481              "where serial = ?")
482         self.execute(q, (serial,))
483         r = self.fetchone()
484         if r is None:
485             return r
486         
487         if not keys:
488             return r
489         return [r[propnames[k]] for k in keys if k in propnames]
490     
491     def version_recluster(self, serial, cluster):
492         """Move the version into another cluster."""
493         
494         props = self.version_get_properties(serial)
495         if not props:
496             return
497         node = props[NODE]
498         size = props[SIZE]
499         oldcluster = props[CLUSTER]
500         if cluster == oldcluster:
501             return
502         
503         mtime = time()
504         self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
505         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
506         
507         q = "update versions set cluster = ? where serial = ?"
508         self.execute(q, (cluster, serial))
509     
510     def version_remove(self, serial):
511         """Remove the serial specified."""
512         
513         props = self.node_get_properties(serial)
514         if not props:
515             return
516         node = props[NODE]
517         size = props[SIZE]
518         cluster = props[CLUSTER]
519         
520         mtime = time()
521         self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
522         
523         q = "delete from versions where serial = ?"
524         self.execute(q, (serial,))
525         return True
526     
527     def attribute_get(self, serial, keys=()):
528         """Return a list of (key, value) pairs of the version specified by serial.
529            If keys is empty, return all attributes.
530            Othwerise, return only those specified.
531         """
532         
533         execute = self.execute
534         if keys:
535             marks = ','.join('?' for k in keys)
536             q = ("select key, value from attributes "
537                  "where key in (%s) and serial = ?" % (marks,))
538             execute(q, keys + (serial,))
539         else:
540             q = "select key, value from attributes where serial = ?"
541             execute(q, (serial,))
542         return self.fetchall()
543     
544     def attribute_set(self, serial, items):
545         """Set the attributes of the version specified by serial.
546            Receive attributes as an iterable of (key, value) pairs.
547         """
548         
549         q = ("insert or replace into attributes (serial, key, value) "
550              "values (?, ?, ?)")
551         self.executemany(q, ((serial, k, v) for k, v in items))
552     
553     def attribute_del(self, serial, keys=()):
554         """Delete attributes of the version specified by serial.
555            If keys is empty, delete all attributes.
556            Otherwise delete those specified.
557         """
558         
559         if keys:
560             q = "delete from attributes where serial = ? and key = ?"
561             self.executemany(q, ((serial, key) for key in keys))
562         else:
563             q = "delete from attributes where serial = ?"
564             self.execute(q, (serial,))
565     
566     def attribute_copy(self, source, dest):
567         q = ("insert or replace into attributes "
568              "select ?, key, value from attributes "
569              "where serial = ?")
570         self.execute(q, (dest, source))
571     
572     def _construct_filters(self, filterq):
573         if not filterq:
574             return None, None
575         
576         args = filterq.split(',')
577         subq = " and a.key in ("
578         subq += ','.join(('?' for x in args))
579         subq += ")"
580         
581         return subq, args
582     
583     def _construct_paths(self, pathq):
584         if not pathq:
585             return None, None
586         
587         subq = " and ("
588         subq += ' or '.join(('n.path like ?' for x in pathq))
589         subq += ")"
590         args = tuple([x + '%' for x in pathq])
591         
592         return subq, args
593     
594     def latest_attribute_keys(self, parent, before=inf, except_cluster=0, pathq=[]):
595         """Return a list with all keys pairs defined
596            for all latest versions under parent that
597            do not belong to the cluster.
598         """
599         
600         # TODO: Use another table to store before=inf results.
601         q = ("select distinct a.key "
602              "from attributes a, versions v, nodes n "
603              "where v.serial = (select max(serial) "
604                               "from versions "
605                               "where node = v.node and mtime < ?) "
606              "and v.cluster != ? "
607              "and v.node in (select node "
608                            "from nodes "
609                            "where parent = ?) "
610              "and a.serial = v.serial "
611              "and n.node = v.node")
612         args = (before, except_cluster, parent)
613         subq, subargs = self._construct_paths(pathq)
614         if subq is not None:
615             q += subq
616             args += subargs
617         self.execute(q, args)
618         return [r[0] for r in self.fetchall()]
619     
620     def latest_version_list(self, parent, prefix='', delimiter=None,
621                             start='', limit=10000, before=inf,
622                             except_cluster=0, pathq=[], filterq=None):
623         """Return a (list of (path, serial) tuples, list of common prefixes)
624            for the current versions of the paths with the given parent,
625            matching the following criteria.
626            
627            The property tuple for a version is returned if all
628            of these conditions are true:
629                 
630                 a. parent matches
631                 
632                 b. path > start
633                 
634                 c. path starts with prefix (and paths in pathq)
635                 
636                 d. version is the max up to before
637                 
638                 e. version is not in cluster
639                 
640                 f. the path does not have the delimiter occuring
641                    after the prefix, or ends with the delimiter
642                 
643                 g. serial matches the attribute filter query.
644                    
645                    A filter query is a comma-separated list of
646                    terms in one of these three forms:
647                    
648                    key
649                        an attribute with this key must exist
650                    
651                    !key
652                        an attribute with this key must not exist
653                    
654                    key ?op value
655                        the attribute with this key satisfies the value
656                        where ?op is one of ==, != <=, >=, <, >.
657            
658            The list of common prefixes includes the prefixes
659            matching up to the first delimiter after prefix,
660            and are reported only once, as "virtual directories".
661            The delimiter is included in the prefixes.
662            
663            If arguments are None, then the corresponding matching rule
664            will always match.
665            
666            Limit applies to the first list of tuples returned.
667         """
668         
669         execute = self.execute
670         
671         if not start or start < prefix:
672             start = strprevling(prefix)
673         nextling = strnextling(prefix)
674         
675         q = ("select distinct n.path, v.serial "
676              "from attributes a, versions v, nodes n "
677              "where v.serial = (select max(serial) "
678                               "from versions "
679                               "where node = v.node and mtime < ?) "
680              "and v.cluster != ? "
681              "and v.node in (select node "
682                            "from nodes "
683                            "where parent = ?) "
684              "and a.serial = v.serial "
685              "and n.node = v.node "
686              "and n.path > ? and n.path < ?")
687         args = [before, except_cluster, parent, start, nextling]
688         
689         subq, subargs = self._construct_paths(pathq)
690         if subq is not None:
691             q += subq
692             args += subargs
693         subq, subargs = self._construct_filters(filterq)
694         if subq is not None:
695             q += subq
696             args += subargs
697         else:
698             q = q.replace("attributes a, ", "")
699             q = q.replace("and a.serial = v.serial ", "")
700         q += " order by n.path"
701         
702         if not delimiter:
703             q += " limit ?"
704             args.append(limit)
705             execute(q, args)
706             return self.fetchall(), ()
707         
708         pfz = len(prefix)
709         dz = len(delimiter)
710         count = 0
711         fetchone = self.fetchone
712         prefixes = []
713         pappend = prefixes.append
714         matches = []
715         mappend = matches.append
716         
717         execute(q, args)
718         while True:
719             props = fetchone()
720             if props is None:
721                 break
722             path, serial = props
723             idx = path.find(delimiter, pfz)
724             
725             if idx < 0:
726                 mappend(props)
727                 count += 1
728                 if count >= limit:
729                     break
730                 continue
731             
732             if idx + dz == len(path):
733                 mappend(props)
734                 count += 1
735                 continue # Get one more, in case there is a path.
736             pf = path[:idx + dz]
737             pappend(pf)
738             if count >= limit: 
739                 break
740             
741             args[3] = strnextling(pf) # New start.
742             execute(q, args)
743         
744         return matches, prefixes