6879b7048249cc78d46a1e0cb20d541aced2e629
[pithos] / pithos / backends / lib / node.py
1 # Copyright 2011 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from time import time
35
36 from dbworker import DBWorker
37
38
39 ROOTNODE  = 0
40
41 ( SERIAL, NODE, SIZE, SOURCE, MTIME, MUSER, CLUSTER ) = range(7)
42
43 inf = float('inf')
44
45
46 def strnextling(prefix):
47     """return the first unicode string
48        greater than but not starting with given prefix.
49        strnextling('hello') -> 'hellp'
50     """
51     if not prefix:
52         ## all strings start with the null string,
53         ## therefore we have to approximate strnextling('')
54         ## with the last unicode character supported by python
55         ## 0x10ffff for wide (32-bit unicode) python builds
56         ## 0x00ffff for narrow (16-bit unicode) python builds
57         ## We will not autodetect. 0xffff is safe enough.
58         return unichr(0xffff)
59     s = prefix[:-1]
60     c = ord(prefix[-1])
61     if c >= 0xffff:
62         raise RuntimeError
63     s += unichr(c+1)
64     return s
65
66 def strprevling(prefix):
67     """return an approximation of the last unicode string
68        less than but not starting with given prefix.
69        strprevling(u'hello') -> u'helln\\xffff'
70     """
71     if not prefix:
72         ## There is no prevling for the null string
73         return prefix
74     s = prefix[:-1]
75     c = ord(prefix[-1])
76     if c > 0:
77         s += unichr(c-1) + unichr(0xffff)
78     return s
79
80
81 import re
82 _regexfilter = re.compile('(!?)\s*([\w-]+)\s*(=|!=|<=|>=|<|>)?\s*(.*)$', re.UNICODE)
83
84 _propnames = {
85     'serial'    : 0,
86     'node'      : 1,
87     'size'      : 2,
88     'source'    : 3,
89     'mtime'     : 4,
90     'muser'     : 5,
91     'cluster'   : 6,
92 }
93
94
95 class Node(DBWorker):
96     """Nodes store path organization.
97        Versions store object history.
98        Attributes store metadata.
99     """
100     
101     # TODO: Keep size of object in one place.
102     
103     def __init__(self, **params):
104         DBWorker.__init__(self, **params)
105         execute = self.execute
106         
107         execute(""" pragma foreign_keys = on """)
108         
109         execute(""" create table if not exists nodes
110                           ( node       integer primary key,
111                             parent     integer default 0,
112                             path       text    not null default '',
113                             foreign key (parent)
114                             references nodes(node)
115                             on update cascade
116                             on delete cascade )""")
117         execute(""" create unique index if not exists idx_nodes_path
118                     on nodes(path) """)
119         
120         execute(""" create table if not exists statistics
121                           ( node       integer,
122                             population integer not null default 0,
123                             size       integer not null default 0,
124                             mtime      integer,
125                             cluster    integer not null default 0,
126                             primary key (node, cluster)
127                             foreign key (node)
128                             references nodes(node)
129                             on update cascade
130                             on delete cascade )""")
131         
132         execute(""" create table if not exists versions
133                           ( serial     integer primary key,
134                             node       integer,
135                             size       integer not null default 0,
136                             source     integer,
137                             mtime      integer,
138                             muser      text    not null default '',
139                             cluster    integer not null default 0,
140                             foreign key (node)
141                             references nodes(node)
142                             on update cascade
143                             on delete cascade ) """)
144         # execute(""" create index if not exists idx_versions_path
145         #             on nodes(cluster, node, path) """)
146         # execute(""" create index if not exists idx_versions_mtime
147         #             on nodes(mtime) """)
148         
149         execute(""" create table if not exists attributes
150                           ( serial integer,
151                             key    text,
152                             value  text,
153                             primary key (serial, key)
154                             foreign key (serial)
155                             references versions(serial)
156                             on update cascade
157                             on delete cascade ) """)
158         
159         q = "insert or ignore into nodes(node, parent) values (?, ?)"
160         execute(q, (ROOTNODE, ROOTNODE))
161     
162     def node_create(self, parent, path):
163         """Create a new node from the given properties.
164            Return the node identifier of the new node.
165         """
166         
167         q = ("insert into nodes (parent, path) "
168              "values (?, ?)")
169         props = (parent, path)
170         return self.execute(q, props).lastrowid
171     
172     def node_lookup(self, path):
173         """Lookup the current node of the given path.
174            Return None if the path is not found.
175         """
176         
177         q = "select node from nodes where path = ?"
178         self.execute(q, (path,))
179         r = self.fetchone()
180         if r is not None:
181             return r[0]
182         return None
183     
184     def node_get_properties(self, node):
185         """Return the node's (parent, path).
186            Return None if the node is not found.
187         """
188         
189         q = "select parent, path from nodes where node = ?"
190         self.execute(q, (node,))
191         return self.fetchone()
192     
193     def node_count_children(self, node):
194         """Return node's child count."""
195         
196         q = "select count(node) from nodes where parent = ? and node != 0"
197         self.execute(q, (node,))
198         r = self.fetchone()
199         if r is None:
200             return 0
201         return r[0]
202     
203     def node_purge_children(self, parent, before=inf, cluster=0):
204         """Delete all versions with the specified
205            parent and cluster, and return
206            the serials of versions deleted.
207            Clears out nodes with no remaining versions.
208         """
209         
210         execute = self.execute
211         q = ("select count(serial), sum(size) from versions "
212              "where node in (select node "
213                             "from nodes "
214                             "where parent = ?) "
215              "and cluster = ? "
216              "and mtime <= ?")
217         args = (parent, cluster, before)
218         execute(q, args)
219         nr, size = self.fetchone()
220         if not nr:
221             return ()
222         self.statistics_update(parent, -nr, -size, cluster)
223         self.statistics_update_ancestors(parent, -nr, -size, cluster)
224         
225         q = ("select serial from versions "
226              "where node in (select node "
227                             "from nodes "
228                             "where parent = ?) "
229              "and cluster = ? "
230              "and mtime <= ?")
231         execute(q, args)
232         serials = [r[SERIAL] for r in self.fetchall()]
233         q = ("delete from versions "
234              "where node in (select node "
235                             "from nodes "
236                             "where parent = ?) "
237              "and cluster = ? "
238              "and mtime <= ?")
239         execute(q, args)
240         q = ("delete from nodes "
241              "where node in (select node from nodes n "
242                             "where (select count(serial) "
243                                    "from versions "
244                                    "where node = n.node) = 0 "
245                             "and parent = ?)")
246         execute(q, (parent,))
247         return serials
248     
249     def node_purge(self, node, before=inf, cluster=0):
250         """Delete all versions with the specified
251            node and cluster, and return
252            the serials of versions deleted.
253            Clears out the node if it has no remaining versions.
254         """
255         
256         execute = self.execute
257         q = ("select count(serial), sum(size) from versions "
258              "where node = ? "
259              "and cluster = ? "
260              "and mtime <= ?")
261         args = (node, cluster, before)
262         execute(q, args)
263         nr, size = self.fetchone()
264         if not nr:
265             return ()
266         self.statistics_update_ancestors(node, -nr, -size, cluster)
267         
268         q = ("select serial from versions "
269              "where node = ? "
270              "and cluster = ? "
271              "and mtime <= ?")
272         execute(q, args)
273         serials = [r[SERIAL] for r in self.fetchall()]
274         q = ("delete from versions "
275              "where node = ? "
276              "and cluster = ? "
277              "and mtime <= ?")
278         execute(q, args)
279         q = ("delete from nodes "
280              "where node in (select node from nodes n "
281                             "where (select count(serial) "
282                                    "from versions "
283                                    "where node = n.node) = 0 "
284                             "and node = ?)")
285         execute(q, (node,))
286         return serials
287     
288     def node_remove(self, node):
289         """Remove the node specified.
290            Return false if the node has children or is not found.
291         """
292         
293         if self.node_count_children(node):
294             return False
295         
296         mtime = time()
297         q = ("select count(serial), sum(size), cluster "
298              "from versions "
299              "where node = ? "
300              "group by cluster")
301         self.execute(q, (node,))
302         for population, size, cluster in self.fetchall():
303             self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
304         
305         q = "delete from nodes where node = ?"
306         self.execute(q, (node,))
307         return True
308     
309     def statistics_get(self, node, cluster=0):
310         """Return population, total size and last mtime
311            for all versions under node that belong to the cluster.
312         """
313         
314         q = ("select population, size, mtime from statistics "
315              "where node = ? and cluster = ?")
316         self.execute(q, (node, cluster))
317         return self.fetchone()
318     
319     def statistics_update(self, node, population, size, mtime, cluster=0):
320         """Update the statistics of the given node.
321            Statistics keep track the population, total
322            size of objects and mtime in the node's namespace.
323            May be zero or positive or negative numbers.
324         """
325         
326         qs = ("select population, size from statistics "
327               "where node = ? and cluster = ?")
328         qu = ("insert or replace into statistics (node, population, size, mtime, cluster) "
329               "values (?, ?, ?, ?, ?)")
330         self.execute(qs, (node, cluster))
331         r = self.fetchone()
332         if r is None:
333             prepopulation, presize = (0, 0)
334         else:
335             prepopulation, presize = r
336         population += prepopulation
337         size += presize
338         self.execute(qu, (node, population, size, mtime, cluster))
339     
340     def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
341         """Update the statistics of the given node's parent.
342            Then recursively update all parents up to the root.
343            Population is not recursive.
344         """
345         
346         while True:
347             if node == 0:
348                 break
349             props = self.node_get_properties(node)
350             if props is None:
351                 break
352             parent, path = props
353             self.statistics_update(parent, population, size, mtime, cluster)
354             node = parent
355             population = 0 # Population isn't recursive
356     
357     def statistics_latest(self, node, before=inf, except_cluster=0):
358         """Return population, total size and last mtime
359            for all latest versions under node that
360            do not belong to the cluster.
361         """
362         
363         # The node.
364         props = self.node_get_properties(node)
365         if props is None:
366             return None
367         parent, path = props
368         
369         # The latest version.
370         q = ("select serial, node, size, source, mtime, muser, cluster "
371              "from versions "
372              "where serial = (select max(serial) "
373                              "from versions "
374                              "where node = ? and mtime < ?) "
375              "and cluster != ?")
376         self.execute(q, (node, before, except_cluster))
377         props = self.fetchone()
378         if props is None:
379             return None
380         mtime = props[MTIME]
381         
382         # First level, just under node (get population).
383         q = ("select count(serial), sum(size), max(mtime) "
384              "from versions v "
385              "where serial = (select max(serial) "
386                              "from versions "
387                              "where node = v.node and mtime < ?) "
388              "and cluster != ? "
389              "and node in (select node "
390                           "from nodes "
391                           "where parent = ?)")
392         self.execute(q, (before, except_cluster, parent))
393         r = fetchone()
394         if r is None:
395             return None
396         count = r[0]
397         mtime = max(mtime, r[2])
398         if count == 0:
399             return (0, 0, mtime)
400         
401         # All children (get size and mtime).
402         q = ("select count(serial), sum(size), max(mtime) "
403              "from versions v "
404              "where serial = (select max(serial) "
405                              "from versions "
406                              "where node = v.node and mtime < ?) "
407              "and cluster != ? "
408              "and node in (select node "
409                           "from nodes "
410                           "where path like ?)")
411         self.execute(q, (before, except_cluster, path + '%'))
412         r = fetchone()
413         if r is None:
414             return None
415         size = r[1] - props[SIZE]
416         mtime = max(mtime, r[2])
417         return (count, size, mtime)
418     
419     def version_create(self, node, size, source, muser, cluster=0):
420         """Create a new version from the given properties.
421            Return the (serial, mtime) of the new version.
422         """
423         
424         q = ("insert into versions (node, size, source, mtime, muser, cluster) "
425              "values (?, ?, ?, ?, ?, ?)")
426         mtime = time()
427         props = (node, size, source, mtime, muser, cluster)
428         serial = self.execute(q, props).lastrowid
429         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
430         return serial, mtime
431     
432     def version_lookup(self, node, before=inf, cluster=0):
433         """Lookup the current version of the given node.
434            Return a list with its properties:
435            (serial, node, size, source, mtime, muser, cluster)
436            or None if the current version is not found in the given cluster.
437         """
438         
439         q = ("select serial, node, size, source, mtime, muser, cluster "
440              "from versions "
441              "where serial = (select max(serial) "
442                              "from versions "
443                              "where node = ? and mtime < ?) "
444              "and cluster = ?")
445         self.execute(q, (node, before, cluster))
446         props = self.fetchone()
447         if props is not None:
448             return props
449         return None
450     
451     def version_get_properties(self, serial, keys=(), propnames=_propnames):
452         """Return a sequence of values for the properties of
453            the version specified by serial and the keys, in the order given.
454            If keys is empty, return all properties in the order
455            (serial, node, size, source, mtime, muser, cluster).
456         """
457         
458         q = ("select serial, node, size, source, mtime, muser, cluster "
459              "from versions "
460              "where serial = ?")
461         self.execute(q, (serial,))
462         r = self.fetchone()
463         if r is None:
464             return r
465         
466         if not keys:
467             return r
468         return [r[propnames[k]] for k in keys if k in propnames]
469     
470     def version_recluster(self, serial, cluster):
471         """Move the version into another cluster."""
472         
473         props = self.version_get_properties(serial)
474         if not props:
475             return
476         node = props[NODE]
477         size = props[SIZE]
478         oldcluster = props[CLUSTER]
479         if cluster == oldcluster:
480             return
481         
482         mtime = time()
483         self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
484         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
485         
486         q = "update versions set cluster = ? where serial = ?"
487         self.execute(q, (cluster, serial))
488     
489     def version_remove(self, serial):
490         """Remove the serial specified."""
491         
492         props = self.node_get_properties(serial)
493         if not props:
494             return
495         node = props[NODE]
496         size = props[SIZE]
497         cluster = props[CLUSTER]
498         
499         mtime = time()
500         self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
501         
502         q = "delete from versions where serial = ?"
503         self.execute(q, (serial,))
504         return True
505     
506     def parse_filters(self, filterq):
507         preterms = filterq.split(',')
508         included = []
509         excluded = []
510         opers = []
511         match = _regexfilter.match
512         for term in preterms:
513             m = match(term)
514             if m is None:
515                 continue
516             neg, key, op, value = m.groups()
517             if neg:
518                 excluded.append(key)
519             elif not value:
520                 included.append(key)
521             elif op:
522                 opers.append((key, op, value))
523         
524         return included, excluded, opers
525     
526     def construct_filters(self, filterq):
527         subqlist = []
528         append = subqlist.append
529         included, excluded, opers = self.parse_filters(filterq)
530         args = []
531         
532         if included:
533             subq = "key in ("
534             subq += ','.join(('?' for x in included)) + ")"
535             args += included
536             append(subq)
537         
538         if excluded:
539             subq = "key not in ("
540             subq += ','.join(('?' for x in exluded)) + ")"
541             args += excluded
542             append(subq)
543         
544         if opers:
545             t = (("(key = %s and value %s %s)" % (k, o, v)) for k, o, v in opers)
546             subq = "(" + ' or '.join(t) + ")"
547             args += opers
548         
549         if not subqlist:
550             return None, None
551         
552         subq = " and serial in (select serial from attributes where "
553         subq += ' and '.join(subqlist)
554         subq += ")"
555         
556         return subq, args
557     
558 #     def node_list(self, parent, prefix='',
559 #                    start='', delimiter=None,
560 #                    after=0.0, before=inf,
561 #                    filterq=None, versions=0,
562 #                    cluster=0, limit=10000):
563 #         """Return (a list of property tuples, a list of common prefixes)
564 #            for the current versions of the paths with the given parent,
565 #            matching the following criteria.
566 #            
567 #            The property tuple for a version is returned if all
568 #            of these conditions are true:
569 #            
570 #                 a. parent (and cluster) matches
571 #                 
572 #                 b. path > start
573 #                 
574 #                 c. path starts with prefix
575 #                 
576 #                 d. i  [versions=true]  version is in (after, before)
577 #                    ii [versions=false] version is the max in (after, before)
578 #                 
579 #                 e. the path does not have the delimiter occuring
580 #                    after the prefix.
581 #                 
582 #                 f. serial matches the attribute filter query.
583 #                    
584 #                    A filter query is a comma-separated list of
585 #                    terms in one of these three forms:
586 #                    
587 #                    key
588 #                        an attribute with this key must exist
589 #                    
590 #                    !key
591 #                        an attribute with this key must not exist
592 #                    
593 #                    key ?op value
594 #                        the attribute with this key satisfies the value
595 #                        where ?op is one of ==, != <=, >=, <, >.
596 #            
597 #            matching up to the first delimiter after prefix,
598 #            and are reported only once, as "virtual directories".
599 #            The delimiter is included in the prefixes.
600 #            Prefixes do appear from (e) even if no paths would match in (f).
601 #            
602 #            If arguments are None, then the corresponding matching rule
603 #            will always match.
604 #         """
605 #         
606 #         execute = self.execute
607
608 #         if start < prefix:
609 #             start = strprevling(prefix)
610
611 #         nextling = strnextling(prefix)
612
613 #         q = ("select serial, parent, path, size, "
614 #                     "population, popsize, source, mtime, cluster "
615 #              "from nodes "
616 #              "where parent = ? and path > ? and path < ? "
617 #              "and mtime > ? and mtime < ? and cluster = ?")
618 #         args = [parent, start, nextling, after, before, cluster]
619
620 #         if filterq:
621 #             subq, subargs = self.construct_filters(filterq)
622 #             if subq is not None:
623 #                 q += subq
624 #                 args += subargs
625 #         q += " order by path"
626
627 #         if delimiter is None:
628 #             q += " limit ?"
629 #             args.append(limit)
630 #             execute(q, args)
631 #             return self.fetchall(), ()
632
633 #         pfz = len(prefix)
634 #         dz = len(delimiter)
635 #         count = 0
636 #         fetchone = self.fetchone
637 #         prefixes = []
638 #         pappend = prefixes.append
639 #         matches = []
640 #         mappend = matches.append
641 #         
642 #         execute(q, args)
643 #         while 1:
644 #             props = fetchone()
645 #             if props is None:
646 #                 break
647 #             path = props[PATH]
648 #             idx = path.find(delimiter, pfz)
649 #             if idx < 0:
650 #                 mappend(props)
651 #                 count += 1
652 #                 if count >= limit:
653 #                     break
654 #                 continue
655
656 #             pf = path[:idx + dz]
657 #             pappend(pf)
658 #             count += 1
659 #             ## XXX: if we break here due to limit,
660 #             ##      but a path would also be matched below,
661 #             ##      the path match would be lost since the
662 #             ##      next call with start=path would skip both of them.
663 #             ##      In this case, it is impossible to obey the limit,
664 #             ##      therefore we will break later, at limit + 1.
665 #             if idx + dz == len(path):
666 #                 mappend(props)
667 #                 count += 1
668
669 #             if count >= limit: 
670 #                 break
671
672 #             args[1] = strnextling(pf) # new start
673 #             execute(q, args)
674
675 #         return matches, prefixes
676     
677     def attribute_get(self, serial, keys=()):
678         """Return a list of (key, value) pairs of the version specified by serial.
679            If keys is empty, return all attributes.
680            Othwerise, return only those specified.
681         """
682         
683         execute = self.execute
684         if keys:
685             marks = ','.join('?' for k in keys)
686             q = ("select key, value from attributes "
687                  "where key in (%s) and serial = ?" % (marks,))
688             execute(q, keys + (serial,))
689         else:
690             q = "select key, value from attributes where serial = ?"
691             execute(q, (serial,))
692         return self.fetchall()
693     
694     def attribute_set(self, serial, items):
695         """Set the attributes of the version specified by serial.
696            Receive attributes as an iterable of (key, value) pairs.
697         """
698         
699         q = ("insert or replace into attributes (serial, key, value) "
700              "values (?, ?, ?)")
701         self.executemany(q, ((serial, k, v) for k, v in items))
702     
703     def attribute_del(self, serial, keys=()):
704         """Delete attributes of the version specified by serial.
705            If keys is empty, delete all attributes.
706            Otherwise delete those specified.
707         """
708         
709         if keys:
710             q = "delete from attributes where serial = ? and key = ?"
711             self.executemany(q, ((serial, key) for key in keys))
712         else:
713             q = "delete from attributes where serial = ?"
714             self.execute(q, (serial,))
715     
716 #     def node_get_attribute_keys(self, parent):
717 #         """Return a list with all keys pairs defined
718 #            for the namespace of the node specified.
719 #         """
720 #         
721 #         q = ("select distinct key from attributes a, versions v, nodes n "
722 #              "where a.serial = v.serial and v.node = n.node and n.parent = ?")
723 #         self.execute(q, (parent,))
724 #         return [r[0] for r in self.fetchall()]
725     
726     def attribute_copy(self, source, dest):
727         q = ("insert or replace into attributes "
728              "select ?, key, value from attributes "
729              "where serial = ?")
730         self.execute(q, (dest, source))