7fb10002b49e52c75755e65ed539d5842e6aa79b
[pithos] / pithos / backends / lib / node.py
1 # Copyright 2011 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from time import time
35
36 from dbworker import DBWorker
37
38
39 ROOTNODE  = 0
40
41 ( SERIAL, NODE, SIZE, SOURCE, MTIME, MUSER, CLUSTER ) = range(7)
42
43 inf = float('inf')
44
45
46 def strnextling(prefix):
47     """return the first unicode string
48        greater than but not starting with given prefix.
49        strnextling('hello') -> 'hellp'
50     """
51     if not prefix:
52         ## all strings start with the null string,
53         ## therefore we have to approximate strnextling('')
54         ## with the last unicode character supported by python
55         ## 0x10ffff for wide (32-bit unicode) python builds
56         ## 0x00ffff for narrow (16-bit unicode) python builds
57         ## We will not autodetect. 0xffff is safe enough.
58         return unichr(0xffff)
59     s = prefix[:-1]
60     c = ord(prefix[-1])
61     if c >= 0xffff:
62         raise RuntimeError
63     s += unichr(c+1)
64     return s
65
66 def strprevling(prefix):
67     """return an approximation of the last unicode string
68        less than but not starting with given prefix.
69        strprevling(u'hello') -> u'helln\\xffff'
70     """
71     if not prefix:
72         ## There is no prevling for the null string
73         return prefix
74     s = prefix[:-1]
75     c = ord(prefix[-1])
76     if c > 0:
77         s += unichr(c-1) + unichr(0xffff)
78     return s
79
80
81 import re
82 _regexfilter = re.compile('(!?)\s*([\w-]+)\s*(=|!=|<=|>=|<|>)?\s*(.*)$', re.UNICODE)
83
84 _propnames = {
85     'serial'    : 0,
86     'node'      : 1,
87     'size'      : 2,
88     'source'    : 3,
89     'mtime'     : 4,
90     'muser'     : 5,
91     'cluster'   : 6,
92 }
93
94
95 class Node(DBWorker):
96     """Nodes store path organization.
97        Versions store object history.
98        Attributes store metadata.
99     """
100     
101     # TODO: Keep size of object in one place.
102     
103     def __init__(self, **params):
104         execute = self.execute
105         
106         execute(""" pragma foreign_keys = on """)
107         
108         execute(""" create table if not exists nodes
109                           ( node       integer primary key,
110                             parent     integer not null default 0,
111                             path       text    not null default '',
112                             foreign key (parent)
113                             references nodes(node)
114                             on update cascade
115                             on delete cascade )""")
116         execute(""" create unique index if not exists idx_nodes_path
117                     on nodes(path) """)
118         
119         execute(""" create table if not exists statistics
120                           ( node       integer not null,
121                             population integer not null default 0,
122                             size       integer not null default 0,
123                             mtime      integer,
124                             muser      text    not null default '',
125                             cluster    integer not null default 0,
126                             primary key (node, cluster)
127                             foreign key (node)
128                             references nodes(node)
129                             on update cascade
130                             on delete cascade )""")
131         
132         execute(""" create table if not exists versions
133                           ( serial     integer primary key,
134                             node       integer not null,
135                             size       integer not null default 0,
136                             source     integer,
137                             mtime      integer,
138                             cluster    integer not null default 0,
139                             foreign key (node)
140                             references nodes(node)
141                             on update cascade
142                             on delete cascade ) """)
143         # execute(""" create index if not exists idx_versions_path
144         #             on nodes(cluster, node, path) """)
145         # execute(""" create index if not exists idx_versions_mtime
146         #             on nodes(mtime) """)
147         
148         execute(""" create table if not exists attributes
149                           ( serial integer,
150                             key    text,
151                             value  text,
152                             primary key (serial, key)
153                             foreign key (serial)
154                             references versions(serial)
155                             on update cascade
156                             on delete cascade ) """)
157         
158         q = "insert or ignore into nodes(node, parent) values (?, ?)"
159         execute(q, (ROOTNODE, ROOTNODE))
160     
161     def node_create(self, parent, path):
162         """Create a new node from the given properties.
163            Return the node identifier of the new node.
164         """
165         
166         q = ("insert into nodes (parent, path) "
167              "values (?, ?)")
168         props = (parent, path)
169         return self.execute(q, props).lastrowid
170     
171     def node_lookup(self, path):
172         """Lookup the current node of the given path.
173            Return None if the path is not found.
174         """
175         
176         q = ("select node from nodes where path = ?")
177         self.execute(q, (path,))
178         r = self.fetchone()
179         if r is not None:
180             return r[0]
181         return None
182     
183     def node_update_ancestors(self, node, population, size, mtime, cluster=0):
184         """Update the population properties of the given node.
185            Population properties keep track the population and total
186            size of objects in the node's namespace.
187            May be zero or positive or negative numbers.
188         """
189         
190         qs = ("select population, size from statistics"
191               "where node = ? and cluster = ?")
192         qu = ("insert or replace into statistics (node, population, size, mtime, cluster) "
193               "values (?, ?, ?, ?, ?)")
194         qp = "select parent from nodes where serial = ?"
195         execute = self.execute
196         fetchone = self.fetchone
197         while 1:
198             execute(qs, (node, cluster))
199             r = fetchone()
200             if r is None:
201                 prepopulation, presize = (0, 0)
202             else:
203                 prepopulation, presize = r
204             population += prepopulation
205             size += presize
206             
207             execute(qu, (node, population, size, mtime, cluster))
208             if node == 0:
209                 break
210             
211             population = 0 # Population isn't recursive
212             execute(qp, (node,))
213             r = fetchone()
214             if r is None:
215                 break
216             node = r[0]
217     
218     def node_statistics(self, node, cluster=0):
219         """Return population, total size and last mtime
220            for all versions under node that belong to the cluster.
221         """
222         
223         q = ("select population, size, mtime from statistics"
224              "where node = ? and cluster = ?")
225         self.execute(q, (node, cluster))
226         r = fetchone()
227         if r is None:
228             return (0, 0, 0)
229         return r
230     
231     def node_count_children(self, node):
232         """Return node's child count."""
233         
234         q = "select count(node) from nodes where parent = ? and node != 0"
235         self.execute(q, (node,))
236         r = fetchone()
237         if r is None:
238             return 0
239         return r
240     
241     def node_purge_children(self, parent, before=inf, cluster=0):
242         """Delete all versions with the specified
243            parent and cluster, and return
244            the serials of versions deleted.
245            Clears out nodes with no remaining versions.
246         """
247         
248         execute = self.execute
249         q = ("select count(serial), sum(size) from versions "
250              "where node in (select node "
251                             "from nodes "
252                             "where parent = ?) "
253              "and cluster = ? "
254              "and mtime <= ?")
255         args = (parent, cluster, before)
256         execute(q, args)
257         nr, size = self.fetchone()
258         if not nr:
259             return ()
260         # TODO: Statistics for nodes (children) will be wrong.
261         self.node_update_ancestors(parent, -nr, -size, cluster)
262         
263         q = ("select serial from versions "
264              "where node in (select node "
265                             "from nodes "
266                             "where parent = ?) "
267              "and cluster = ? "
268              "and mtime <= ?")
269         execute(q, args)
270         serials = [r[SERIAL] for r in self.fetchall()]
271         q = ("delete from versions "
272              "where node in (select node "
273                             "from nodes "
274                             "where parent = ?) "
275              "and cluster = ? "
276              "and mtime <= ?")
277         execute(q, args)
278         q = ("delete from nodes n "
279              "where (select count(serial) "
280                     "from versions "
281                     "where node = n.node) = 0 "
282              "and parent = ?")
283         execute(q, parent)
284         return serials
285     
286     def node_purge(self, node, before=inf, cluster=0):
287         """Delete all versions with the specified
288            node and cluster, and return
289            the serials of versions deleted.
290            Clears out the node if it has no remaining versions.
291         """
292         
293         execute = self.execute
294         q = ("select count(serial), sum(size) from versions "
295              "where node = ? "
296              "and cluster = ? "
297              "and mtime <= ?")
298         args = (node, cluster, before)
299         execute(q, args)
300         nr, size = self.fetchone()
301         if not nr:
302             return ()
303         self.node_update_ancestors(node, -nr, -size, cluster)
304         
305         q = ("select serial from versions "
306              "where node = ? "
307              "and cluster = ? "
308              "and mtime <= ?")
309         execute(q, args)
310         serials = [r[SERIAL] for r in self.fetchall()]
311         q = ("delete from versions "
312              "where node = ? "
313              "and cluster = ? "
314              "and mtime <= ?")
315         execute(q, args)
316         q = ("delete from nodes n "
317              "where (select count(serial) "
318                     "from versions "
319                     "where node = n.node) = 0 "
320              "and node = ?")
321         execute(q, node)
322         return serials
323     
324     def node_remove(self, node):
325         """Remove the node specified.
326            Return false if the node has children or is not found.
327         """
328         
329         if self.node_children(node):
330             return False
331         
332         q = "select parent from node where node = ?"
333         self.execute(q, (node,))
334         r = self.fetchone()
335         if r is None:
336             return False
337         parent = r[0]
338         
339         mtime = time()
340         q = "select population, size, cluster from statistics where node = ?"
341         self.execute(q, (node,))
342         for population, size, cluster in self.fetchall():
343             self.node_update_ancestors(parent, -population, -size, mtime, cluster)
344         
345         q = "delete from nodes where node = ?"
346         self.execute(q, (node,))
347         return True
348     
349 #     def node_remove(self, serial, recursive=0):
350 #         """Remove the node specified by serial.
351 #            Return false if the node is not found,
352 #            or has ancestors and recursive is not set.
353 #         """
354 #         
355 #         props = self.node_get_properties(serial)
356 #         if props is None:
357 #             return False
358 #         size = props[SIZE]
359 #         parent = props[PARENT]
360 #         pop = props[POPULATION]
361 #         popsize = props[POPSIZE]
362 #         if pop and not recursive:
363 #             return False
364 #         
365 #         q = ("delete from nodes where serial = ?")
366 #         self.execute(q, (serial,))
367 #         self.node_update_ancestors(parent, -pop-1, -size-popsize)
368 #         return True
369     
370     def version_create(self, node, size, source, muser, cluster=0):
371         """Create a new version from the given properties.
372            Return the (serial, mtime) of the new version.
373         """
374         
375         q = ("insert into nodes (node, size, source, mtime, muser, cluster) "
376              "values (?, ?, ?, ?, ?)")
377         mtime = time()
378         props = (node, path, size, source, mtime, muser, cluster)
379         serial = self.execute(q, props).lastrowid
380         self.node_update_ancestors(node, 1, size, mtime, cluster)
381         return serial, mtime
382     
383     def version_lookup(self, node, before=inf, cluster=0):
384         """Lookup the current version of the given node.
385            Return a list with its properties:
386            (serial, node, size, source, mtime, muser, cluster)
387            or None if the current version is not found in the given cluster.
388         """
389         
390         q = ("select serial, node, size, source, mtime, muser, cluster "
391              "from versions "
392              "where serial = (select max(serial) "
393                              "from versions "
394                              "where node = ? and mtime < ?) "
395              "and cluster = ?")
396         self.execute(q, (node, before, cluster))
397         props = self.fetchone()
398         if props is not None:
399             return props
400         return None
401     
402     def version_get_properties(self, serial, keys=(), propnames=_propnames):
403         """Return a sequence of values for the properties of
404            the version specified by serial and the keys, in the order given.
405            If keys is empty, return all properties in the order
406            (serial, node, size, source, mtime, muser, cluster).
407         """
408         
409         q = ("select serial, node, path, size, source, mtime, muser, cluster "
410              "from nodes "
411              "where serial = ?")
412         self.execute(q, (serial,))
413         r = self.fetchone()
414         if r is None:
415             return r
416         
417         if not keys:
418             return r
419         return [r[propnames[k]] for k in keys if k in propnames]
420     
421 #     def node_set_properties(self, serial, items, propnames=_mutablepropnames):
422 #         """Set the properties of a node specified by the node serial and
423 #            the items iterable of (name, value) pairs.
424 #            Mutable properties are %s.
425 #            Invalid property names and 'serial' are not set.
426 #         """ % (_mutables,)
427 #         
428 #         if not items:
429 #             return
430 #         
431 #         keys, vals = zip(*items)
432 #         keystr = ','.join(("%s = ?" % k) for k in keys if k in propnames)
433 #         if not keystr:
434 #             return
435 #         q = "update nodes set %s where serial = ?" % keystr
436 #         vals += (serial,)
437 #         self.execute(q, vals)
438     
439     def version_recluster(self, serial, cluster):
440         """Move the version into another cluster."""
441         
442         props = self.node_get_properties(source)
443         node = props[NODE]
444         size = props[SIZE]
445         mtime = props[MTIME]
446         oldcluster = props[CLUSTER]
447         if cluster == oldcluster:
448             return
449         
450         self.node_update_ancestors(node, -1, -size, mtime, oldcluster)
451         self.node_update_ancestors(node, 1, size, mtime, cluster)
452
453         q = "update nodes set cluster = ? where serial = ?"
454         self.execute(q, (cluster, serial))
455     
456 #     def version_copy(self, serial, node, muser, copy_attr=True):
457 #         """Copy the version specified by serial into
458 #            a new version of node. Optionally copy attributes.
459 #            Return the (serial, mtime) of the new version.
460 #         """
461 #         
462 #         props = self.version_get_properties(serial)
463 #         if props is None:
464 #             return None
465 #         size = props[SIZE]
466 #         cluster = props[CLUSTER]
467 #         new_serial, mtime = self.version_create(node, size, serial, muser, cluster)
468 #         if copy_attr:
469 #             self.attribute_copy(serial, new_serial)
470 #         return (new_serial, mtime)
471     
472     def path_statistics(self, prefix, before=inf, except_cluster=0):
473         """Return population, total size and last mtime
474            for all latest versions under prefix that
475            do not belong to the cluster.
476         """
477         
478         q = ("select count(serial), sum(size), max(mtime) "
479              "from versions v "
480              "where serial = (select max(serial) "
481                              "from versions "
482                              "where node = v.node and mtime < ?) "
483              "and cluster != ? "
484              "and node in (select node "
485                           "from nodes "
486                           "where path like ?)")
487         self.execute(q, (before, except_cluster, prefix + '%'))
488         r = fetchone()
489         if r is None:
490             return (0, 0, 0)
491         return r
492     
493     def parse_filters(self, filterq):
494         preterms = filterq.split(',')
495         included = []
496         excluded = []
497         opers = []
498         match = _regexfilter.match
499         for term in preterms:
500             m = match(term)
501             if m is None:
502                 continue
503             neg, key, op, value = m.groups()
504             if neg:
505                 excluded.append(key)
506             elif not value:
507                 included.append(key)
508             elif op:
509                 opers.append((key, op, value))
510         
511         return included, excluded, opers
512     
513     def construct_filters(self, filterq):
514         subqlist = []
515         append = subqlist.append
516         included, excluded, opers = self.parse_filters(filterq)
517         args = []
518         
519         if included:
520             subq = "key in ("
521             subq += ','.join(('?' for x in included)) + ")"
522             args += included
523             append(subq)
524         
525         if excluded:
526             subq = "key not in ("
527             subq += ','.join(('?' for x in exluded)) + ")"
528             args += excluded
529             append(subq)
530         
531         if opers:
532             t = (("(key = %s and value %s %s)" % (k, o, v)) for k, o, v in opers)
533             subq = "(" + ' or '.join(t) + ")"
534             args += opers
535         
536         if not subqlist:
537             return None, None
538         
539         subq = " and serial in (select serial from attributes where "
540         subq += ' and '.join(subqlist)
541         subq += ")"
542         
543         return subq, args
544     
545 #     def node_list(self, parent, prefix='',
546 #                    start='', delimiter=None,
547 #                    after=0.0, before=inf,
548 #                    filterq=None, versions=0,
549 #                    cluster=0, limit=10000):
550 #         """Return (a list of property tuples, a list of common prefixes)
551 #            for the current versions of the paths with the given parent,
552 #            matching the following criteria.
553 #            
554 #            The property tuple for a version is returned if all
555 #            of these conditions are true:
556 #            
557 #                 a. parent (and cluster) matches
558 #                 
559 #                 b. path > start
560 #                 
561 #                 c. path starts with prefix
562 #                 
563 #                 d. i  [versions=true]  version is in (after, before)
564 #                    ii [versions=false] version is the max in (after, before)
565 #                 
566 #                 e. the path does not have the delimiter occuring
567 #                    after the prefix.
568 #                 
569 #                 f. serial matches the attribute filter query.
570 #                    
571 #                    A filter query is a comma-separated list of
572 #                    terms in one of these three forms:
573 #                    
574 #                    key
575 #                        an attribute with this key must exist
576 #                    
577 #                    !key
578 #                        an attribute with this key must not exist
579 #                    
580 #                    key ?op value
581 #                        the attribute with this key satisfies the value
582 #                        where ?op is one of ==, != <=, >=, <, >.
583 #            
584 #            matching up to the first delimiter after prefix,
585 #            and are reported only once, as "virtual directories".
586 #            The delimiter is included in the prefixes.
587 #            Prefixes do appear from (e) even if no paths would match in (f).
588 #            
589 #            If arguments are None, then the corresponding matching rule
590 #            will always match.
591 #         """
592 #         
593 #         execute = self.execute
594
595 #         if start < prefix:
596 #             start = strprevling(prefix)
597
598 #         nextling = strnextling(prefix)
599
600 #         q = ("select serial, parent, path, size, "
601 #                     "population, popsize, source, mtime, cluster "
602 #              "from nodes "
603 #              "where parent = ? and path > ? and path < ? "
604 #              "and mtime > ? and mtime < ? and cluster = ?")
605 #         args = [parent, start, nextling, after, before, cluster]
606
607 #         if filterq:
608 #             subq, subargs = self.construct_filters(filterq)
609 #             if subq is not None:
610 #                 q += subq
611 #                 args += subargs
612 #         q += " order by path"
613
614 #         if delimiter is None:
615 #             q += " limit ?"
616 #             args.append(limit)
617 #             execute(q, args)
618 #             return self.fetchall(), ()
619
620 #         pfz = len(prefix)
621 #         dz = len(delimiter)
622 #         count = 0
623 #         fetchone = self.fetchone
624 #         prefixes = []
625 #         pappend = prefixes.append
626 #         matches = []
627 #         mappend = matches.append
628 #         
629 #         execute(q, args)
630 #         while 1:
631 #             props = fetchone()
632 #             if props is None:
633 #                 break
634 #             path = props[PATH]
635 #             idx = path.find(delimiter, pfz)
636 #             if idx < 0:
637 #                 mappend(props)
638 #                 count += 1
639 #                 if count >= limit:
640 #                     break
641 #                 continue
642
643 #             pf = path[:idx + dz]
644 #             pappend(pf)
645 #             count += 1
646 #             ## XXX: if we break here due to limit,
647 #             ##      but a path would also be matched below,
648 #             ##      the path match would be lost since the
649 #             ##      next call with start=path would skip both of them.
650 #             ##      In this case, it is impossible to obey the limit,
651 #             ##      therefore we will break later, at limit + 1.
652 #             if idx + dz == len(path):
653 #                 mappend(props)
654 #                 count += 1
655
656 #             if count >= limit: 
657 #                 break
658
659 #             args[1] = strnextling(pf) # new start
660 #             execute(q, args)
661
662 #         return matches, prefixes
663     
664     def attribute_get(self, serial, keys=()):
665         """Return a list of (key, value) pairs of the version specified by serial.
666            If keys is empty, return all attributes.
667            Othwerise, return only those specified.
668         """
669         
670         execute = self.execute
671         if keys:
672             marks = ','.join('?' for k in keys)
673             q = ("select key, value from attributes "
674                  "where key in (%s) and serial = ?" % (marks,))
675             execute(q, keys + (serial,))
676         else:
677             q = "select key, value from attributes where serial = ?"
678             execute(q, (serial,))
679         return self.fetchall()
680     
681     def attribute_set(self, serial, items):
682         """Set the attributes of the version specified by serial.
683            Receive attributes as an iterable of (key, value) pairs.
684         """
685         
686         q = ("insert or replace into attributes (serial, key, value) "
687              "values (?, ?, ?)")
688         self.executemany(q, ((serial, k, v) for k, v in items))
689     
690     def attribute_del(self, serial, keys=()):
691         """Delete attributes of the version specified by serial.
692            If keys is empty, delete all attributes.
693            Otherwise delete those specified.
694         """
695         
696         if keys:
697             q = "delete from attributes where serial = ? and key = ?"
698             self.executemany(q, ((serial, key) for key in keys))
699         else:
700             q = "delete from attributes where serial = ?"
701             self.execute(q, (serial,))
702     
703 #     def node_get_attribute_keys(self, parent):
704 #         """Return a list with all keys pairs defined
705 #            for the namespace of the node specified.
706 #         """
707 #         
708 #         q = ("select distinct key from attributes a, versions v, nodes n "
709 #              "where a.serial = v.serial and v.node = n.node and n.parent = ?")
710 #         self.execute(q, (parent,))
711 #         return [r[0] for r in self.fetchall()]
712     
713     def attribute_copy(self, source, dest):
714         q = ("insert or replace into attributes "
715              "select ?, key, value from attributes "
716              "where serial = ?")
717         self.execute(q, (dest, source))