self.params = params
self.conn = params['wrapper'].conn
self.engine = params['wrapper'].engine
+
+ def escape_like(self, s):
+     """Return s with LIKE wildcards escaped so it matches literally.
+
+     Backslashes are doubled first, then '%' and '_' are each prefixed
+     with a backslash. Callers pass the result to LIKE together with a
+     backslash ESCAPE character.
+     """
+     # Spell the backslashes out: '\%' and '\_' are invalid string escape
+     # sequences that newer Python versions warn about.
+     return s.replace('\\', '\\\\').replace('%', '\\%').replace('_', '\\_')
"""
# Use LIKE for comparison to avoid MySQL problems with trailing spaces.
- path = path.replace('%', '\%')
- path = path.replace('_', '\_')
- s = select([self.nodes.c.node], self.nodes.c.path.like(path, escape='\\'))
+ s = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path), escape='\\'))
r = self.conn.execute(s)
row = r.fetchone()
r.close()
self.versions.c.node == v.c.node)
if before != inf:
c1 = c1.where(self.versions.c.mtime < before)
- c2 = select([self.nodes.c.node], self.nodes.c.path.like(path + '%'))
+ c2 = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path) + '%', escape='\\'))
s = s.where(and_(v.c.serial == c1,
v.c.cluster != except_cluster,
v.c.node.in_(c2)))
s = s.where(n.c.node == v.c.node)
conj = []
for x in pathq:
- conj.append(n.c.path.like(x + '%'))
+ conj.append(n.c.path.like(self.escape_like(x) + '%', escape='\\'))
if conj:
s = s.where(or_(*conj))
rp = self.conn.execute(s)
s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
conj = []
for x in pathq:
- conj.append(n.c.path.like(x + '%'))
+ conj.append(n.c.path.like(self.escape_like(x) + '%', escape='\\'))
if conj:
s = s.where(or_(*conj))
self.xfeaturevals.c.value == u.c.value)
s = select([self.xfeatures.c.path], from_obj=[inner_join]).distinct()
if prefix:
- s = s.where(self.xfeatures.c.path.like(prefix + '%'))
+ s = s.where(self.xfeatures.c.path.like(self.escape_like(prefix) + '%', escape='\\'))
r = self.conn.execute(s)
l = [row[0] for row in r.fetchall()]
r.close()
"""Return the list of shared paths."""
s = select([self.xfeatures.c.path],
- self.xfeatures.c.path.like(prefix + '%')).order_by(self.xfeatures.c.path.asc())
+ self.xfeatures.c.path.like(self.escape_like(prefix) + '%', escape='\\')).order_by(self.xfeatures.c.path.asc())
r = self.conn.execute(s)
l = [row[0] for row in r.fetchall()]
r.close()
return [inherited]
s = select([self.xfeatures.c.path, self.xfeatures.c.feature_id])
- s = s.where(and_(self.xfeatures.c.path.like(path + '%'),
+ s = s.where(and_(self.xfeatures.c.path.like(self.escape_like(path) + '%', escape='\\'),
self.xfeatures.c.path != path))
s = s.order_by(self.xfeatures.c.path)
r = self.conn.execute(s)
self.fetchall = cur.fetchall
self.cur = cur
self.conn = conn
+
+ def escape_like(self, s):
+     """Return s with LIKE wildcards escaped so it matches literally.
+
+     Backslashes are doubled first, then '%' and '_' are each prefixed
+     with a backslash. Callers pass the result to LIKE together with a
+     backslash ESCAPE character.
+     """
+     # Spell the backslashes out: '\%' and '\_' are invalid string escape
+     # sequences that newer Python versions warn about.
+     return s.replace('\\', '\\\\').replace('%', '\\%').replace('_', '\\_')
def __init__(self, db):
self.conn = sqlite3.connect(db, check_same_thread=False)
+ # SQLite's LIKE ignores ASCII case by default; turn on case-sensitive
+ # matching for every LIKE query issued on this connection.
+ self.conn.execute(""" pragma case_sensitive_like = on """)
def close(self):
self.conn.close()
"and cluster != ? "
"and node in (select node "
"from nodes "
- "where path like ?)")
- execute(q, (before, except_cluster, path + '%'))
+ "where path like ? escape '\\')")
+ execute(q, (before, except_cluster, self.escape_like(path) + '%'))
r = fetchone()
if r is None:
return None
return None, None
subq = " and ("
- subq += ' or '.join(('n.path like ?' for x in pathq))
+ subq += ' or '.join(("n.path like ? escape '\\'" for x in pathq))
subq += ")"
- args = tuple([x + '%' for x in pathq])
+ args = tuple([self.escape_like(x) + '%' for x in pathq])
return subq, args
"using (feature_id)")
p = (member, member)
if prefix:
- q += " where path like ?"
- p += (prefix + '%',)
+ q += " where path like ? escape '\\'"
+ p += (self.escape_like(prefix) + '%',)
self.execute(q, p)
return [r[0] for r in self.fetchall()]
def access_list_shared(self, prefix=''):
"""Return the list of shared paths."""
- q = "select path from xfeatures where path like ?"
- self.execute(q, (prefix + '%',))
+ # Escape LIKE wildcards in the prefix so that only the appended '%'
+ # acts as a wildcard; the query names backslash as the escape character.
+ q = "select path from xfeatures where path like ? escape '\\'"
+ self.execute(q, (self.escape_like(prefix) + '%',))
return [r[0] for r in self.fetchall()]
return [inherited]
q = ("select path, feature_id from xfeatures "
- "where path like ? and path != ? order by path")
- self.execute(q, (path + '%', path,))
+ "where path like ? escape '\\' and path != ? order by path")
+ self.execute(q, (self.escape_like(path) + '%', path,))
return self.fetchall()
def xfeature_create(self, path):