Revision 585b75e7

b/snf-pithos-backend/pithos/backends/lib/sqlalchemy/node.py
122 122
                                         ondelete='CASCADE',
123 123
                                         onupdate='CASCADE'),
124 124
                              autoincrement=False))
125
        columns.append(Column('latest_version', Integer))
125 126
        columns.append(Column('path', String(2048), default='', nullable=False))
126 127
        self.nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
127 128
        Index('idx_nodes_path', self.nodes.c.path, unique=True)
129
        Index('idx_nodes_parent', self.nodes.c.parent)
128 130
        
129 131
        #create policy table
130 132
        columns=[]
......
170 172
        self.versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
171 173
        Index('idx_versions_node_mtime', self.versions.c.node, self.versions.c.mtime)
172 174
        Index('idx_versions_node_uuid', self.versions.c.uuid)
175
        Index('idx_versions_serial_cluster', self.versions.c.serial, self.versions.c.cluster)
173 176
        
174 177
        #create attributes table
175 178
        columns = []
......
514 517
                    self.versions.c.uuid,
515 518
                    self.versions.c.checksum,
516 519
                    self.versions.c.cluster])
517
        filtered = select([func.max(self.versions.c.serial)],
518
                            self.versions.c.node == node)
519 520
        if before != inf:
521
            filtered = select([func.max(self.versions.c.serial)],
522
                            self.versions.c.node == node)
520 523
            filtered = filtered.where(self.versions.c.mtime < before)
524
        else:
525
            filtered = select([self.nodes.c.latest_version],
526
                            self.versions.c.node == node)
521 527
        s = s.where(and_(self.versions.c.cluster != except_cluster,
522 528
                         self.versions.c.serial == filtered))
523 529
        r = self.conn.execute(s)
......
532 538
        s = select([func.count(v.c.serial),
533 539
                    func.sum(v.c.size),
534 540
                    func.max(v.c.mtime)])
535
        c1 = select([func.max(self.versions.c.serial)])
536 541
        if before != inf:
542
            c1 = select([func.max(self.versions.c.serial)])
537 543
            c1 = c1.where(self.versions.c.mtime < before)
544
            c1.where(self.versions.c.node == v.c.node)
545
        else:
546
            c1 = select([self.nodes.c.latest_version])
547
            c1.where(self.nodes.c.node == v.c.node)
538 548
        c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
539
        s = s.where(and_(v.c.serial == c1.where(self.versions.c.node == v.c.node),
549
        s = s.where(and_(v.c.serial == c1,
540 550
                         v.c.cluster != except_cluster,
541 551
                         v.c.node.in_(c2)))
542 552
        rp = self.conn.execute(s)
......
554 564
        s = select([func.count(v.c.serial),
555 565
                    func.sum(v.c.size),
556 566
                    func.max(v.c.mtime)])
557
        c1 = select([func.max(self.versions.c.serial)],
558
            self.versions.c.node == v.c.node)
559 567
        if before != inf:
568
            c1 = select([func.max(self.versions.c.serial)],
569
                    self.versions.c.node == v.c.node)
560 570
            c1 = c1.where(self.versions.c.mtime < before)
571
        else:
572
            c1 = select([self.nodes.c.serial],
573
                    self.nodes.c.node == v.c.node)
561 574
        c2 = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path) + '%', escape='\\'))
562 575
        s = s.where(and_(v.c.serial == c1,
563 576
                         v.c.cluster != except_cluster,
......
571 584
        mtime = max(mtime, r[2])
572 585
        return (count, size, mtime)
573 586
    
587
    def nodes_set_latest_version(self, node, serial):
588
        s = self.nodes.update().where(self.nodes.c.node == node)
589
        s = s.values(latest_version = serial)
590
        self.conn.execute(s).close()
591
    
574 592
    def version_create(self, node, hash, size, type, source, muser, uuid, checksum, cluster=0):
575 593
        """Create a new version from the given properties.
576 594
           Return the (serial, mtime) of the new version.
......
581 599
                                          mtime=mtime, muser=muser, uuid=uuid, checksum=checksum, cluster=cluster)
582 600
        serial = self.conn.execute(s).inserted_primary_key[0]
583 601
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)
602
        
603
        self.nodes_set_latest_version(node, serial)
604
        
584 605
        return serial, mtime
585 606
    
586 607
    def version_lookup(self, node, before=inf, cluster=0, all_props=True):
......
598 619
                        v.c.size, v.c.type, v.c.source,
599 620
                        v.c.mtime, v.c.muser, v.c.uuid,
600 621
                        v.c.checksum, v.c.cluster])
601
        c = select([func.max(self.versions.c.serial)],
602
            self.versions.c.node == node)
603 622
        if before != inf:
623
            c = select([func.max(self.versions.c.serial)],
624
                self.versions.c.node == node)
604 625
            c = c.where(self.versions.c.mtime < before)
626
        else:
627
            c = select([self.nodes.c.latest_version],
628
                self.nodes.c.node == node)
605 629
        s = s.where(and_(v.c.serial == c,
606 630
                         v.c.cluster == cluster))
607 631
        r = self.conn.execute(s)
......
616 640
           Return a list with their properties:
617 641
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
618 642
        """
619
        
643
        if not nodes:
644
            return ()
620 645
        v = self.versions.alias('v')
621 646
        if not all_props:
622 647
            s = select([v.c.serial])
......
625 650
                        v.c.size, v.c.type, v.c.source,
626 651
                        v.c.mtime, v.c.muser, v.c.uuid,
627 652
                        v.c.checksum, v.c.cluster])
