Revision 60b8a083

b/pithos/backends/lib/node.py
44 44

  
45 45

  
46 46
def strnextling(prefix):
47
    """return the first unicode string
47
    """Return the first unicode string
48 48
       greater than but not starting with given prefix.
49 49
       strnextling('hello') -> 'hellp'
50 50
    """
......
64 64
    return s
65 65

  
66 66
def strprevling(prefix):
67
    """return an approximation of the last unicode string
67
    """Return an approximation of the last unicode string
68 68
       less than but not starting with given prefix.
69 69
       strprevling(u'hello') -> u'helln\\xffff'
70 70
    """
......
78 78
    return s
79 79

  
80 80

  
81
import re
82
_regexfilter = re.compile('(!?)\s*([\w-]+)\s*(=|!=|<=|>=|<|>)?\s*(.*)$', re.UNICODE)
83

  
84 81
_propnames = {
85 82
    'serial'    : 0,
86 83
    'node'      : 1,
......
93 90

  
94 91

  
95 92
class Node(DBWorker):
96
    """Nodes store path organization.
97
       Versions store object history.
93
    """Nodes store path organization and have multiple versions.
94
       Versions store object history and have multiple attributes.
98 95
       Attributes store metadata.
99 96
    """
100 97
    
101
    # TODO: Keep size of object in one place.
98
    # TODO: Provide an interface for included and excluded clusters.
102 99
    
103 100
    def __init__(self, **params):
104 101
        DBWorker.__init__(self, **params)
......
141 138
                            references nodes(node)
142 139
                            on update cascade
143 140
                            on delete cascade ) """)
144
        # execute(""" create index if not exists idx_versions_path
145
        #             on nodes(cluster, node, path) """)
141
        execute(""" create index if not exists idx_versions_node
142
                    on nodes(node) """)
143
        # TODO: Sort out if more indexes are needed.
146 144
        # execute(""" create index if not exists idx_versions_mtime
147 145
        #             on nodes(mtime) """)
148 146
        
......
199 197
        q = ("select serial, node, size, source, mtime, muser, cluster "
200 198
             "from versions "
201 199
             "where node = ?")
202
        self.execute(q, (serial,))
200
        self.execute(q, (node,))
203 201
        r = self.fetchall()
204 202
        if r is None:
205 203
            return r
......
237 235
        nr, size = self.fetchone()
238 236
        if not nr:
239 237
            return ()
240
        self.statistics_update(parent, -nr, -size, cluster)
241
        self.statistics_update_ancestors(parent, -nr, -size, cluster)
238
        mtime = time()
239
        self.statistics_update(parent, -nr, -size, mtime, cluster)
240
        self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster)
242 241
        
