Modular backend progress.
[pithos] / pithos / backends / lib / node.py
1 # Copyright 2011 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from time import time
35
36 from dbworker import DBWorker
37
38
39 ROOTNODE  = 0
40
41 ( SERIAL, NODE, SIZE, SOURCE, MTIME, MUSER, CLUSTER ) = range(7)
42
43 inf = float('inf')
44
45
46 def strnextling(prefix):
47     """return the first unicode string
48        greater than but not starting with given prefix.
49        strnextling('hello') -> 'hellp'
50     """
51     if not prefix:
52         ## all strings start with the null string,
53         ## therefore we have to approximate strnextling('')
54         ## with the last unicode character supported by python
55         ## 0x10ffff for wide (32-bit unicode) python builds
56         ## 0x00ffff for narrow (16-bit unicode) python builds
57         ## We will not autodetect. 0xffff is safe enough.
58         return unichr(0xffff)
59     s = prefix[:-1]
60     c = ord(prefix[-1])
61     if c >= 0xffff:
62         raise RuntimeError
63     s += unichr(c+1)
64     return s
65
66 def strprevling(prefix):
67     """return an approximation of the last unicode string
68        less than but not starting with given prefix.
69        strprevling(u'hello') -> u'helln\\xffff'
70     """
71     if not prefix:
72         ## There is no prevling for the null string
73         return prefix
74     s = prefix[:-1]
75     c = ord(prefix[-1])
76     if c > 0:
77         s += unichr(c-1) + unichr(0xffff)
78     return s
79
80
81 import re
82 _regexfilter = re.compile('(!?)\s*([\w-]+)\s*(=|!=|<=|>=|<|>)?\s*(.*)$', re.UNICODE)
83
84 _propnames = {
85     'serial'    : 0,
86     'node'      : 1,
87     'size'      : 2,
88     'source'    : 3,
89     'mtime'     : 4,
90     'muser'     : 5,
91     'cluster'   : 6,
92 }
93
94
95 class Node(DBWorker):
96     """Nodes store path organization.
97        Versions store object history.
98        Attributes store metadata.
99     """
100     
101     def __init__(self, **params):
102         execute = self.execute
103         
104         execute(""" pragma foreign_keys = on """)
105         
106         execute(""" create table if not exists nodes
107                           ( node       integer primary key,
108                             parent     integer not null default 0,
109                             path       text    not null default '',
110                             foreign key (parent)
111                             references nodes(node)
112                             on update cascade
113                             on delete cascade )""")
114         execute(""" create unique index if not exists idx_nodes_path
115                     on nodes(path) """)
116         
117         execute(""" create table if not exists statistics
118                           ( node       integer not null,
119                             population integer not null default 0,
120                             size       integer not null default 0,
121                             mtime      integer,
122                             muser      text    not null default '',
123                             cluster    integer not null default 0,
124                             primary key (node, cluster)
125                             foreign key (node)
126                             references nodes(node)
127                             on update cascade
128                             on delete cascade )""")
129         
130         execute(""" create table if not exists versions
131                           ( serial     integer primary key,
132                             node       integer not null,
133                             size       integer not null default 0,
134                             source     integer,
135                             mtime      integer,
136                             cluster    integer not null default 0,
137                             foreign key (node)
138                             references nodes(node)
139                             on update cascade
140                             on delete cascade ) """)
141         # execute(""" create index if not exists idx_versions_path
142         #             on nodes(cluster, node, path) """)
143         # execute(""" create index if not exists idx_versions_mtime
144         #             on nodes(mtime) """)
145         
146         execute(""" create table if not exists attributes
147                           ( serial integer,
148                             key    text,
149                             value  text,
150                             primary key (serial, key)
151                             foreign key (serial)
152                             references versions(serial)
153                             on update cascade
154                             on delete cascade ) """)
155         
156         q = "insert or ignore into nodes(node, parent) values (?, ?)"
157         execute(q, (ROOTNODE, ROOTNODE))
158     
159     def node_create(self, parent, path):
160         """Create a new node from the given properties.
161            Return the node identifier of the new node.
162         """
163         
164         q = ("insert into nodes (parent, path) "
165              "values (?, ?)")
166         props = (parent, path)
167         return self.execute(q, props).lastrowid
168     
169     def node_lookup(self, path):
170         """Lookup the current node of the given path.
171            Return None if the path is not found.
172         """
173         
174         q = ("select node from nodes where path = ?")
175         self.execute(q, (path,))
176         r = self.fetchone()
177         if r is not None:
178             return r[0]
179         return None
180     
181     def node_update_ancestors(self, node, population, size, mtime, cluster=0):
182         """Update the population properties of the given node.
183            Population properties keep track the population and total
184            size of objects in the node's namespace.
185            May be zero or positive or negative numbers.
186         """
187         
188         qs = ("select population, size from statistics"
189               "where node = ? and cluster = ?")
190         qu = ("insert or replace into statistics (node, population, size, mtime, cluster) "
191               "values (?, ?, ?, ?, ?)")
192         qp = "select parent from nodes where serial = ?"
193         execute = self.execute
194         fetchone = self.fetchone
195         while 1:
196             execute(qs, (node, cluster))
197             r = fetchone()
198             if r is None:
199                 prepopulation, presize = (0, 0)
200             else:
201                 prepopulation, presize = r
202             population += prepopulation
203             size += presize
204             
205             execute(qu, (node, population, size, mtime, cluster))
206             if node == 0:
207                 break
208             
209             population = 0 # Population isn't recursive
210             execute(qp, (node,))
211             r = fetchone()
212             if r is None:
213                 break
214             node = r[0]
215     
216     def node_statistics(self, node, cluster=0):
217         """Return population, total size and last mtime
218            for all versions under node that belong to the cluster.
219         """
220         
221         q = ("select population, size, mtime from statistics"
222              "where node = ? and cluster = ?")
223         self.execute(q, (node, cluster))
224         r = fetchone()
225         if r is None:
226             return (0, 0, 0)
227         return r
228     
229     def node_children(self, node):
230         """Return node's child count."""
231         
232         q = "select count(node) from nodes where parent = ?"
233         self.execute(q, (node,))
234         r = fetchone()
235         if r is None:
236             return 0
237         return r
238     
239 #     def node_remove(self, serial, recursive=0):
240 #         """Remove the node specified by serial.
241 #            Return false if the node is not found,
242 #            or has ancestors and recursive is not set.
243 #         """
244 #         
245 #         props = self.node_get_properties(serial)
246 #         if props is None:
247 #             return False
248 #         size = props[SIZE]
249 #         parent = props[PARENT]
250 #         pop = props[POPULATION]
251 #         popsize = props[POPSIZE]
252 #         if pop and not recursive:
253 #             return False
254 #         
255 #         q = ("delete from nodes where serial = ?")
256 #         self.execute(q, (serial,))
257 #         self.node_update_ancestors(parent, -pop-1, -size-popsize)
258 #         return True
259     
260     def version_create(self, node, size, source, muser, cluster=0):
261         """Create a new version from the given properties.
262            Return the (serial, mtime) of the new version.
263         """
264         
265         q = ("insert into nodes (node, size, source, mtime, muser, cluster) "
266              "values (?, ?, ?, ?, ?)")
267         mtime = time()
268         props = (node, path, size, source, mtime, muser, cluster)
269         serial = self.execute(q, props).lastrowid
270         self.node_update_ancestors(node, 1, size, mtime, cluster)
271         return serial, mtime
272     
273     def version_lookup(self, node, before=inf, cluster=0):
274         """Lookup the current version of the given node.
275            Return a list with its properties:
276            (serial, node, size, source, mtime, muser, cluster)
277            or None if the current version is not found in the given cluster.
278         """
279         
280         q = ("select serial, node, size, source, mtime, muser, cluster "
281              "from versions "
282              "where serial = (select max(serial) "
283                              "from versions "
284                              "where node = ? and mtime < ?) "
285              "and cluster = ?")
286         self.execute(q, (node, before, cluster))
287         props = self.fetchone()
288         if props is not None:
289             return props
290         return None
291     
292     def version_get_properties(self, serial, keys=(), propnames=_propnames):
293         """Return a sequence of values for the properties of
294            the version specified by serial and the keys, in the order given.
295            If keys is empty, return all properties in the order
296            (serial, node, size, source, mtime, muser, cluster).
297         """
298         
299         q = ("select serial, node, path, size, source, mtime, muser, cluster "
300              "from nodes "
301              "where serial = ?")
302         self.execute(q, (serial,))
303         r = self.fetchone()
304         if r is None:
305             return r
306         
307         if not keys:
308             return r
309         return [r[propnames[k]] for k in keys if k in propnames]
310     
311 #     def node_set_properties(self, serial, items, propnames=_mutablepropnames):
312 #         """Set the properties of a node specified by the node serial and
313 #            the items iterable of (name, value) pairs.
314 #            Mutable properties are %s.
315 #            Invalid property names and 'serial' are not set.
316 #         """ % (_mutables,)
317 #         
318 #         if not items:
319 #             return
320 #         
321 #         keys, vals = zip(*items)
322 #         keystr = ','.join(("%s = ?" % k) for k in keys if k in propnames)
323 #         if not keystr:
324 #             return
325 #         q = "update nodes set %s where serial = ?" % keystr
326 #         vals += (serial,)
327 #         self.execute(q, vals)
328     
329     def version_recluster(self, serial, cluster):
330         """Move the version into another cluster."""
331         
332         props = self.node_get_properties(source)
333         node = props[NODE]
334         size = props[SIZE]
335         mtime = props[MTIME]
336         oldcluster = props[CLUSTER]
337         if cluster == oldcluster:
338             return
339         
340         self.node_update_ancestors(node, -1, -size, mtime, oldcluster)
341         self.node_update_ancestors(node, 1, size, mtime, cluster)
342
343         q = "update nodes set parent = ?, path = ? where serial = ?"
344         self.execute(q, (parent, path, source))
345     
346 #     def version_copy(self, serial, node, muser, copy_attr=True):
347 #         """Copy the version specified by serial into
348 #            a new version of node. Optionally copy attributes.
349 #            Return the (serial, mtime) of the new version.
350 #         """
351 #         
352 #         props = self.version_get_properties(serial)
353 #         if props is None:
354 #             return None
355 #         size = props[SIZE]
356 #         cluster = props[CLUSTER]
357 #         new_serial, mtime = self.version_create(node, size, serial, muser, cluster)
358 #         if copy_attr:
359 #             self.attribute_copy(serial, new_serial)
360 #         return (new_serial, mtime)
361     
362     def path_statistics(self, prefix, before=inf, except_cluster=0):
363         """Return population, total size and last mtime
364            for all latest versions under prefix that
365            do not belong to the cluster.
366         """
367         
368         q = ("select count(serial), sum(size), max(mtime) "
369              "from versions v "
370              "where serial = (select max(serial) "
371                              "from versions "
372                              "where node = v.node and mtime < ?) "
373              "and cluster != ? "
374              "and node in (select node "
375                           "from nodes "
376                           "where path like ?")
377         self.execute(q, (before, except_cluster, prefix + '%'))
378         r = fetchone()
379         if r is None:
380             return (0, 0, 0)
381         return r
382     
383     def parse_filters(self, filterq):
384         preterms = filterq.split(',')
385         included = []
386         excluded = []
387         opers = []
388         match = _regexfilter.match
389         for term in preterms:
390             m = match(term)
391             if m is None:
392                 continue
393             neg, key, op, value = m.groups()
394             if neg:
395                 excluded.append(key)
396             elif not value:
397                 included.append(key)
398             elif op:
399                 opers.append((key, op, value))
400         
401         return included, excluded, opers
402     
403     def construct_filters(self, filterq):
404         subqlist = []
405         append = subqlist.append
406         included, excluded, opers = self.parse_filters(filterq)
407         args = []
408         
409         if included:
410             subq = "key in ("
411             subq += ','.join(('?' for x in included)) + ")"
412             args += included
413             append(subq)
414         
415         if excluded:
416             subq = "key not in ("
417             subq += ','.join(('?' for x in exluded)) + ")"
418             args += excluded
419             append(subq)
420         
421         if opers:
422             t = (("(key = %s and value %s %s)" % (k, o, v)) for k, o, v in opers)
423             subq = "(" + ' or '.join(t) + ")"
424             args += opers
425         
426         if not subqlist:
427             return None, None
428         
429         subq = " and serial in (select serial from attributes where "
430         subq += ' and '.join(subqlist)
431         subq += ")"
432         
433         return subq, args
434     
435 #     def node_list(self, parent, prefix='',
436 #                    start='', delimiter=None,
437 #                    after=0.0, before=inf,
438 #                    filterq=None, versions=0,
439 #                    cluster=0, limit=10000):
440 #         """Return (a list of property tuples, a list of common prefixes)
441 #            for the current versions of the paths with the given parent,
442 #            matching the following criteria.
443 #            
444 #            The property tuple for a version is returned if all
445 #            of these conditions are true:
446 #            
447 #                 a. parent (and cluster) matches
448 #                 
449 #                 b. path > start
450 #                 
451 #                 c. path starts with prefix
452 #                 
453 #                 d. i  [versions=true]  version is in (after, before)
454 #                    ii [versions=false] version is the max in (after, before)
455 #                 
456 #                 e. the path does not have the delimiter occuring
457 #                    after the prefix.
458 #                 
459 #                 f. serial matches the attribute filter query.
460 #                    
461 #                    A filter query is a comma-separated list of
462 #                    terms in one of these three forms:
463 #                    
464 #                    key
465 #                        an attribute with this key must exist
466 #                    
467 #                    !key
468 #                        an attribute with this key must not exist
469 #                    
470 #                    key ?op value
471 #                        the attribute with this key satisfies the value
472 #                        where ?op is one of ==, != <=, >=, <, >.
473 #            
474 #            matching up to the first delimiter after prefix,
475 #            and are reported only once, as "virtual directories".
476 #            The delimiter is included in the prefixes.
477 #            Prefixes do appear from (e) even if no paths would match in (f).
478 #            
479 #            If arguments are None, then the corresponding matching rule
480 #            will always match.
481 #         """
482 #         
483 #         execute = self.execute
484
485 #         if start < prefix:
486 #             start = strprevling(prefix)
487
488 #         nextling = strnextling(prefix)
489
490 #         q = ("select serial, parent, path, size, "
491 #                     "population, popsize, source, mtime, cluster "
492 #              "from nodes "
493 #              "where parent = ? and path > ? and path < ? "
494 #              "and mtime > ? and mtime < ? and cluster = ?")
495 #         args = [parent, start, nextling, after, before, cluster]
496
497 #         if filterq:
498 #             subq, subargs = self.construct_filters(filterq)
499 #             if subq is not None:
500 #                 q += subq
501 #                 args += subargs
502 #         q += " order by path"
503
504 #         if delimiter is None:
505 #             q += " limit ?"
506 #             args.append(limit)
507 #             execute(q, args)
508 #             return self.fetchall(), ()
509
510 #         pfz = len(prefix)
511 #         dz = len(delimiter)
512 #         count = 0
513 #         fetchone = self.fetchone
514 #         prefixes = []
515 #         pappend = prefixes.append
516 #         matches = []
517 #         mappend = matches.append
518 #         
519 #         execute(q, args)
520 #         while 1:
521 #             props = fetchone()
522 #             if props is None:
523 #                 break
524 #             path = props[PATH]
525 #             idx = path.find(delimiter, pfz)
526 #             if idx < 0:
527 #                 mappend(props)
528 #                 count += 1
529 #                 if count >= limit:
530 #                     break
531 #                 continue
532
533 #             pf = path[:idx + dz]
534 #             pappend(pf)
535 #             count += 1
536 #             ## XXX: if we break here due to limit,
537 #             ##      but a path would also be matched below,
538 #             ##      the path match would be lost since the
539 #             ##      next call with start=path would skip both of them.
540 #             ##      In this case, it is impossible to obey the limit,
541 #             ##      therefore we will break later, at limit + 1.
542 #             if idx + dz == len(path):
543 #                 mappend(props)
544 #                 count += 1
545
546 #             if count >= limit: 
547 #                 break
548
549 #             args[1] = strnextling(pf) # new start
550 #             execute(q, args)
551
552 #         return matches, prefixes
553
554 #     def node_delete(self, parent, prefix,
555 #                     start='', delimiter=None,
556 #                     after=0.0, before=inf,
557 #                     filterq=None, versions=0,
558 #                     cluster=0, limit=10000):
559 #         """Delete the matching version for each
560 #            of the matching paths in the parent's namespace.
561 #            Return empty if nothing is deleted, else return matches.
562 #            The paths matching are those that would
563 #            be returned by .node_list() with the same arguments.
564 #            Note that only paths are deleted, not prefixes.
565
566 #         """
567 #         r = self.node_list(parent, prefix,
568 #                            start=start, delimiter=delimiter,
569 #                            after=after, before=before,
570 #                            filterq=filterq, versions=versions,
571 #                            cluster=cluster, limit=limit)
572 #         matches, prefixes = r
573 #         if not matches:
574 #             return ()
575
576 #         q = "delete from nodes where serial = ?"
577 #         self.executemany(q, ((props[SERIAL],) for props in matches))
578 #         # TODO: Update sizes.
579 #         return matches
580
581 #     def node_purge(self, parent, path, after=0, before=inf, cluster=0):
582 #         """Delete all nodes with the specified
583 #            parent, cluster and path, and return
584 #            the serials of nodes deleted.
585 #         """
586 #         execute = self.execute
587 #         q = ("select count(serial), total(size), "
588 #                     "total(population), total(popsize) "
589 #              "from nodes "
590 #              "where parent = ? and cluster = ? "
591 #              "and path = ? and mtime between ? and ?")
592 #         args = (parent, cluster, path, after, before)
593 #         execute(q, args)
594 #         nr, size, pop, popsize = self.fetchone()
595 #         if not nr:
596 #             return ()
597 #         self.node_update_ancestors(parent, -pop-nr, -size-popsize)
598 #         q = ("select serial from nodes "
599 #              "where parent = ? and cluster = ? "
600 #              "and path = ? and mtime between ? and ?")
601 #         execute(q, args)
602 #         serials = [r[SERIAL] for r in self.fetchall()]
603 #         q = ("delete from nodes where "
604 #              "parent = ? and cluster = ? "
605 #              "and path = ? and mtime between ? and ?")
606 #         execute(q, args)
607 #         return serials
608     
609     def attribute_get(self, serial, keys=()):
610         """Return a list of (key, value) pairs of the version specified by serial.
611            If keys is empty, return all attributes.
612            Othwerise, return only those specified.
613         """
614         
615         execute = self.execute
616         if keys:
617             marks = ','.join('?' for k in keys)
618             q = ("select key, value from attributes "
619                  "where key in (%s) and serial = ?" % (marks,))
620             execute(q, keys + (serial,))
621         else:
622             q = "select key, value from attributes where serial = ?"
623             execute(q, (serial,))
624         return self.fetchall()
625     
626     def attribute_set(self, serial, items):
627         """Set the attributes of the version specified by serial.
628            Receive attributes as an iterable of (key, value) pairs.
629         """
630         
631         q = ("insert or replace into attributes (serial, key, value) "
632              "values (?, ?, ?)")
633         self.executemany(q, ((serial, k, v) for k, v in items))
634     
635     def attribute_del(self, serial, keys=()):
636         """Delete attributes of the version specified by serial.
637            If keys is empty, delete all attributes.
638            Otherwise delete those specified.
639         """
640         
641         if keys:
642             q = "delete from attributes where serial = ? and key = ?"
643             self.executemany(q, ((serial, key) for key in keys))
644         else:
645             q = "delete from attributes where serial = ?"
646             self.execute(q, (serial,))
647     
648 #     def node_get_attribute_keys(self, parent):
649 #         """Return a list with all keys pairs defined
650 #            for the namespace of the node specified.
651 #         """
652 #         
653 #         q = ("select distinct key from attributes a, versions v, nodes n "
654 #              "where a.serial = v.serial and v.node = n.node and n.parent = ?")
655 #         self.execute(q, (parent,))
656 #         return [r[0] for r in self.fetchall()]
657     
658     def attribute_copy(self, source, dest):
659         q = ("insert or replace into attributes "
660              "select ?, key, value from attributes "
661              "where serial = ?")
662         self.execute(q, (dest, source))