628
        c = select([func.max(self.versions.c.serial)],
629
            self.versions.c.node.in_(nodes)).group_by(self.versions.c.node)
630 653
        if before != inf:
654
            c = select([func.max(self.versions.c.serial)],
655
                self.versions.c.node.in_(nodes))
631 656
            c = c.where(self.versions.c.mtime < before)
657
            c = c.group_by(self.versions.c.node)
658
        else:
659
            c = select([self.nodes.c.latest_version],
660
                self.nodes.c.node.in_(nodes))
632 661
        s = s.where(and_(v.c.serial.in_(c),
633 662
                         v.c.cluster == cluster))
634 663
        s = s.order_by(v.c.node)
......
706 735
        
707 736
        s = self.versions.delete().where(self.versions.c.serial == serial)
708 737
        self.conn.execute(s).close()
738
        
739
        props = self.version_lookup(node, cluster=cluster, all_props=False)
740
        if props:
741
            self.nodes_set_latest_version(v.node, serial)
742
        
709 743
        return hash, size
710 744
    
711 745
    def attribute_get(self, serial, domain, keys=()):
......
799 833
        v = self.versions.alias('v')
800 834
        n = self.nodes.alias('n')
801 835
        s = select([a.c.key]).distinct()
802
        filtered = select([func.max(self.versions.c.serial)])
803 836
        if before != inf:
837
            filtered = select([func.max(self.versions.c.serial)])
804 838
            filtered = filtered.where(self.versions.c.mtime < before)
805
        s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
839
            filtered = filtered.where(self.versions.c.node == v.c.node)
840
        else:
841
            filtered = select([self.nodes.c.latest_version])
842
            filtered = filtered.where(self.nodes.c.node == v.c.node)
843
        s = s.where(v.c.serial == filtered)
806 844
        s = s.where(v.c.cluster != except_cluster)
807 845
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
808 846
            self.nodes.c.parent == parent)))
......
890 928
                        v.c.size, v.c.type, v.c.source,
891 929
                        v.c.mtime, v.c.muser, v.c.uuid,
892 930
                        v.c.checksum, v.c.cluster]).distinct()
893
        filtered = select([func.max(self.versions.c.serial)])
894 931
        if before != inf:
932
            filtered = select([func.max(self.versions.c.serial)])
895 933
            filtered = filtered.where(self.versions.c.mtime < before)
896
        s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
934
        else:
935
            filtered = select([self.nodes.c.latest_version])
936
        s = s.where(v.c.serial == filtered.where(self.nodes.c.node == v.c.node))
897 937
        s = s.where(v.c.cluster != except_cluster)
898 938
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
899 939
            self.nodes.c.parent == parent)))
b/snf-pithos-backend/pithos/backends/lib/sqlite/node.py
115 115
                          ( node       integer primary key,
116 116
                            parent     integer default 0,
117 117
                            path       text    not null default '',
118
                            latest_version     integer,
118 119
                            foreign key (parent)
119 120
                            references nodes(node)
120 121
                            on update cascade
121 122
                            on delete cascade ) """)
122 123
        execute(""" create unique index if not exists idx_nodes_path
123 124
                    on nodes(path) """)
125
        execute(""" create index if not exists idx_nodes_parent
126
                    on nodes(parent) """)
124 127
        
125 128
        execute(""" create table if not exists policy
