Revision 63092950 snf-pithos-backend/pithos/backends/lib/sqlalchemy/node.py
b/snf-pithos-backend/pithos/backends/lib/sqlalchemy/node.py | ||
---|---|---|
39 | 39 |
Column, String, MetaData, ForeignKey) |
40 | 40 |
from sqlalchemy.schema import Index |
41 | 41 |
from sqlalchemy.sql import func, and_, or_, not_, select, bindparam, exists |
42 |
from sqlalchemy.sql.expression import true |
|
42 | 43 |
from sqlalchemy.exc import NoSuchTableError |
43 | 44 |
|
44 | 45 |
from dbworker import DBWorker, ESCAPE_CHAR |
... | ... | |
254 | 255 |
|
255 | 256 |
def node_lookup_bulk(self, paths): |
256 | 257 |
"""Lookup the current nodes for the given paths. |
257 |
Return () if the path is not found.
|
|
258 |
Return {} if the path is not found.
|
|
258 | 259 |
""" |
259 | 260 |
|
260 | 261 |
if not paths: |
261 |
return ()
|
|
262 |
return {}
|
|
262 | 263 |
# Use LIKE for comparison to avoid MySQL problems with trailing spaces. |
263 |
s = select([self.nodes.c.node], self.nodes.c.path.in_(paths)) |
|
264 |
s = select([self.nodes.c.path, self.nodes.c.node], |
|
265 |
self.nodes.c.path.in_(paths)) |
|
264 | 266 |
r = self.conn.execute(s) |
265 | 267 |
rows = r.fetchall() |
266 | 268 |
r.close() |
267 |
return [row[0] for row in rows]
|
|
269 |
return dict([(row.node, row.path) for row in rows])
|
|
268 | 270 |
|
269 | 271 |
def node_get_properties(self, node): |
270 | 272 |
"""Return the node's (parent, path). |
... | ... | |
278 | 280 |
r.close() |
279 | 281 |
return l |
280 | 282 |
|
283 |
def node_get_properties_bulk(self, nodes): |
|
284 |
"""Return the (parent, path) for the specific nodes. |
|
285 |
""" |
|
286 |
|
|
287 |
s = select([self.nodes.c.node, self.nodes.c.parent, self.nodes.c.path]) |
|
288 |
s = s.where(self.nodes.c.node.in_(nodes)) |
|
289 |
r = self.conn.execute(s) |
|
290 |
l = r.fetchall() |
|
291 |
r.close() |
|
292 |
return l |
|
293 |
|
|
281 | 294 |
def node_get_versions(self, node, keys=(), propnames=_propnames): |
282 | 295 |
"""Return the properties of all versions at node. |
283 | 296 |
If keys is empty, return all properties in the order |
... | ... | |
605 | 618 |
i += 1 |
606 | 619 |
|
607 | 620 |
def statistics_latest(self, node, before=inf, except_cluster=0): |
608 |
"""Return population, total size and last mtime
|
|
621 |
"""Compute population, total size and last mtime
|
|
609 | 622 |
for all latest versions under node that |
610 | 623 |
do not belong to the cluster. |
611 | 624 |
""" |
... | ... | |
617 | 630 |
parent, path = props |
618 | 631 |
|
619 | 632 |
# The latest version. |
620 |
s = select([self.versions.c.serial, |
|
621 |
self.versions.c.node, |
|
622 |
self.versions.c.hash, |
|
623 |
self.versions.c.size, |
|
624 |
self.versions.c.type, |
|
625 |
self.versions.c.source, |
|
626 |
self.versions.c.mtime, |
|
627 |
self.versions.c.muser, |
|
628 |
self.versions.c.uuid, |
|
629 |
self.versions.c.checksum, |
|
630 |
self.versions.c.cluster]) |
|
633 |
s = select([self.versions.c.size, |
|
634 |
self.versions.c.mtime]) |
|
631 | 635 |
if before != inf: |
632 | 636 |
filtered = select([func.max(self.versions.c.serial)], |
633 | 637 |
self.versions.c.node == node) |
... | ... | |
642 | 646 |
r.close() |
643 | 647 |
if not props: |
644 | 648 |
return None |
645 |
mtime = props[MTIME]
|
|
649 |
mtime = props.mtime
|
|
646 | 650 |
|
647 | 651 |
# First level, just under node (get population). |
648 | 652 |
v = self.versions.alias('v') |
... | ... | |
703 | 707 |
rp.close() |
704 | 708 |
if not r: |
705 | 709 |
return None |
706 |
size = long(r[1] - props[SIZE])
|
|
710 |
size = long(r[1] - props.size)
|
|
707 | 711 |
mtime = max(mtime, r[2]) |
708 | 712 |
return (count, size, mtime) |
709 | 713 |
|
714 |
def statistics_latest_bulk(self, nodes, before=inf, except_cluster=0): |
|
715 |
"""Compute population, total size and last mtime |
|
716 |
for all latest versions under node that |
|
717 |
do not belong to the cluster. |
|
718 |
""" |
|
719 |
|
|
720 |
# The node. |
|
721 |
props = self.node_get_properties_bulk(nodes) |
|
722 |
if not props: # empty list |
|
723 |
return () |
|
724 |
paths = [p.path for p in props] |
|
725 |
|
|
726 |
# The latest version. |
|
727 |
s = select([self.versions.c.node, |
|
728 |
self.versions.c.size, |
|
729 |
self.versions.c.mtime]) |
|
730 |
if before != inf: |
|
731 |
filtered = select([func.max(self.versions.c.serial)], |
|
732 |
self.versions.c.node.in_(nodes)) |
|
733 |
filtered = filtered.where(self.versions.c.mtime < before) |
|
734 |
else: |
|
735 |
filtered = select([self.nodes.c.latest_version], |
|
736 |
self.nodes.c.node.in_(nodes)) |
|
737 |
s = s.where(and_(self.versions.c.cluster != except_cluster, |
|
738 |
self.versions.c.serial.in_(filtered))) |
|
739 |
r = self.conn.execute(s) |
|
740 |
props = r.fetchall() |
|
741 |
r.close() |
|
742 |
if not props: |
|
743 |
return () |
|
744 |
mtimes = {} |
|
745 |
sizes = {} |
|
746 |
for p in props: |
|
747 |
mtimes[p.node] = p.mtime |
|
748 |
sizes[p.node] = p.size |
|
749 |
|
|
750 |
# First level, just under node (get population). |
|
751 |
n = self.nodes.alias('n') |
|
752 |
v = self.versions.alias('v') |
|
753 |
s = select([n.c.parent, |
|
754 |
func.count(v.c.serial).label('count'), |
|
755 |
func.sum(v.c.size).label('total_size'), |
|
756 |
func.max(v.c.mtime).label('max_timestamp')]) |
|
757 |
if before != inf: |
|
758 |
c1 = select([func.max(self.versions.c.serial)], |
|
759 |
and_(self.versions.c.mtime < before, |
|
760 |
self.versions.c.node == v.c.node)) |
|
761 |
else: |
|
762 |
c1 = select([self.nodes.c.latest_version]) |
|
763 |
c1 = c1.where(self.nodes.c.node == v.c.node) |
|
764 |
c2 = select([self.nodes.c.node], self.nodes.c.parent.in_(nodes)) |
|
765 |
s = s.where(and_(v.c.serial == c1, |
|
766 |
v.c.cluster != except_cluster, |
|
767 |
v.c.node.in_(c2), |
|
768 |
v.c.node == n.c.node)) |
|
769 |
s = s.group_by(n.c.parent) |
|
770 |
rp = self.conn.execute(s) |
|
771 |
r = rp.fetchall() |
|
772 |
rp.close() |
|
773 |
if not r: |
|
774 |
return None |
|
775 |
counts = {} |
|
776 |
for stats in r: |
|
777 |
counts[stats.parent] = stats.total_size |
|
778 |
mtimes[stats.parent] = max(mtimes[stats.parent], |
|
779 |
stats.max_timestamp) |
|
780 |
|
|
781 |
# All children (get size and mtime). |
|
782 |
# This is why the full path is stored. |
|
783 |
n1 = self.nodes.alias('n1') |
|
784 |
n2 = self.nodes.alias('n2') |
|
785 |
if before != inf: |
|
786 |
s = select([n2.c.node, |
|
787 |
func.count(v.c.serial).label('count'), |
|
788 |
func.sum(v.c.size).label('total_size'), |
|
789 |
func.max(v.c.mtime).label('max_timestamp')]) |
|
790 |
c1 = select([func.max(self.versions.c.serial)], |
|
791 |
and_(self.versions.c.mtime < before, |
|
792 |
self.versions.c.node == v.c.node)) |
|
793 |
else: |
|
794 |
inner_join = \ |
|
795 |
self.versions.join(self.nodes, onclause=\ |
|
796 |
self.versions.c.serial == self.nodes.c.latest_version) |
|
797 |
s = select([n2.c.node, |
|
798 |
func.count(self.versions.c.serial).label('count'), |
|
799 |
func.sum(self.versions.c.size).label('total_size'), |
|
800 |
func.max(self.versions.c.mtime).label('max_timestamp')], |
|
801 |
from_obj=[inner_join]) |
|
802 |
|
|
803 |
c2 = select([self.nodes.c.node]) |
|
804 |
like = lambda p: self.nodes.c.path.like(self.escape_like(p) + '%', |
|
805 |
escape=ESCAPE_CHAR) |
|
806 |
c2 = c2.where(or_(*map(like, paths))) |
|
807 |
if before != inf: |
|
808 |
s = s.where(and_(v.c.serial == c1, |
|
809 |
v.c.cluster != except_cluster, |
|
810 |
v.c.node.in_(c2))) |
|
811 |
else: |
|
812 |
s = s.where(and_(self.versions.c.cluster != except_cluster, |
|
813 |
self.versions.c.node.in_(c2))) |
|
814 |
s = s.where(self.nodes.c.parent == n1.c.node) |
|
815 |
s = s.where(n1.c.parent == n2.c.node) |
|
816 |
s = s.where(n2.c.node != ROOTNODE) |
|
817 |
s = s.group_by(n2.c.node) |
|
818 |
|
|
819 |
rp = self.conn.execute(s) |
|
820 |
r = rp.fetchall() |
|
821 |
rp.close() |
|
822 |
if not r: |
|
823 |
return None |
|
824 |
for stats in r: |
|
825 |
sizes[stats.node] = long(stats.total_size - sizes[stats.node]) |
|
826 |
mtimes[stats.node] = max(mtimes[stats.node], stats.max_timestamp) |
|
827 |
l = [] |
|
828 |
append = l.append |
|
829 |
for node in nodes: |
|
830 |
append((node, counts[node], sizes[node], mtimes[node])) |
|
831 |
return l |
|
832 |
|
|
710 | 833 |
def nodes_set_latest_version(self, node, serial): |
711 | 834 |
s = self.nodes.update().where(self.nodes.c.node == node) |
712 | 835 |
s = s.values(latest_version=serial) |
... | ... | |
798 | 921 |
else: |
799 | 922 |
s = s.order_by(v.c.node) |
800 | 923 |
r = self.conn.execute(s) |
801 |
rproxy = r.fetchall()
|
|
924 |
rows = r.fetchall()
|
|
802 | 925 |
r.close() |
803 |
return (tuple(row.values()) for row in rproxy)
|
|
926 |
return rows
|
|
804 | 927 |
|
805 | 928 |
def version_get_properties(self, serial, keys=(), propnames=_propnames, |
806 | 929 |
node=None): |
... | ... | |
1129 | 1252 |
elif match == MATCH_EXACT: |
1130 | 1253 |
conjb.append(path) |
1131 | 1254 |
if conja or conjb: |
1132 |
s = s.where(or_(self.nodes.c.path.in_(conjb),*conja)) |
|
1255 |
s = s.where(or_(self.nodes.c.path.in_(conjb), *conja))
|
|
1133 | 1256 |
|
1134 | 1257 |
if sizeq and len(sizeq) == 2: |
1135 | 1258 |
if sizeq[0]: |
... | ... | |
1142 | 1265 |
included, excluded, opers = parse_filters(filterq) |
1143 | 1266 |
if included: |
1144 | 1267 |
subs = select([1]) |
1145 |
subs = subs.where(self.attributes.c.serial == self.versions.c.serial).correlate(self.versions) |
|
1268 |
subs = subs.where( |
|
1269 |
self.attributes.c.serial == |
|
1270 |
self.versions.c.serial).correlate(self.versions) |
|
1146 | 1271 |
subs = subs.where(self.attributes.c.domain == domain) |
1147 |
subs = subs.where(or_(*[self.attributes.c.key.op('=')(x) for x in included])) |
|
1272 |
subs = subs.where(or_(*[self.attributes.c.key.op('=')(x) for |
|
1273 |
x in included])) |
|
1148 | 1274 |
s = s.where(exists(subs)) |
1149 | 1275 |
if excluded: |
1150 | 1276 |
subs = select([1]) |
1151 |
subs = subs.where(self.attributes.c.serial == self.versions.c.serial).correlate(self.versions) |
|
1277 |
subs = subs.where( |
|
1278 |
self.attributes.c.serial == |
|
1279 |
self.versions.c.serial).correlate(self.versions) |
|
1152 | 1280 |
subs = subs.where(self.attributes.c.domain == domain) |
1153 |
subs = subs.where(or_(*[self.attributes.c.key.op('=')(x) for x in excluded])) |
|
1281 |
subs = subs.where(or_(*[self.attributes.c.key.op('=')(x) for |
|
1282 |
x in excluded])) |
|
1154 | 1283 |
s = s.where(not_(exists(subs))) |
1155 | 1284 |
if opers: |
1156 | 1285 |
for k, o, val in opers: |
1157 | 1286 |
subs = select([1]) |
1158 |
subs = subs.where(self.attributes.c.serial == self.versions.c.serial).correlate(self.versions) |
|
1287 |
subs = subs.where(self.attributes.c.serial == |
|
1288 |
self.versions.c.serial) |
|
1289 |
subs = subs.correlate(self.versions) |
|
1159 | 1290 |
subs = subs.where(self.attributes.c.domain == domain) |
1160 | 1291 |
subs = subs.where( |
1161 |
and_(self.attributes.c.key.op('=')(k), self.attributes.c.value.op(o)(val))) |
|
1292 |
and_(self.attributes.c.key.op('=')(k), |
|
1293 |
self.attributes.c.value.op(o)(val))) |
|
1162 | 1294 |
s = s.where(exists(subs)) |
1163 | 1295 |
|
1164 | 1296 |
s = s.order_by(self.nodes.c.path) |
... | ... | |
1247 | 1379 |
s = s.where(v.c.serial == a.c.serial) |
1248 | 1380 |
s = s.where(a.c.domain == domain) |
1249 | 1381 |
s = s.where(a.c.node == n.c.node) |
1250 |
s = s.where(a.c.is_latest == True)
|
|
1382 |
s = s.where(a.c.is_latest == true())
|
|
1251 | 1383 |
if paths: |
1252 | 1384 |
s = s.where(n.c.path.in_(paths)) |
1253 | 1385 |
|
... | ... | |
1264 | 1396 |
def get_props(self, paths): |
1265 | 1397 |
inner_join = \ |
1266 | 1398 |
self.nodes.join(self.versions, |
1267 |
onclause=self.versions.c.serial == self.nodes.c.latest_version) |
|
1399 |
onclause=self.versions.c.serial == |
|
1400 |
self.nodes.c.latest_version) |
|
1268 | 1401 |
cc = self.nodes.c.path.in_(paths) |
1269 | 1402 |
s = select([self.nodes.c.path, self.versions.c.type], |
1270 |
from_obj=[inner_join]).where(cc).distinct()
|
|
1403 |
from_obj=[inner_join]).where(cc).distinct() |
|
1271 | 1404 |
r = self.conn.execute(s) |
1272 | 1405 |
rows = r.fetchall() |
1273 | 1406 |
r.close() |
Also available in: Unified diff