Merge branch 'master' of https://code.grnet.gr/git/pithos
[pithos] / pithos / backends / lib / node.py
1 # Copyright 2011 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from time import time
35
36 from dbworker import DBWorker
37
38
39 ROOTNODE  = 0
40
41 ( SERIAL, NODE, SIZE, SOURCE, MTIME, MUSER, CLUSTER ) = range(7)
42
43 inf = float('inf')
44
45
46 def strnextling(prefix):
47     """Return the first unicode string
48        greater than but not starting with given prefix.
49        strnextling('hello') -> 'hellp'
50     """
51     if not prefix:
52         ## all strings start with the null string,
53         ## therefore we have to approximate strnextling('')
54         ## with the last unicode character supported by python
55         ## 0x10ffff for wide (32-bit unicode) python builds
56         ## 0x00ffff for narrow (16-bit unicode) python builds
57         ## We will not autodetect. 0xffff is safe enough.
58         return unichr(0xffff)
59     s = prefix[:-1]
60     c = ord(prefix[-1])
61     if c >= 0xffff:
62         raise RuntimeError
63     s += unichr(c+1)
64     return s
65
66 def strprevling(prefix):
67     """Return an approximation of the last unicode string
68        less than but not starting with given prefix.
69        strprevling(u'hello') -> u'helln\\xffff'
70     """
71     if not prefix:
72         ## There is no prevling for the null string
73         return prefix
74     s = prefix[:-1]
75     c = ord(prefix[-1])
76     if c > 0:
77         s += unichr(c-1) + unichr(0xffff)
78     return s
79
80
81 _propnames = {
82     'serial'    : 0,
83     'node'      : 1,
84     'size'      : 2,
85     'source'    : 3,
86     'mtime'     : 4,
87     'muser'     : 5,
88     'cluster'   : 6,
89 }
90
91
92 class Node(DBWorker):
93     """Nodes store path organization and have multiple versions.
94        Versions store object history and have multiple attributes.
95        Attributes store metadata.
96     """
97     
98     # TODO: Provide an interface for included and excluded clusters.
99     
100     def __init__(self, **params):
101         DBWorker.__init__(self, **params)
102         execute = self.execute
103         
104         execute(""" pragma foreign_keys = on """)
105         
106         execute(""" create table if not exists nodes
107                           ( node       integer primary key,
108                             parent     integer default 0,
109                             path       text    not null default '',
110                             foreign key (parent)
111                             references nodes(node)
112                             on update cascade
113                             on delete cascade )""")
114         execute(""" create unique index if not exists idx_nodes_path
115                     on nodes(path) """)
116         
117         execute(""" create table if not exists statistics
118                           ( node       integer,
119                             population integer not null default 0,
120                             size       integer not null default 0,
121                             mtime      integer,
122                             cluster    integer not null default 0,
123                             primary key (node, cluster)
124                             foreign key (node)
125                             references nodes(node)
126                             on update cascade
127                             on delete cascade )""")
128         
129         execute(""" create table if not exists versions
130                           ( serial     integer primary key,
131                             node       integer,
132                             size       integer not null default 0,
133                             source     integer,
134                             mtime      integer,
135                             muser      text    not null default '',
136                             cluster    integer not null default 0,
137                             foreign key (node)
138                             references nodes(node)
139                             on update cascade
140                             on delete cascade ) """)
141         execute(""" create index if not exists idx_versions_node
142                     on nodes(node) """)
143         # TODO: Sort out if more indexes are needed.
144         # execute(""" create index if not exists idx_versions_mtime
145         #             on nodes(mtime) """)
146         
147         execute(""" create table if not exists attributes
148                           ( serial integer,
149                             key    text,
150                             value  text,
151                             primary key (serial, key)
152                             foreign key (serial)
153                             references versions(serial)
154                             on update cascade
155                             on delete cascade ) """)
156         
157         q = "insert or ignore into nodes(node, parent) values (?, ?)"
158         execute(q, (ROOTNODE, ROOTNODE))
159     
160     def node_create(self, parent, path):
161         """Create a new node from the given properties.
162            Return the node identifier of the new node.
163         """
164         
165         q = ("insert into nodes (parent, path) "
166              "values (?, ?)")
167         props = (parent, path)
168         return self.execute(q, props).lastrowid
169     
170     def node_lookup(self, path):
171         """Lookup the current node of the given path.
172            Return None if the path is not found.
173         """
174         
175         q = "select node from nodes where path = ?"
176         self.execute(q, (path,))
177         r = self.fetchone()
178         if r is not None:
179             return r[0]
180         return None
181     
182     def node_get_properties(self, node):
183         """Return the node's (parent, path).
184            Return None if the node is not found.
185         """
186         
187         q = "select parent, path from nodes where node = ?"
188         self.execute(q, (node,))
189         return self.fetchone()
190     
191     def node_get_versions(self, node, keys=(), propnames=_propnames):
192         """Return the properties of all versions at node.
193            If keys is empty, return all properties in the order
194            (serial, node, size, source, mtime, muser, cluster).
195         """
196         
197         q = ("select serial, node, size, source, mtime, muser, cluster "
198              "from versions "
199              "where node = ?")
200         self.execute(q, (node,))
201         r = self.fetchall()
202         if r is None:
203             return r
204         
205         if not keys:
206             return r
207         return [[p[propnames[k]] for k in keys if k in propnames] for p in r]
208     
209     def node_count_children(self, node):
210         """Return node's child count."""
211         
212         q = "select count(node) from nodes where parent = ? and node != 0"
213         self.execute(q, (node,))
214         r = self.fetchone()
215         if r is None:
216             return 0
217         return r[0]
218     
219     def node_purge_children(self, parent, before=inf, cluster=0):
220         """Delete all versions with the specified
221            parent and cluster, and return
222            the serials of versions deleted.
223            Clears out nodes with no remaining versions.
224         """
225         
226         execute = self.execute
227         q = ("select count(serial), sum(size) from versions "
228              "where node in (select node "
229                             "from nodes "
230                             "where parent = ?) "
231              "and cluster = ? "
232              "and mtime <= ?")
233         args = (parent, cluster, before)
234         execute(q, args)
235         nr, size = self.fetchone()
236         if not nr:
237             return ()
238         mtime = time()
239         self.statistics_update(parent, -nr, -size, mtime, cluster)
240         self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster)
241         
242         q = ("select serial from versions "
243              "where node in (select node "
244                             "from nodes "
245                             "where parent = ?) "
246              "and cluster = ? "
247              "and mtime <= ?")
248         execute(q, args)
249         serials = [r[SERIAL] for r in self.fetchall()]
250         q = ("delete from versions "
251              "where node in (select node "
252                             "from nodes "
253                             "where parent = ?) "
254              "and cluster = ? "
255              "and mtime <= ?")
256         execute(q, args)
257         q = ("delete from nodes "
258              "where node in (select node from nodes n "
259                             "where (select count(serial) "
260                                    "from versions "
261                                    "where node = n.node) = 0 "
262                             "and parent = ?)")
263         execute(q, (parent,))
264         return serials
265     
266     def node_purge(self, node, before=inf, cluster=0):
267         """Delete all versions with the specified
268            node and cluster, and return
269            the serials of versions deleted.
270            Clears out the node if it has no remaining versions.
271         """
272         
273         execute = self.execute
274         q = ("select count(serial), sum(size) from versions "
275              "where node = ? "
276              "and cluster = ? "
277              "and mtime <= ?")
278         args = (node, cluster, before)
279         execute(q, args)
280         nr, size = self.fetchone()
281         if not nr:
282             return ()
283         mtime = time()
284         self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
285         
286         q = ("select serial from versions "
287              "where node = ? "
288              "and cluster = ? "
289              "and mtime <= ?")
290         execute(q, args)
291         serials = [r[SERIAL] for r in self.fetchall()]
292         q = ("delete from versions "
293              "where node = ? "
294              "and cluster = ? "
295              "and mtime <= ?")
296         execute(q, args)
297         q = ("delete from nodes "
298              "where node in (select node from nodes n "
299                             "where (select count(serial) "
300                                    "from versions "
301                                    "where node = n.node) = 0 "
302                             "and node = ?)")
303         execute(q, (node,))
304         return serials
305     
306     def node_remove(self, node):
307         """Remove the node specified.
308            Return false if the node has children or is not found.
309         """
310         
311         if self.node_count_children(node):
312             return False
313         
314         mtime = time()
315         q = ("select count(serial), sum(size), cluster "
316              "from versions "
317              "where node = ? "
318              "group by cluster")
319         self.execute(q, (node,))
320         for population, size, cluster in self.fetchall():
321             self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
322         
323         q = "delete from nodes where node = ?"
324         self.execute(q, (node,))
325         return True
326     
327     def statistics_get(self, node, cluster=0):
328         """Return population, total size and last mtime
329            for all versions under node that belong to the cluster.
330         """
331         
332         q = ("select population, size, mtime from statistics "
333              "where node = ? and cluster = ?")
334         self.execute(q, (node, cluster))
335         return self.fetchone()
336     
337     def statistics_update(self, node, population, size, mtime, cluster=0):
338         """Update the statistics of the given node.
339            Statistics keep track the population, total
340            size of objects and mtime in the node's namespace.
341            May be zero or positive or negative numbers.
342         """
343         
344         qs = ("select population, size from statistics "
345               "where node = ? and cluster = ?")
346         qu = ("insert or replace into statistics (node, population, size, mtime, cluster) "
347               "values (?, ?, ?, ?, ?)")
348         self.execute(qs, (node, cluster))
349         r = self.fetchone()
350         if r is None:
351             prepopulation, presize = (0, 0)
352         else:
353             prepopulation, presize = r
354         population += prepopulation
355         size += presize
356         self.execute(qu, (node, population, size, mtime, cluster))
357     
358     def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
359         """Update the statistics of the given node's parent.
360            Then recursively update all parents up to the root.
361            Population is not recursive.
362         """
363         
364         while True:
365             if node == 0:
366                 break
367             props = self.node_get_properties(node)
368             if props is None:
369                 break
370             parent, path = props
371             self.statistics_update(parent, population, size, mtime, cluster)
372             node = parent
373             population = 0 # Population isn't recursive
374     
375     def statistics_latest(self, node, before=inf, except_cluster=0):
376         """Return population, total size and last mtime
377            for all latest versions under node that
378            do not belong to the cluster.
379         """
380         
381         execute = self.execute
382         fetchone = self.fetchone
383         
384         # The node.
385         props = self.node_get_properties(node)
386         if props is None:
387             return None
388         parent, path = props
389         
390         # The latest version.
391         q = ("select serial, node, size, source, mtime, muser, cluster "
392              "from versions "
393              "where serial = (select max(serial) "
394                              "from versions "
395                              "where node = ? and mtime < ?) "
396              "and cluster != ?")
397         execute(q, (node, before, except_cluster))
398         props = fetchone()
399         if props is None:
400             return None
401         mtime = props[MTIME]
402         
403         # First level, just under node (get population).
404         q = ("select count(serial), sum(size), max(mtime) "
405              "from versions v "
406              "where serial = (select max(serial) "
407                              "from versions "
408                              "where node = v.node and mtime < ?) "
409              "and cluster != ? "
410              "and node in (select node "
411                           "from nodes "
412                           "where parent = ?)")
413         execute(q, (before, except_cluster, node))
414         r = fetchone()
415         if r is None:
416             return None
417         count = r[0]
418         mtime = max(mtime, r[2])
419         if count == 0:
420             return (0, 0, mtime)
421         
422         # All children (get size and mtime).
423         # XXX: This is why the full path is stored.
424         q = ("select count(serial), sum(size), max(mtime) "
425              "from versions v "
426              "where serial = (select max(serial) "
427                              "from versions "
428                              "where node = v.node and mtime < ?) "
429              "and cluster != ? "
430              "and node in (select node "
431                           "from nodes "
432                           "where path like ?)")
433         execute(q, (before, except_cluster, path + '%'))
434         r = fetchone()
435         if r is None:
436             return None
437         size = r[1] - props[SIZE]
438         mtime = max(mtime, r[2])
439         return (count, size, mtime)
440     
441     def version_create(self, node, size, source, muser, cluster=0):
442         """Create a new version from the given properties.
443            Return the (serial, mtime) of the new version.
444         """
445         
446         q = ("insert into versions (node, size, source, mtime, muser, cluster) "
447              "values (?, ?, ?, ?, ?, ?)")
448         mtime = time()
449         props = (node, size, source, mtime, muser, cluster)
450         serial = self.execute(q, props).lastrowid
451         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
452         return serial, mtime
453     
454     def version_lookup(self, node, before=inf, cluster=0):
455         """Lookup the current version of the given node.
456            Return a list with its properties:
457            (serial, node, size, source, mtime, muser, cluster)
458            or None if the current version is not found in the given cluster.
459         """
460         
461         q = ("select serial, node, size, source, mtime, muser, cluster "
462              "from versions "
463              "where serial = (select max(serial) "
464                              "from versions "
465                              "where node = ? and mtime < ?) "
466              "and cluster = ?")
467         self.execute(q, (node, before, cluster))
468         props = self.fetchone()
469         if props is not None:
470             return props
471         return None
472     
473     def version_get_properties(self, serial, keys=(), propnames=_propnames):
474         """Return a sequence of values for the properties of
475            the version specified by serial and the keys, in the order given.
476            If keys is empty, return all properties in the order
477            (serial, node, size, source, mtime, muser, cluster).
478         """
479         
480         q = ("select serial, node, size, source, mtime, muser, cluster "
481              "from versions "
482              "where serial = ?")
483         self.execute(q, (serial,))
484         r = self.fetchone()
485         if r is None:
486             return r
487         
488         if not keys:
489             return r
490         return [r[propnames[k]] for k in keys if k in propnames]
491     
492     def version_recluster(self, serial, cluster):
493         """Move the version into another cluster."""
494         
495         props = self.version_get_properties(serial)
496         if not props:
497             return
498         node = props[NODE]
499         size = props[SIZE]
500         oldcluster = props[CLUSTER]
501         if cluster == oldcluster:
502             return
503         
504         mtime = time()
505         self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
506         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
507         
508         q = "update versions set cluster = ? where serial = ?"
509         self.execute(q, (cluster, serial))
510     
511     def version_remove(self, serial):
512         """Remove the serial specified."""
513         
514         props = self.node_get_properties(serial)
515         if not props:
516             return
517         node = props[NODE]
518         size = props[SIZE]
519         cluster = props[CLUSTER]
520         
521         mtime = time()
522         self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
523         
524         q = "delete from versions where serial = ?"
525         self.execute(q, (serial,))
526         return True
527     
528     def attribute_get(self, serial, keys=()):
529         """Return a list of (key, value) pairs of the version specified by serial.
530            If keys is empty, return all attributes.
531            Othwerise, return only those specified.
532         """
533         
534         execute = self.execute
535         if keys:
536             marks = ','.join('?' for k in keys)
537             q = ("select key, value from attributes "
538                  "where key in (%s) and serial = ?" % (marks,))
539             execute(q, keys + (serial,))
540         else:
541             q = "select key, value from attributes where serial = ?"
542             execute(q, (serial,))
543         return self.fetchall()
544     
545     def attribute_set(self, serial, items):
546         """Set the attributes of the version specified by serial.
547            Receive attributes as an iterable of (key, value) pairs.
548         """
549         
550         q = ("insert or replace into attributes (serial, key, value) "
551              "values (?, ?, ?)")
552         self.executemany(q, ((serial, k, v) for k, v in items))
553     
554     def attribute_del(self, serial, keys=()):
555         """Delete attributes of the version specified by serial.
556            If keys is empty, delete all attributes.
557            Otherwise delete those specified.
558         """
559         
560         if keys:
561             q = "delete from attributes where serial = ? and key = ?"
562             self.executemany(q, ((serial, key) for key in keys))
563         else:
564             q = "delete from attributes where serial = ?"
565             self.execute(q, (serial,))
566     
567     def attribute_copy(self, source, dest):
568         q = ("insert or replace into attributes "
569              "select ?, key, value from attributes "
570              "where serial = ?")
571         self.execute(q, (dest, source))
572     
573     def _construct_filters(self, filterq):
574         if not filterq:
575             return None, None
576         
577         args = filterq.split(',')
578         subq = " and a.key in ("
579         subq += ','.join(('?' for x in args))
580         subq += ")"
581         
582         return subq, args
583     
584     def _construct_paths(self, pathq):
585         if not pathq:
586             return None, None
587         
588         subq = " and ("
589         subq += ' or '.join(('n.path like ?' for x in pathq))
590         subq += ")"
591         args = tuple([x + '%' for x in pathq])
592         
593         return subq, args
594     
595     def latest_attribute_keys(self, parent, before=inf, except_cluster=0, pathq=[]):
596         """Return a list with all keys pairs defined
597            for all latest versions under parent that
598            do not belong to the cluster.
599         """
600         
601         # TODO: Use another table to store before=inf results.
602         q = ("select distinct a.key "
603              "from attributes a, versions v, nodes n "
604              "where v.serial = (select max(serial) "
605                               "from versions "
606                               "where node = v.node and mtime < ?) "
607              "and v.cluster != ? "
608              "and v.node in (select node "
609                            "from nodes "
610                            "where parent = ?) "
611              "and a.serial = v.serial "
612              "and n.node = v.node")
613         args = (before, except_cluster, parent)
614         subq, subargs = self._construct_paths(pathq)
615         if subq is not None:
616             q += subq
617             args += subargs
618         self.execute(q, args)
619         return [r[0] for r in self.fetchall()]
620     
621     def latest_version_list(self, parent, prefix='', delimiter=None,
622                             start='', limit=10000, before=inf,
623                             except_cluster=0, pathq=[], filterq=None):
624         """Return a (list of (path, serial) tuples, list of common prefixes)
625            for the current versions of the paths with the given parent,
626            matching the following criteria.
627            
628            The property tuple for a version is returned if all
629            of these conditions are true:
630                 
631                 a. parent matches
632                 
633                 b. path > start
634                 
635                 c. path starts with prefix (and paths in pathq)
636                 
637                 d. version is the max up to before
638                 
639                 e. version is not in cluster
640                 
641                 f. the path does not have the delimiter occuring
642                    after the prefix, or ends with the delimiter
643                 
644                 g. serial matches the attribute filter query.
645                    
646                    A filter query is a comma-separated list of
647                    terms in one of these three forms:
648                    
649                    key
650                        an attribute with this key must exist
651                    
652                    !key
653                        an attribute with this key must not exist
654                    
655                    key ?op value
656                        the attribute with this key satisfies the value
657                        where ?op is one of ==, != <=, >=, <, >.
658            
659            The list of common prefixes includes the prefixes
660            matching up to the first delimiter after prefix,
661            and are reported only once, as "virtual directories".
662            The delimiter is included in the prefixes.
663            
664            If arguments are None, then the corresponding matching rule
665            will always match.
666            
667            Limit applies to the first list of tuples returned.
668         """
669         
670         execute = self.execute
671         
672         if not start or start < prefix:
673             start = strprevling(prefix)
674         nextling = strnextling(prefix)
675         
676         q = ("select distinct n.path, v.serial "
677              "from attributes a, versions v, nodes n "
678              "where v.serial = (select max(serial) "
679                               "from versions "
680                               "where node = v.node and mtime < ?) "
681              "and v.cluster != ? "
682              "and v.node in (select node "
683                            "from nodes "
684                            "where parent = ?) "
685              "and a.serial = v.serial "
686              "and n.node = v.node "
687              "and n.path > ? and n.path < ?")
688         args = [before, except_cluster, parent, start, nextling]
689         
690         subq, subargs = self._construct_paths(pathq)
691         if subq is not None:
692             q += subq
693             args += subargs
694         subq, subargs = self._construct_filters(filterq)
695         if subq is not None:
696             q += subq
697             args += subargs
698         else:
699             q = q.replace("attributes a, ", "")
700             q = q.replace("and a.serial = v.serial ", "")
701         q += " order by n.path"
702         
703         if not delimiter:
704             q += " limit ?"
705             args.append(limit)
706             execute(q, args)
707             return self.fetchall(), ()
708         
709         pfz = len(prefix)
710         dz = len(delimiter)
711         count = 0
712         fetchone = self.fetchone
713         prefixes = []
714         pappend = prefixes.append
715         matches = []
716         mappend = matches.append
717         
718         execute(q, args)
719         while True:
720             props = fetchone()
721             if props is None:
722                 break
723             path, serial = props
724             idx = path.find(delimiter, pfz)
725             
726             if idx < 0:
727                 mappend(props)
728                 count += 1
729                 if count >= limit:
730                     break
731                 continue
732             
733             if idx + dz == len(path):
734                 mappend(props)
735                 count += 1
736                 continue # Get one more, in case there is a path.
737             pf = path[:idx + dz]
738             pappend(pf)
739             if count >= limit: 
740                 break
741             
742             args[3] = strnextling(pf) # New start.
743             execute(q, args)
744         
745         return matches, prefixes