126 129
                          ( node   integer,
......
164 167
                    on versions(node, mtime) """)
165 168
        execute(""" create index if not exists idx_versions_node_uuid
166 169
                    on versions(uuid) """)
170
        execute(""" create index if not exists idx_versions_serial_cluster
171
                    on versions(serial, cluster) """)
167 172
        
168 173
        execute(""" create table if not exists attributes
169 174
                          ( serial integer,
......
433 438
        
434 439
        # The latest version.
435 440
        q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
436
             "from versions "
437
             "where serial = (select max(serial) "
438
                             "from versions "
439
                             "where node = ? and mtime < ?) "
441
             "from versions v "
442
             "where serial = %s "
440 443
             "and cluster != ?")
441
        execute(q, (node, before, except_cluster))
444
        subq, args = self._construct_latest_version_subquery(node=node, before=before)
445
        execute(q % subq, args + [except_cluster])
442 446
        props = fetchone()
443 447
        if props is None:
444 448
            return None
......
447 451
        # First level, just under node (get population).
448 452
        q = ("select count(serial), sum(size), max(mtime) "
449 453
             "from versions v "
450
             "where serial = (select max(serial) "
451
                             "from versions "
452
                             "where node = v.node and mtime < ?) "
454
             "where serial = %s "
453 455
             "and cluster != ? "
454 456
             "and node in (select node "
455 457
                          "from nodes "
456 458
                          "where parent = ?)")
457
        execute(q, (before, except_cluster, node))
459
        subq, args = self._construct_latest_version_subquery(node=None, before=before)
460
        execute(q % subq, args + [except_cluster, node])
458 461
        r = fetchone()
459 462
        if r is None:
460 463
            return None
......
467 470
        # This is why the full path is stored.
468 471
        q = ("select count(serial), sum(size), max(mtime) "
469 472
             "from versions v "
470
             "where serial = (select max(serial) "
471
                             "from versions "
472
                             "where node = v.node and mtime < ?) "
473
             "where serial = %s "
473 474
             "and cluster != ? "
474 475
             "and node in (select node "
475 476
                          "from nodes "
476 477
                          "where path like ? escape '\\')")
477
        execute(q, (before, except_cluster, self.escape_like(path) + '%'))
478
        subq, args = self._construct_latest_version_subquery(node=None, before=before)
479
        execute(q % subq, args + [except_cluster, self.escape_like(path) + '%'])
478 480
        r = fetchone()
479 481
        if r is None:
480 482
            return None
......
482 484
        mtime = max(mtime, r[2])
483 485
        return (count, size, mtime)
484 486
    
487
    def nodes_set_latest_version(self, node, serial):
488
    	q = ("update nodes set latest_version = ? where node = ?")
489
        props = (serial, node)
490
        self.execute(q, props)
491
    
485 492
    def version_create(self, node, hash, size, type, source, muser, uuid, checksum, cluster=0):
486 493
        """Create a new version from the given properties.
487 494
           Return the (serial, mtime) of the new version.
......
493 500
        props = (node, hash, size, type, source, mtime, muser, uuid, checksum, cluster)
494 501
        serial = self.execute(q, props).lastrowid
495 502
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)
503
        
504
        self.nodes_set_latest_version(node, serial)
505
        
496 506
        return serial, mtime
497 507
    
498 508
    def version_lookup(self, node, before=inf, cluster=0, all_props=True):
......
503 513
        """
504 514
        
505 515
        q = ("select %s "
506
             "from versions "
507
             "where serial = (select max(serial) "
508
                             "from versions "
509
                             "where node = ? and mtime < ?) "
516
             "from versions v "
517
             "where serial = %s "
510 518
             "and cluster = ?")
519
        subq, args = self._construct_latest_version_subquery(node=node, before=before)
511 520
        if not all_props:
512
            q = q % "serial"
521
            q = q % ("serial", subq)
513 522
        else:
514
            q = q % "serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster"
523
            q = q % ("serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster", subq)
515 524
        
516
        self.execute(q, (node, before, cluster))
525
        self.execute(q, args + [cluster])
517 526
        props = self.fetchone()
518 527
        if props is not None:
519 528
            return props
......
525 534
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
526 535
        """
527 536
        
537
        if not nodes:
538
        	return ()
528 539
        q = ("select %s "
529 540
             "from versions "
530
             "where serial in (select max(serial) "
531
                             "from versions "
532
                             "where node in (%s) and mtime < ? group by node) "
541
             "where serial in %s "
533 542
             "and cluster = ? %s")
534
        placeholders = ','.join('?' for node in nodes)
543
        subq, args = self._construct_latest_versions_subquery(nodes=nodes, before = before)
535 544
        if not all_props:
536
            q = q % ("serial",  placeholders, '')
545
            q = q % ("serial", subq, '')
537 546
        else:
538
            q = q % ("serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster",  placeholders, 'order by node')
547
            q = q % ("serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster",  subq, 'order by node')
539 548
        
540
        args = nodes
541
        args.extend((before, cluster))
549
        args += [cluster]
542 550
        self.execute(q, args)
543 551
        return self.fetchall()
544 552
    
......
604 612
        
605 613
        q = "delete from versions where serial = ?"
606 614
        self.execute(q, (serial,))
615
        
616
        props = self.version_lookup(node, cluster=cluster, all_props=False)
617
        if props:
618
        	self.nodes_set_latest_version(node, props[0])
607 619
        return hash, size
608 620
    
609 621
    def attribute_get(self, serial, domain, keys=()):
......
725 737
        
726 738
        return subq, args
727 739
    
740
    def _construct_versions_nodes_latest_version_subquery(self, before=inf):
741
        if before == inf:
742
            q = ("n.latest_version ")
743
            args = []
744
        else:
745
            q = ("(select max(serial) "
746
				   "from versions "
747
				   "where node = v.node and mtime < ?) ")
748
            args = [before]
749
        return q, args
750
    
751
    def _construct_latest_version_subquery(self, node=None, before=inf):
752
        where_cond = "node = v.node"
753
        args = []
754
        if node:
755
            where_cond = "node = ? "
756
            args = [node]
757
        
758
        if before == inf:
759
            q = ("(select latest_version "
760
                   "from nodes "
761
                   "where %s) ")
762
        else:
763
            q = ("(select max(serial) "
764
                   "from versions "
765
                   "where %s and mtime < ?) ")
766
            args += [before]
767
        return q % where_cond, args
768
    
769
    def _construct_latest_versions_subquery(self, nodes=(), before=inf):
770
        where_cond = ""
771
        args = []
772
        if nodes:
773
            where_cond = "node in (%s) " % ','.join('?' for node in nodes)
774
            args = nodes
775
        
776
        if before == inf:
777
            q = ("(select latest_version "
778
                   "from nodes "
779
                   "where %s ) ")
780
        else:
781
            q = ("(select max(serial) "
782
				 "from versions "
783
				 "where %s and mtime < ? group by node) ")
784
            args += [before]
785
        return q % where_cond, args
786
    
728 787
    def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=[]):
729 788
        """Return a list with all keys pairs defined
730 789
           for all latest versions under parent that
......
734 793
        # TODO: Use another table to store before=inf results.
735 794
        q = ("select distinct a.key "
736 795
             "from attributes a, versions v, nodes n "
737
             "where v.serial = (select max(serial) "
738
                               "from versions "
739
                               "where node = v.node and mtime < ?) "
796
             "where v.serial = %s "
740 797
             "and v.cluster != ? "
741 798
             "and v.node in (select node "
742 799
                            "from nodes "
......
744 801
             "and a.serial = v.serial "
745 802
             "and a.domain = ? "
746 803
             "and n.node = v.node")
747
        args = (before, except_cluster, parent, domain)
804
        subq, subargs = self._construct_latest_version_subquery(node=None, before=before)
805
        args = subargs + [except_cluster, parent, domain]
806
        q = q % subq
748 807
        subq, subargs = self._construct_paths(pathq)
749 808
        if subq is not None:
750 809
            q += subq
......
814 873
        
815 874
        q = ("select distinct n.path, %s "
816 875
             "from versions v, nodes n "
817
             "where v.serial = (select max(serial) "
818
                               "from versions "
819
                               "where node = v.node and mtime < ?) "
876
             "where v.serial = %s "
820 877
             "and v.cluster != ? "
821 878
             "and v.node in (select node "
822 879
                            "from nodes "
823 880
                            "where parent = ?) "
824 881
             "and n.node = v.node "
825 882
             "and n.path > ? and n.path < ?")
883
        subq, args = self._construct_versions_nodes_latest_version_subquery(before)
826 884
        if not all_props:
827
            q = q % "v.serial"
885
            q = q % ("v.serial", subq)
828 886
        else:
829
            q = q % "v.serial, v.node, v.hash, v.size, v.type, v.source, v.mtime, v.muser, v.uuid, v.checksum, v.cluster"
830
        args = [before, except_cluster, parent, start, nextling]
887
            q = q % ("v.serial, v.node, v.hash, v.size, v.type, v.source, v.mtime, v.muser, v.uuid, v.checksum, v.cluster", subq)
888
        args += [except_cluster, parent, start, nextling]
889
        start_index = len(args) - 2
831 890
        
832 891
        subq, subargs = self._construct_paths(pathq)
833 892
        if subq is not None:
......
886 945
            if count >= limit: 
887 946
                break
888 947
            
889
            args[3] = strnextling(pf) # New start.
948
            args[start_index] = strnextling(pf) # New start.
890 949
            execute(q, args)
891 950
        
892 951
        return matches, prefixes

Also available in: Unified diff