Revision 63092950 snf-pithos-backend/pithos/backends/lib/sqlalchemy/node.py

b/snf-pithos-backend/pithos/backends/lib/sqlalchemy/node.py
39 39
                        Column, String, MetaData, ForeignKey)
40 40
from sqlalchemy.schema import Index
41 41
from sqlalchemy.sql import func, and_, or_, not_, select, bindparam, exists
42
from sqlalchemy.sql.expression import true
42 43
from sqlalchemy.exc import NoSuchTableError
43 44

  
44 45
from dbworker import DBWorker, ESCAPE_CHAR
......
254 255

  
255 256
    def node_lookup_bulk(self, paths):
        """Lookup the current nodes for the given paths.

           Return a dict with one entry per path that was found, keyed
           by node and holding the matching path as its value.
           Return {} if paths is empty or none of the paths is found.
        """

        if not paths:
            return {}
        # A single query with IN covers all the requested paths.
        s = select([self.nodes.c.path, self.nodes.c.node],
                   self.nodes.c.path.in_(paths))
        r = self.conn.execute(s)
        try:
            rows = r.fetchall()
        finally:
            # Release the result even if fetching the rows fails.
            r.close()
        # NOTE(review): the mapping is node -> path although the lookup is
        # driven by path; confirm with the callers that this is intended.
        return dict([(row.node, row.path) for row in rows])
268 270

  
269 271
    def node_get_properties(self, node):
270 272
        """Return the node's (parent, path).
......
278 280
        r.close()
279 281
        return l
280 282

  
283
    def node_get_properties_bulk(self, nodes):
        """Return the (node, parent, path) rows for the specific nodes.

           Nodes that are not found are absent from the result.
           Return [] if nodes is empty.
        """

        if not nodes:
            # Nothing to look up; skip the query instead of sending an
            # empty IN list to the database.
            return []
        s = select([self.nodes.c.node, self.nodes.c.parent, self.nodes.c.path])
        s = s.where(self.nodes.c.node.in_(nodes))
        r = self.conn.execute(s)
        try:
            l = r.fetchall()
        finally:
            # Release the result even if fetching the rows fails.
            r.close()
        return l
293

  
281 294
    def node_get_versions(self, node, keys=(), propnames=_propnames):
282 295
        """Return the properties of all versions at node.
283 296
           If keys is empty, return all properties in the order
......
605 618
            i += 1
606 619

  
607 620
    def statistics_latest(self, node, before=inf, except_cluster=0):
608
        """Return population, total size and last mtime
621
        """Compute population, total size and last mtime
609 622
           for all latest versions under node that
610 623
           do not belong to the cluster.