243 242
        q = ("select serial from versions "
244 243
             "where node in (select node "
......
281 280
        nr, size = self.fetchone()
282 281
        if not nr:
283 282
            return ()
284
        self.statistics_update_ancestors(node, -nr, -size, cluster)
283
        mtime = time()
284
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
285 285
        
286 286
        q = ("select serial from versions "
287 287
             "where node = ? "
......
420 420
            return (0, 0, mtime)
421 421
        
422 422
        # All children (get size and mtime).
423
        # XXX: This is why the full path is stored.
423 424
        q = ("select count(serial), sum(size), max(mtime) "
424 425
             "from versions v "
425 426
             "where serial = (select max(serial) "
......
524 525
        self.execute(q, (serial,))
525 526
        return True
526 527
    
527
    def parse_filters(self, filterq):
528
        preterms = filterq.split(',')
529
        included = []
530
        excluded = []
531
        opers = []
532
        match = _regexfilter.match
533
        for term in preterms:
534
            m = match(term)
535
            if m is None:
536
                continue
537
            neg, key, op, value = m.groups()
538
            if neg:
539
                excluded.append(key)
540
            elif not value:
541
                included.append(key)
542
            elif op:
543
                opers.append((key, op, value))
544
        
545
        return included, excluded, opers
546
    
547
    def construct_filters(self, filterq):
548
        subqlist = []
549
        append = subqlist.append
550
        included, excluded, opers = self.parse_filters(filterq)
551
        args = []
552
        
553
        if included:
554
            subq = "key in ("
555
            subq += ','.join(('?' for x in included)) + ")"
556
            args += included
557
            append(subq)
558
        
559
        if excluded:
560
            subq = "key not in ("
561
            subq += ','.join(('?' for x in exluded)) + ")"
562
            args += excluded
563
            append(subq)
564
        
565
        if opers:
566
            t = (("(key = %s and value %s %s)" % (k, o, v)) for k, o, v in opers)
567
            subq = "(" + ' or '.join(t) + ")"
568
            args += opers
569
        
570
        if not subqlist:
571
            return None, None
572
        
573
        subq = " and serial in (select serial from attributes where "
574
        subq += ' and '.join(subqlist)
575
        subq += ")"
576
        
577
        return subq, args
578
    
579
#     def node_list(self, parent, prefix='',
580
#                    start='', delimiter=None,
581
#                    after=0.0, before=inf,
582
#                    filterq=None, versions=0,
583
#                    cluster=0, limit=10000):
584
#         """Return (a list of property tuples, a list of common prefixes)
585
#            for the current versions of the paths with the given parent,
586
#            matching the following criteria.
587
#            
588
#            The property tuple for a version is returned if all
589
#            of these conditions are true:
590
#            
591
#                 a. parent (and cluster) matches
592
#                 
593
#                 b. path > start
594
#                 
595
#                 c. path starts with prefix
596
#                 
597
#                 d. i  [versions=true]  version is in (after, before)
598
#                    ii [versions=false] version is the max in (after, before)
599
#                 
600
#                 e. the path does not have the delimiter occurring
601
#                    after the prefix.
602
#                 
603
#                 f. serial matches the attribute filter query.
604
#                    
605
#                    A filter query is a comma-separated list of
606
#                    terms in one of these three forms:
607
#                    
608
#                    key
609
#                        an attribute with this key must exist
610
#                    
611
#                    !key
612
#                        an attribute with this key must not exist
613
#                    
614
#                    key ?op value
615
#                        the attribute with this key satisfies the value
616
#                        where ?op is one of ==, !=, <=, >=, <, >.
617
#            
618
#            matching up to the first delimiter after prefix,
619
#            and are reported only once, as "virtual directories".
620
#            The delimiter is included in the prefixes.
621
#            Prefixes do appear from (e) even if no paths would match in (f).
622
#            
623
#            If arguments are None, then the corresponding matching rule
624
#            will always match.
625
#         """
626
#         
627
#         execute = self.execute
628
# 
629
#         if start < prefix:
630
#             start = strprevling(prefix)
631
# 
632
#         nextling = strnextling(prefix)
633
# 
634
#         q = ("select serial, parent, path, size, "
635
#                     "population, popsize, source, mtime, cluster "
636
#              "from nodes "
637
#              "where parent = ? and path > ? and path < ? "
638
#              "and mtime > ? and mtime < ? and cluster = ?")
639
#         args = [parent, start, nextling, after, before, cluster]
640
# 
641
#         if filterq:
642
#             subq, subargs = self.construct_filters(filterq)
643
#             if subq is not None:
644
#                 q += subq
645
#                 args += subargs
646
#         q += " order by path"
647
# 
648
#         if delimiter is None:
649
#             q += " limit ?"
650
#             args.append(limit)
651
#             execute(q, args)
652
#             return self.fetchall(), ()
653
# 
654
#         pfz = len(prefix)
655
#         dz = len(delimiter)
656
#         count = 0
657
#         fetchone = self.fetchone
658
#         prefixes = []
659
#         pappend = prefixes.append
660
#         matches = []
661
#         mappend = matches.append
662
#         
663
#         execute(q, args)
664
#         while 1:
665
#             props = fetchone()
666
#             if props is None:
667
#                 break
668
#             path = props[PATH]
669
#             idx = path.find(delimiter, pfz)
670
#             if idx < 0:
671
#                 mappend(props)
672
#                 count += 1
673
#                 if count >= limit:
674
#                     break
675
#                 continue
676
# 
677
#             pf = path[:idx + dz]
678
#             pappend(pf)
679
#             count += 1
680
#             ## XXX: if we break here due to limit,
681
#             ##      but a path would also be matched below,
682
#             ##      the path match would be lost since the
683
#             ##      next call with start=path would skip both of them.
684
#             ##      In this case, it is impossible to obey the limit,
685
#             ##      therefore we will break later, at limit + 1.
686
#             if idx + dz == len(path):
687
#                 mappend(props)
688
#                 count += 1
689
# 
690
#             if count >= limit: 
691
#                 break
692
# 
693
#             args[1] = strnextling(pf) # new start
694
#             execute(q, args)
695
# 
696
#         return matches, prefixes
697
    
698 528
    def attribute_get(self, serial, keys=()):
699 529
        """Return a list of (key, value) pairs of the version specified by serial.
700 530
           If keys is empty, return all attributes.
......
740 570
             "where serial = ?")
741 571
        self.execute(q, (dest, source))
742 572
    
743
    def latest_attribute_keys(self, parent, before=inf, except_cluster=0, allowed_paths=[]):
573
    def _construct_filters(self, filterq):
574
        if not filterq:
575
            return None, None
576
        
577
        args = filterq.split(',')
578
        subq = " and a.key in ("
579
        subq += ','.join(('?' for x in args))
580
        subq += ")"
581
        
582
        return subq, args
583
    
584
    def _construct_paths(self, pathq):
585
        if not pathq:
586
            return None, None
587
        
588
        subq = " and ("
589
        subq += ' or '.join(('n.path like ?' for x in pathq))
590
        subq += ")"
591
        args = tuple([x + '%' for x in pathq])
592
        
593
        return subq, args
594
    
595
    def latest_attribute_keys(self, parent, before=inf, except_cluster=0, pathq=[]):
744 596
        """Return a list with all keys defined
745 597
           for all latest versions under parent that
746 598
           do not belong to the cluster.
747 599
        """
748 600
        
749 601
        # TODO: Use another table to store before=inf results.
750
        q = ("select a.key "
602
        q = ("select distinct a.key "
751 603
             "from attributes a, versions v, nodes n "
752 604
             "where v.serial = (select max(serial) "
753 605
                              "from versions "
......
759 611
             "and a.serial = v.serial "
760 612
             "and n.node = v.node")
761 613
        args = (before, except_cluster, parent)
762
        for path in allowed_paths:
763
            q += ' and n.path like ?'
764
            args += (path + '%',)
614
        subq, subargs = self._construct_paths(pathq)
615
        if subq is not None:
616
            q += subq
617
            args += subargs
765 618
        self.execute(q, args)
766 619
        return [r[0] for r in self.fetchall()]
620
    
621
    def latest_version_list(self, parent, prefix='', delimiter=None,
622
                            start='', limit=10000, before=inf,
623
                            except_cluster=0, pathq=[], filterq=None):
624
        """Return a (list of (path, serial) tuples, list of common prefixes)
625
           for the current versions of the paths with the given parent,
626
           matching the following criteria.
627
           
628
           The property tuple for a version is returned if all
629
           of these conditions are true:
630
                
631
                a. parent matches
632
                
633
                b. path > start
634
                
635
                c. path starts with prefix (and paths in pathq)
636
                
637
                d. version is the max up to before
638
                
639
                e. version is not in cluster
640
                
641
                f. the path does not have the delimiter occurring
642
                   after the prefix, or ends with the delimiter
643
                
644
                g. serial matches the attribute filter query.
645
                   
646
                   A filter query is a comma-separated list of
647
                   terms in one of these three forms:
648
                   
649
                   key
650
                       an attribute with this key must exist
651
                   
652
                   !key
653
                       an attribute with this key must not exist
654
                   
655
                   key ?op value
656
                       the attribute with this key satisfies the value
657
                       where ?op is one of ==, !=, <=, >=, <, >.
658
           
659
           The list of common prefixes includes the prefixes
660
           matching up to the first delimiter after prefix,
661
           and are reported only once, as "virtual directories".
662
           The delimiter is included in the prefixes.
663
           
664
           If arguments are None, then the corresponding matching rule
665
           will always match.
666
           
667
           Limit applies to the first list of tuples returned.
668
        """
669
        
670
        execute = self.execute
671
        
672
        if not start or start < prefix:
673
            start = strprevling(prefix)
674
        nextling = strnextling(prefix)
675
        
676
        q = ("select distinct n.path, v.serial "
677
             "from attributes a, versions v, nodes n "
678
             "where v.serial = (select max(serial) "
679
                              "from versions "
680
                              "where node = v.node and mtime < ?) "
681
             "and v.cluster != ? "
682
             "and v.node in (select node "
683
                           "from nodes "
684
                           "where parent = ?) "
685
             "and a.serial = v.serial "
686
             "and n.node = v.node "
687
             "and n.path > ? and n.path < ?")
688
        args = [before, except_cluster, parent, start, nextling]
689
        
690
        subq, subargs = self._construct_paths(pathq)
691
        if subq is not None:
692
            q += subq
693
            args += subargs
694
        subq, subargs = self._construct_filters(filterq)
695
        if subq is not None:
696
            q += subq
697
            args += subargs
698
        else:
699
            q = q.replace("attributes a, ", "")
700
            q = q.replace("and a.serial = v.serial ", "")
701
        q += " order by n.path"
702
        
703
        if not delimiter:
704
            q += " limit ?"
705
            args.append(limit)
706
            execute(q, args)
707
            return self.fetchall(), ()
708
        
709
        pfz = len(prefix)
710
        dz = len(delimiter)
711
        count = 0
712
        fetchone = self.fetchone
713
        prefixes = []
714
        pappend = prefixes.append
715
        matches = []
716
        mappend = matches.append
717
        
718
        execute(q, args)
719
        while True:
720
            props = fetchone()
721
            if props is None:
722
                break
723
            path, serial = props
724
            idx = path.find(delimiter, pfz)
725
            
726
            if idx < 0:
727
                mappend(props)
728
                count += 1
729
                if count >= limit:
730
                    break
731
                continue
732
            
733
            pf = path[:idx + dz]
734
            pappend(pf)
735
            if idx + dz == len(path):
736
                mappend(props)
737
                count += 1
738
            if count >= limit: 
739
                break
740
            
741
            args[3] = strnextling(pf) # New start.
742
            execute(q, args)
743
        
744
        return matches, prefixes
b/pithos/backends/modular.py
555 555
        
556 556
        logger.debug("list_versions: %s %s %s", account, container, name)
557 557
        self._can_read(user, account, container, name)
558
        path, node = self._lookup_object(account, container, name)
558 559
        return self.node.node_get_versions(node, ['serial', 'mtime'])
559 560
    
560 561
    @backend_method(autocommit=0)
......
569 570
    
570 571
    @backend_method(autocommit=0)
571 572
    def put_block(self, data):
572
        """Create a block and return the hash."""
573
        """Store a block and return the hash."""
573 574
        
574 575
        logger.debug("put_block: %s", len(data))
575 576
        hashes, absent = self.blocker.block_stor((data,))
......
585 586
        h, e = self.blocker.block_delta(binascii.unhexlify(hash), ((offset, data),))
586 587
        return binascii.hexlify(h)
587 588
    
588
    def _check_policy(self, policy):
589
        for k in policy.keys():
590
            if policy[k] == '':
591
                policy[k] = self.default_policy.get(k)
592
        for k, v in policy.iteritems():
593
            if k == 'quota':
594
                q = int(v) # May raise ValueError.
595
                if q < 0:
596
                    raise ValueError
597
            elif k == 'versioning':
598
                if v not in ['auto', 'manual', 'none']:
599
                    raise ValueError
600
            else:
601
                raise ValueError
602
    
603
    def _sql_until(self, parent, until=None):
604
        """Return the sql to get the latest versions until the timestamp given."""
605
        
606
        if until is None:
607
            until = time.time()
608
        sql = ("select v.serial, n.path, v.mtime, v.size "
609
               "from versions v, nodes n "
610
               "where v.serial = (select max(serial) "
611
                                 "from versions "
612
                                 "where node = v.node and mtime < %s) "
613
               "and v.cluster != %s "
614
               "and v.node = n.node "
615
               "and v.node in (select node "
616
                              "from nodes "
617
                              "where parent = %s)")
618
        return sql % (until, CLUSTER_DELETED, parent)
619
    
620
    def _list_limits(self, listing, marker, limit):
621
        start = 0
622
        if marker:
623
            try:
624
                start = listing.index(marker) + 1
625
            except ValueError:
626
                pass
627
        if not limit or limit > 10000:
628
            limit = 10000
629
        return start, limit
630
    
631
    def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None, allowed=[]):
632
        cont_prefix = path + '/'
633
        if keys and len(keys) > 0:
634
#             sql = '''select distinct o.name, o.version_id from (%s) o, metadata m where o.name like ? and
635
#                         m.version_id = o.version_id and m.key in (%s)'''
636
#             sql = sql % (self._sql_until(until), ', '.join('?' * len(keys)))
637
#             param = (cont_prefix + prefix + '%',) + tuple(keys)
638
#             if allowed:
639
#                 sql += ' and (' + ' or '.join(('o.name like ?',) * len(allowed)) + ')'
640
#                 param += tuple([x + '%' for x in allowed])
641
#             sql += ' order by o.name'
642
            return []
643
        else:
644
            sql = 'select path, serial from (%s) where path like ?'
645
            sql = sql % self._sql_until(parent, until)
646
            param = (cont_prefix + prefix + '%',)
647
            if allowed:
648
                sql += ' and (' + ' or '.join(('name like ?',) * len(allowed)) + ')'
649
                param += tuple([x + '%' for x in allowed])
650
            sql += ' order by path'
651
        c = self.con.execute(sql, param)
652
        objects = [(x[0][len(cont_prefix):], x[1]) for x in c.fetchall()]
653
        if delimiter:
654
            pseudo_objects = []
655
            for x in objects:
656
                pseudo_name = x[0]
657
                i = pseudo_name.find(delimiter, len(prefix))
658
                if not virtual:
659
                    # If the delimiter is not found, or the name ends
660
                    # with the delimiter's first occurrence.
661
                    if i == -1 or len(pseudo_name) == i + len(delimiter):
662
                        pseudo_objects.append(x)
663
                else:
664
                    # If the delimiter is found, keep up to (and including) the delimiter.
665
                    if i != -1:
666
                        pseudo_name = pseudo_name[:i + len(delimiter)]
667
                    if pseudo_name not in [y[0] for y in pseudo_objects]:
668
                        if pseudo_name == x[0]:
669
                            pseudo_objects.append(x)
670
                        else:
671
                            pseudo_objects.append((pseudo_name, None))
672
            objects = pseudo_objects
673
        
674
        start, limit = self._list_limits([x[0] for x in objects], marker, limit)
675
        return objects[start:start + limit]
676
    
677 589
    # Path functions.
678 590
    
679 591
    def _put_object_node(self, account, container, name):
......
795 707
        if copy_data and src_version_id is not None:
796 708
            self._copy_data(src_version_id, dest_version_id)
797 709
    
710
    def _list_limits(self, listing, marker, limit):
711
        start = 0
712
        if marker:
713
            try:
714
                start = listing.index(marker) + 1
715
            except ValueError:
716
                pass
717
        if not limit or limit > 10000:
718
            limit = 10000
719
        return start, limit
720
    
721
    def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None, allowed=[]):
722
        cont_prefix = path + '/'
723
        prefix = cont_prefix + prefix
724
        start = cont_prefix + marker if marker else None
725
        before = until if until is not None else inf
726
        filterq = ','.join(keys) if keys else None
727
        
728
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, filterq)
729
        objects.extend([(p, None) for p in prefixes] if virtual else [])
730
        objects.sort()
731
        objects = [(x[0][len(cont_prefix):], x[1]) for x in objects]
732
        
733
        start, limit = self._list_limits([x[0] for x in objects], marker, limit)
734
        return objects[start:start + limit]
735
    
736
    # Policy functions.
737
    
738
    def _check_policy(self, policy):
739
        for k in policy.keys():
740
            if policy[k] == '':
741
                policy[k] = self.default_policy.get(k)
742
        for k, v in policy.iteritems():
743
            if k == 'quota':
744
                q = int(v) # May raise ValueError.
745
                if q < 0:
746
                    raise ValueError
747
            elif k == 'versioning':
748
                if v not in ['auto', 'manual', 'none']:
749
                    raise ValueError
750
            else:
751
                raise ValueError
752
    
798 753
    # Access control functions.
799 754
    
800 755
    def _check_groups(self, groups):

Also available in: Unified diff