611 624
        """
......
617 630
        parent, path = props
618 631

  
619 632
        # The latest version.
620
        s = select([self.versions.c.serial,
621
                    self.versions.c.node,
622
                    self.versions.c.hash,
623
                    self.versions.c.size,
624
                    self.versions.c.type,
625
                    self.versions.c.source,
626
                    self.versions.c.mtime,
627
                    self.versions.c.muser,
628
                    self.versions.c.uuid,
629
                    self.versions.c.checksum,
630
                    self.versions.c.cluster])
633
        s = select([self.versions.c.size,
634
                    self.versions.c.mtime])
631 635
        if before != inf:
632 636
            filtered = select([func.max(self.versions.c.serial)],
633 637
                              self.versions.c.node == node)
......
642 646
        r.close()
643 647
        if not props:
644 648
            return None
645
        mtime = props[MTIME]
649
        mtime = props.mtime
646 650

  
647 651
        # First level, just under node (get population).
648 652
        v = self.versions.alias('v')
......
703 707
        rp.close()
704 708
        if not r:
705 709
            return None
706
        size = long(r[1] - props[SIZE])
710
        size = long(r[1] - props.size)
707 711
        mtime = max(mtime, r[2])
708 712
        return (count, size, mtime)
709 713

  
714
    def statistics_latest_bulk(self, nodes, before=inf, except_cluster=0):
        """Compute population, total size and last mtime
           for all latest versions under each of the nodes that
           do not belong to the cluster.

           Return a list with one (node, count, size, mtime) tuple per
           requested node, in the order of nodes.
           Return () if none of the nodes is found or none of them has a
           qualifying latest version, and None if either of the two
           descendant queries yields no rows.
        """

        # The nodes.
        props = self.node_get_properties_bulk(nodes)
        if not props:  # empty list
            return ()
        paths = [p.path for p in props]

        # The latest version of each node.
        s = select([self.versions.c.node,
                    self.versions.c.size,
                    self.versions.c.mtime])
        if before != inf:
            filtered = select([func.max(self.versions.c.serial)],
                              self.versions.c.node.in_(nodes))
            filtered = filtered.where(self.versions.c.mtime < before)
            # One maximum per node; without the grouping the subquery
            # would yield a single serial for the whole set of nodes.
            filtered = filtered.group_by(self.versions.c.node)
        else:
            filtered = select([self.nodes.c.latest_version],
                              self.nodes.c.node.in_(nodes))
        s = s.where(and_(self.versions.c.cluster != except_cluster,
                         self.versions.c.serial.in_(filtered)))
        r = self.conn.execute(s)
        props = r.fetchall()
        r.close()
        if not props:
            return ()
        mtimes = {}
        sizes = {}
        for p in props:
            mtimes[p.node] = p.mtime
            sizes[p.node] = p.size

        # First level, just under each node (get population).
        n = self.nodes.alias('n')
        v = self.versions.alias('v')
        s = select([n.c.parent,
                    func.count(v.c.serial).label('count'),
                    func.sum(v.c.size).label('total_size'),
                    func.max(v.c.mtime).label('max_timestamp')])
        if before != inf:
            c1 = select([func.max(self.versions.c.serial)],
                        and_(self.versions.c.mtime < before,
                             self.versions.c.node == v.c.node))
        else:
            c1 = select([self.nodes.c.latest_version])
            c1 = c1.where(self.nodes.c.node == v.c.node)
        c2 = select([self.nodes.c.node], self.nodes.c.parent.in_(nodes))
        s = s.where(and_(v.c.serial == c1,
                         v.c.cluster != except_cluster,
                         v.c.node.in_(c2),
                         v.c.node == n.c.node))
        s = s.group_by(n.c.parent)
        rp = self.conn.execute(s)
        r = rp.fetchall()
        rp.close()
        if not r:
            # NOTE(review): None here versus () above; callers that only
            # test truthiness are unaffected, so the value is kept as is.
            return None
        counts = {}
        for stats in r:
            # The population is the 'count' label (not 'total_size'). It is
            # looked up by key so that the label cannot be shadowed by a
            # row method of the same name.
            counts[stats.parent] = stats['count']
            # NOTE(review): raises KeyError when a parent has children but
            # no qualifying latest version of its own; confirm whether
            # such parents should be skipped instead.
            mtimes[stats.parent] = max(mtimes[stats.parent],
                                       stats.max_timestamp)

        # All children (get size and mtime).
        # This is why the full path is stored.
        # n1 is the parent and n2 the grandparent of each matched node;
        # the aggregates are grouped per grandparent.
        n1 = self.nodes.alias('n1')
        n2 = self.nodes.alias('n2')
        if before != inf:
            # NOTE(review): in this branch nothing relates v to self.nodes,
            # which the parent/grandparent conditions below rely on;
            # confirm the intended join.
            s = select([n2.c.node,
                        func.count(v.c.serial).label('count'),
                        func.sum(v.c.size).label('total_size'),
                        func.max(v.c.mtime).label('max_timestamp')])
            c1 = select([func.max(self.versions.c.serial)],
                        and_(self.versions.c.mtime < before,
                             self.versions.c.node == v.c.node))
        else:
            inner_join = self.versions.join(
                self.nodes,
                onclause=self.versions.c.serial == self.nodes.c.latest_version)
            s = select([n2.c.node,
                        func.count(self.versions.c.serial).label('count'),
                        func.sum(self.versions.c.size).label('total_size'),
                        func.max(self.versions.c.mtime).label('max_timestamp')],
                       from_obj=[inner_join])

        # Every node whose path starts with one of the requested paths.
        c2 = select([self.nodes.c.node])
        like = lambda p: self.nodes.c.path.like(self.escape_like(p) + '%',
                                                escape=ESCAPE_CHAR)
        c2 = c2.where(or_(*map(like, paths)))
        if before != inf:
            s = s.where(and_(v.c.serial == c1,
                             v.c.cluster != except_cluster,
                             v.c.node.in_(c2)))
        else:
            s = s.where(and_(self.versions.c.cluster != except_cluster,
                             self.versions.c.node.in_(c2)))
        s = s.where(self.nodes.c.parent == n1.c.node)
        s = s.where(n1.c.parent == n2.c.node)
        s = s.where(n2.c.node != ROOTNODE)
        s = s.group_by(n2.c.node)

        rp = self.conn.execute(s)
        r = rp.fetchall()
        rp.close()
        if not r:
            return None
        for stats in r:
            # NOTE(review): raises KeyError for a grandparent that is not
            # among the nodes collected from the first query.
            sizes[stats.node] = long(stats.total_size - sizes[stats.node])
            mtimes[stats.node] = max(mtimes[stats.node], stats.max_timestamp)

        # NOTE(review): a requested node with no first-level rows has no
        # entry in counts and raises KeyError here.
        return [(node, counts[node], sizes[node], mtimes[node])
                for node in nodes]
832

  
710 833
    def nodes_set_latest_version(self, node, serial):
711 834
        s = self.nodes.update().where(self.nodes.c.node == node)
712 835
        s = s.values(latest_version=serial)
......
798 921
        else:
799 922
            s = s.order_by(v.c.node)
800 923
        r = self.conn.execute(s)
801
        rproxy = r.fetchall()
924
        rows = r.fetchall()
802 925
        r.close()
803
        return (tuple(row.values()) for row in rproxy)
926
        return rows
804 927

  
805 928
    def version_get_properties(self, serial, keys=(), propnames=_propnames,
806 929
                               node=None):
......
1129 1252
            elif match == MATCH_EXACT:
1130 1253
                conjb.append(path)
1131 1254
        if conja or conjb:
1132
            s = s.where(or_(self.nodes.c.path.in_(conjb),*conja))
1255
            s = s.where(or_(self.nodes.c.path.in_(conjb), *conja))
1133 1256

  
1134 1257
        if sizeq and len(sizeq) == 2:
1135 1258
            if sizeq[0]:
......
1142 1265
            included, excluded, opers = parse_filters(filterq)
1143 1266
            if included:
1144 1267
                subs = select([1])
1145
                subs = subs.where(self.attributes.c.serial == self.versions.c.serial).correlate(self.versions)
1268
                subs = subs.where(
1269
                    self.attributes.c.serial ==
1270
                    self.versions.c.serial).correlate(self.versions)
1146 1271
                subs = subs.where(self.attributes.c.domain == domain)
1147
                subs = subs.where(or_(*[self.attributes.c.key.op('=')(x) for x in included]))
1272
                subs = subs.where(or_(*[self.attributes.c.key.op('=')(x) for
1273
                                  x in included]))
1148 1274
                s = s.where(exists(subs))
1149 1275
            if excluded:
1150 1276
                subs = select([1])
1151
                subs = subs.where(self.attributes.c.serial == self.versions.c.serial).correlate(self.versions)
1277
                subs = subs.where(
1278
                    self.attributes.c.serial ==
1279
                    self.versions.c.serial).correlate(self.versions)
1152 1280
                subs = subs.where(self.attributes.c.domain == domain)
1153
                subs = subs.where(or_(*[self.attributes.c.key.op('=')(x) for x in excluded]))
1281
                subs = subs.where(or_(*[self.attributes.c.key.op('=')(x) for
1282
                                  x in excluded]))
1154 1283
                s = s.where(not_(exists(subs)))
1155 1284
            if opers:
1156 1285
                for k, o, val in opers:
1157 1286
                    subs = select([1])
1158
                    subs = subs.where(self.attributes.c.serial == self.versions.c.serial).correlate(self.versions)
1287
                    subs = subs.where(self.attributes.c.serial ==
1288
                                      self.versions.c.serial)
1289
                    subs = subs.correlate(self.versions)
1159 1290
                    subs = subs.where(self.attributes.c.domain == domain)
1160 1291
                    subs = subs.where(
1161
                        and_(self.attributes.c.key.op('=')(k), self.attributes.c.value.op(o)(val)))
1292
                        and_(self.attributes.c.key.op('=')(k),
1293
                             self.attributes.c.value.op(o)(val)))
1162 1294
                    s = s.where(exists(subs))
1163 1295

  
1164 1296
        s = s.order_by(self.nodes.c.path)
......
1247 1379
        s = s.where(v.c.serial == a.c.serial)
1248 1380
        s = s.where(a.c.domain == domain)
1249 1381
        s = s.where(a.c.node == n.c.node)
1250
        s = s.where(a.c.is_latest == True)
1382
        s = s.where(a.c.is_latest == true())
1251 1383
        if paths:
1252 1384
            s = s.where(n.c.path.in_(paths))
1253 1385

  
......
1264 1396
    def get_props(self, paths):
1265 1397
        inner_join = \
1266 1398
            self.nodes.join(self.versions,
1267
                onclause=self.versions.c.serial == self.nodes.c.latest_version)
1399
                            onclause=self.versions.c.serial ==
1400
                            self.nodes.c.latest_version)
1268 1401
        cc = self.nodes.c.path.in_(paths)
1269 1402
        s = select([self.nodes.c.path, self.versions.c.type],
1270
                    from_obj=[inner_join]).where(cc).distinct()
1403
                   from_obj=[inner_join]).where(cc).distinct()
1271 1404
        r = self.conn.execute(s)
1272 1405
        rows = r.fetchall()
1273 1406
        r.close()

Also available in: Unified diff