359d237ddc90f1a4cf4a626579b0bd856bebf043
[pithos] / pithos / backends / lib_alchemy / node.py
1 # Copyright 2011 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from time import time
35 from sqlalchemy import Table, Integer, Column, String, MetaData, ForeignKey
36 from sqlalchemy.schema import Index, Sequence
37 from sqlalchemy.sql import func, and_, null
38 from sqlalchemy.sql import select
39
40 from dbworker import DBWorker
41
42
43 ROOTNODE  = 1
44
45 ( SERIAL, NODE, SIZE, SOURCE, MTIME, MUSER, CLUSTER ) = range(7)
46
47 inf = float('inf')
48
49
50 def strnextling(prefix):
51     """Return the first unicode string
52        greater than but not starting with given prefix.
53        strnextling('hello') -> 'hellp'
54     """
55     if not prefix:
56         ## all strings start with the null string,
57         ## therefore we have to approximate strnextling('')
58         ## with the last unicode character supported by python
59         ## 0x10ffff for wide (32-bit unicode) python builds
60         ## 0x00ffff for narrow (16-bit unicode) python builds
61         ## We will not autodetect. 0xffff is safe enough.
62         return unichr(0xffff)
63     s = prefix[:-1]
64     c = ord(prefix[-1])
65     if c >= 0xffff:
66         raise RuntimeError
67     s += unichr(c+1)
68     return s
69
70 def strprevling(prefix):
71     """Return an approximation of the last unicode string
72        less than but not starting with given prefix.
73        strprevling(u'hello') -> u'helln\\xffff'
74     """
75     if not prefix:
76         ## There is no prevling for the null string
77         return prefix
78     s = prefix[:-1]
79     c = ord(prefix[-1])
80     if c > 0:
81         s += unichr(c-1) + unichr(0xffff)
82     return s
83
84
85 _propnames = {
86     'serial'    : 0,
87     'node'      : 1,
88     'size'      : 2,
89     'source'    : 3,
90     'mtime'     : 4,
91     'muser'     : 5,
92     'cluster'   : 6,
93 }
94
95
96 class Node(DBWorker):
97     """Nodes store path organization and have multiple versions.
98        Versions store object history and have multiple attributes.
99        Attributes store metadata.
100     """
101     
102     # TODO: Provide an interface for included and excluded clusters.
103     
104     def __init__(self, **params):
105         DBWorker.__init__(self, **params)
106         metadata = MetaData()
107         
108         #create nodes table
109         columns=[]
110         columns.append(Column('node', Integer, primary_key=True))
111         columns.append(Column('parent', Integer,
112                               ForeignKey('nodes.node',
113                                          ondelete='CASCADE',
114                                          onupdate='CASCADE'),
115                               autoincrement=False))
116         #columns.append(Column('path', String(2048), default='', nullable=False))
117         columns.append(Column('path', String(255), default='', nullable=False))
118         self.nodes = Table('nodes', metadata, *columns)
119         # place an index on path
120         Index('idx_nodes_path', self.nodes.c.path, unique=True)
121         
122         #create statistics table
123         columns=[]
124         columns.append(Column('node', Integer,
125                               ForeignKey('nodes.node',
126                                          ondelete='CASCADE',
127                                          onupdate='CASCADE'),
128                               primary_key=True))
129         columns.append(Column('population', Integer, nullable=False, default=0))
130         columns.append(Column('size', Integer, nullable=False, default=0))
131         columns.append(Column('mtime', Integer))
132         columns.append(Column('cluster', Integer, nullable=False, default=0,
133                               primary_key=True))
134         self.statistics = Table('statistics', metadata, *columns)
135         
136         #create versions table
137         columns=[]
138         columns.append(Column('serial', Integer, primary_key=True))
139         columns.append(Column('node', Integer,
140                               ForeignKey('nodes.node',
141                                          ondelete='CASCADE',
142                                          onupdate='CASCADE')))
143         columns.append(Column('size', Integer, nullable=False, default=0))
144         columns.append(Column('source', Integer))
145         columns.append(Column('mtime', Integer))
146         columns.append(Column('muser', String(255), nullable=False, default=''))
147         columns.append(Column('cluster', Integer, nullable=False, default=0))
148         self.versions = Table('versions', metadata, *columns)
149         # place an index on node
150         Index('idx_versions_node', self.versions.c.node)
151         # TODO: Sort out if more indexes are needed.
152         #Index('idx_versions_node', self.versions.c.mtime)
153         
154         #create attributes table
155         columns = []
156         columns.append(Column('serial', Integer,
157                               ForeignKey('versions.serial',
158                                          ondelete='CASCADE',
159                                          onupdate='CASCADE'),
160                               primary_key=True))
161         columns.append(Column('key', String(255), primary_key=True))
162         columns.append(Column('value', String(255)))
163         self.attributes = Table('attributes', metadata, *columns)
164         
165         metadata.create_all(self.engine)
166         
167         s = self.nodes.select().where(and_(self.nodes.c.node == 1,
168                                            self.nodes.c.parent == 1))
169         r = self.conn.execute(s).fetchone()
170         if not r:
171             s = self.nodes.insert().values(node=ROOTNODE, parent=ROOTNODE)
172             self.conn.execute(s)
173     
174     def node_create(self, parent, path):
175         """Create a new node from the given properties.
176            Return the node identifier of the new node.
177         """
178         #TODO catch IntegrityError?
179         s = self.nodes.insert().values(parent=parent, path=path)
180         r = self.conn.execute(s)
181         inserted_primary_key = r.inserted_primary_key[0]
182         r.close()
183         return inserted_primary_key
184     
185     def node_lookup(self, path):
186         """Lookup the current node of the given path.
187            Return None if the path is not found.
188         """
189         
190         s = select([self.nodes.c.node], self.nodes.c.path == path)
191         r = self.conn.execute(s)
192         row = r.fetchone()
193         r.close()
194         if row:
195             return row[0]
196         return None
197     
198     def node_get_properties(self, node):
199         """Return the node's (parent, path).
200            Return None if the node is not found.
201         """
202         
203         s = select([self.nodes.c.parent, self.nodes.c.path])
204         s = s.where(self.nodes.c.node == node)
205         r = self.conn.execute(s)
206         l = r.fetchone()
207         r.close()
208         return l
209     
210     def node_get_versions(self, node, keys=(), propnames=_propnames):
211         """Return the properties of all versions at node.
212            If keys is empty, return all properties in the order
213            (serial, node, size, source, mtime, muser, cluster).
214         """
215         
216         s = select(['*'], self.versions.c.node == node)
217         r = self.conn.execute(s)
218         rows = r.fetchall()
219         if not rows:
220             return rows
221         
222         if not keys:
223             return rows
224         
225         return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]
226     
227     def node_count_children(self, node):
228         """Return node's child count."""
229         
230         s = select([func.count(self.nodes.c.node)])
231         s = s.where(and_(self.nodes.c.parent == node,
232                          self.nodes.c.node != ROOTNODE))
233         r = self.conn.execute(s)
234         row = r.fetchone()
235         r.close()
236         return row[0]
237     
238     def node_purge_children(self, parent, before=inf, cluster=0):
239         """Delete all versions with the specified
240            parent and cluster, and return
241            the serials of versions deleted.
242            Clears out nodes with no remaining versions.
243         """
244         
245         scalar = select([self.nodes.c.node],
246             self.nodes.c.parent == parent).as_scalar()
247         where_clause = and_(self.versions.c.node.in_(scalar),
248                             self.versions.c.cluster == cluster,
249                             "versions.mtime <= %f" %before)
250         s = select([func.count(self.versions.c.serial),
251                     func.sum(self.versions.c.size)])
252         s = s.where(where_clause)
253         r = self.conn.execute(s)
254         row = r.fetchone()
255         r.close()
256         if not row:
257             return ()
258         nr, size = row[0], -row[1] if row[1] else 0
259         mtime = time()
260         print '#', parent, -nr, size, mtime, cluster
261         self.statistics_update(parent, -nr, size, mtime, cluster)
262         self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)
263         
264         s = select([self.versions.c.serial])
265         s = s.where(where_clause)
266         r = self.conn.execute(s)
267         serials = [row[SERIAL] for row in r.fetchall()]
268         r.close()
269         
270         #delete versiosn
271         s = self.versions.delete().where(where_clause)
272         r = self.conn.execute(s)
273         r.close()
274         
275         #delete nodes
276         a = self.nodes.alias()
277         no_versions = select([func.count(self.versions.c.serial)],
278             self.versions.c.node == a.c.node).as_scalar() == 0
279         n = select([self.nodes.c.node],
280             and_(no_versions, self.nodes.c.parent == parent))
281         s = s.where(self.nodes.c.node.in_(n))
282         s = self.nodes.delete().where(self.nodes.c.node == s)
283         print '#', s
284         self.conn.execute(s).close()
285         return serials
286     
287     def node_purge(self, node, before=inf, cluster=0):
288         """Delete all versions with the specified
289            node and cluster, and return
290            the serials of versions deleted.
291            Clears out the node if it has no remaining versions.
292         """
293         
294         execute = self.execute
295         q = ("select count(serial), sum(size) from versions "
296              "where node = ? "
297              "and cluster = ? "
298              "and mtime <= ?")
299         args = (node, cluster, before)
300         execute(q, args)
301         nr, size = self.fetchone()
302         if not nr:
303             return ()
304         mtime = time()
305         self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
306         
307         q = ("select serial from versions "
308              "where node = ? "
309              "and cluster = ? "
310              "and mtime <= ?")
311         execute(q, args)
312         serials = [r[SERIAL] for r in self.fetchall()]
313         q = ("delete from versions "
314              "where node = ? "
315              "and cluster = ? "
316              "and mtime <= ?")
317         execute(q, args)
318         q = ("delete from nodes "
319              "where node in (select node from nodes n "
320                             "where (select count(serial) "
321                                    "from versions "
322                                    "where node = n.node) = 0 "
323                             "and node = ?)")
324         execute(q, (node,))
325         return serials
326     
327     def node_remove(self, node):
328         """Remove the node specified.
329            Return false if the node has children or is not found.
330         """
331         
332         if self.node_count_children(node):
333             return False
334         
335         mtime = time()
336         q = ("select count(serial), sum(size), cluster "
337              "from versions "
338              "where node = ? "
339              "group by cluster")
340         self.execute(q, (node,))
341         for population, size, cluster in self.fetchall():
342             self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
343         
344         q = "delete from nodes where node = ?"
345         self.execute(q, (node,))
346         return True
347     
348     def statistics_get(self, node, cluster=0):
349         """Return population, total size and last mtime
350            for all versions under node that belong to the cluster.
351         """
352         
353         q = ("select population, size, mtime from statistics "
354              "where node = ? and cluster = ?")
355         self.execute(q, (node, cluster))
356         return self.fetchone()
357     
358     def statistics_update(self, node, population, size, mtime, cluster=0):
359         """Update the statistics of the given node.
360            Statistics keep track the population, total
361            size of objects and mtime in the node's namespace.
362            May be zero or positive or negative numbers.
363         """
364         
365         s = select([self.statistics.c.population, self.statistics.c.size],
366             and_(self.statistics.c.node == node,
367                  self.statistics.c.cluster == cluster))
368         res = self.conn.execute(s)
369         r = res.fetchone()
370         res.close()
371         if not r:
372             prepopulation, presize = (0, 0)
373         else:
374             prepopulation, presize = r
375         population += prepopulation
376         size += presize
377         
378         self.statistics.insert().values(node=node, population=population,
379                                         size=size, mtime=mtime, cluster=cluster)
380         self.conn.execute(s).close()
381     
382     def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
383         """Update the statistics of the given node's parent.
384            Then recursively update all parents up to the root.
385            Population is not recursive.
386         """
387         
388         while True:
389             if node == ROOTNODE:
390                 break
391             props = self.node_get_properties(node)
392             if props is None:
393                 break
394             parent, path = props
395             self.statistics_update(parent, population, size, mtime, cluster)
396             node = parent
397             population = 0 # Population isn't recursive
398     
399     def statistics_latest(self, node, before=inf, except_cluster=0):
400         """Return population, total size and last mtime
401            for all latest versions under node that
402            do not belong to the cluster.
403         """
404         
405         execute = self.execute
406         fetchone = self.fetchone
407         
408         # The node.
409         props = self.node_get_properties(node)
410         if props is None:
411             return None
412         parent, path = props
413         
414         # The latest version.
415         q = ("select serial, node, size, source, mtime, muser, cluster "
416              "from versions "
417              "where serial = (select max(serial) "
418                              "from versions "
419                              "where node = ? and mtime < ?) "
420              "and cluster != ?")
421         execute(q, (node, before, except_cluster))
422         props = fetchone()
423         if props is None:
424             return None
425         mtime = props[MTIME]
426         
427         # First level, just under node (get population).
428         q = ("select count(serial), sum(size), max(mtime) "
429              "from versions v "
430              "where serial = (select max(serial) "
431                              "from versions "
432                              "where node = v.node and mtime < ?) "
433              "and cluster != ? "
434              "and node in (select node "
435                           "from nodes "
436                           "where parent = ?)")
437         execute(q, (before, except_cluster, node))
438         r = fetchone()
439         if r is None:
440             return None
441         count = r[0]
442         mtime = max(mtime, r[2])
443         if count == 0:
444             return (0, 0, mtime)
445         
446         # All children (get size and mtime).
447         # XXX: This is why the full path is stored.
448         q = ("select count(serial), sum(size), max(mtime) "
449              "from versions v "
450              "where serial = (select max(serial) "
451                              "from versions "
452                              "where node = v.node and mtime < ?) "
453              "and cluster != ? "
454              "and node in (select node "
455                           "from nodes "
456                           "where path like ?)")
457         execute(q, (before, except_cluster, path + '%'))
458         r = fetchone()
459         if r is None:
460             return None
461         size = r[1] - props[SIZE]
462         mtime = max(mtime, r[2])
463         return (count, size, mtime)
464     
465     def version_create(self, node, size, source, muser, cluster=0):
466         """Create a new version from the given properties.
467            Return the (serial, mtime) of the new version.
468         """
469         
470         q = ("insert into versions (node, size, source, mtime, muser, cluster) "
471              "values (?, ?, ?, ?, ?, ?)")
472         mtime = time()
473         props = (node, size, source, mtime, muser, cluster)
474         serial = self.execute(q, props).lastrowid
475         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
476         return serial, mtime
477     
478     def version_lookup(self, node, before=inf, cluster=0):
479         """Lookup the current version of the given node.
480            Return a list with its properties:
481            (serial, node, size, source, mtime, muser, cluster)
482            or None if the current version is not found in the given cluster.
483         """
484         
485         q = ("select serial, node, size, source, mtime, muser, cluster "
486              "from versions "
487              "where serial = (select max(serial) "
488                              "from versions "
489                              "where node = ? and mtime < ?) "
490              "and cluster = ?")
491         self.execute(q, (node, before, cluster))
492         props = self.fetchone()
493         if props is not None:
494             return props
495         return None
496     
497     def version_get_properties(self, serial, keys=(), propnames=_propnames):
498         """Return a sequence of values for the properties of
499            the version specified by serial and the keys, in the order given.
500            If keys is empty, return all properties in the order
501            (serial, node, size, source, mtime, muser, cluster).
502         """
503         
504         q = ("select serial, node, size, source, mtime, muser, cluster "
505              "from versions "
506              "where serial = ?")
507         self.execute(q, (serial,))
508         r = self.fetchone()
509         if r is None:
510             return r
511         
512         if not keys:
513             return r
514         return [r[propnames[k]] for k in keys if k in propnames]
515     
516     def version_recluster(self, serial, cluster):
517         """Move the version into another cluster."""
518         
519         props = self.version_get_properties(serial)
520         if not props:
521             return
522         node = props[NODE]
523         size = props[SIZE]
524         oldcluster = props[CLUSTER]
525         if cluster == oldcluster:
526             return
527         
528         mtime = time()
529         self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
530         self.statistics_update_ancestors(node, 1, size, mtime, cluster)
531         
532         q = "update versions set cluster = ? where serial = ?"
533         self.execute(q, (cluster, serial))
534     
535     def version_remove(self, serial):
536         """Remove the serial specified."""
537         
538         props = self.node_get_properties(serial)
539         if not props:
540             return
541         node = props[NODE]
542         size = props[SIZE]
543         cluster = props[CLUSTER]
544         
545         mtime = time()
546         self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
547         
548         q = "delete from versions where serial = ?"
549         self.execute(q, (serial,))
550         return True
551     
552     def attribute_get(self, serial, keys=()):
553         """Return a list of (key, value) pairs of the version specified by serial.
554            If keys is empty, return all attributes.
555            Othwerise, return only those specified.
556         """
557         
558         execute = self.execute
559         if keys:
560             marks = ','.join('?' for k in keys)
561             q = ("select key, value from attributes "
562                  "where key in (%s) and serial = ?" % (marks,))
563             execute(q, keys + (serial,))
564         else:
565             q = "select key, value from attributes where serial = ?"
566             execute(q, (serial,))
567         return self.fetchall()
568     
569     def attribute_set(self, serial, items):
570         """Set the attributes of the version specified by serial.
571            Receive attributes as an iterable of (key, value) pairs.
572         """
573         
574         q = ("insert or replace into attributes (serial, key, value) "
575              "values (?, ?, ?)")
576         self.executemany(q, ((serial, k, v) for k, v in items))
577     
578     def attribute_del(self, serial, keys=()):
579         """Delete attributes of the version specified by serial.
580            If keys is empty, delete all attributes.
581            Otherwise delete those specified.
582         """
583         
584         if keys:
585             q = "delete from attributes where serial = ? and key = ?"
586             self.executemany(q, ((serial, key) for key in keys))
587         else:
588             q = "delete from attributes where serial = ?"
589             self.execute(q, (serial,))
590     
591     def attribute_copy(self, source, dest):
592         q = ("insert or replace into attributes "
593              "select ?, key, value from attributes "
594              "where serial = ?")
595         self.execute(q, (dest, source))
596     
597     def _construct_filters(self, filterq):
598         if not filterq:
599             return None, None
600         
601         args = filterq.split(',')
602         subq = " and a.key in ("
603         subq += ','.join(('?' for x in args))
604         subq += ")"
605         
606         return subq, args
607     
608     def _construct_paths(self, pathq):
609         if not pathq:
610             return None, None
611         
612         subq = " and ("
613         subq += ' or '.join(('n.path like ?' for x in pathq))
614         subq += ")"
615         args = tuple([x + '%' for x in pathq])
616         
617         return subq, args
618     
619     def latest_attribute_keys(self, parent, before=inf, except_cluster=0, pathq=[]):
620         """Return a list with all keys pairs defined
621            for all latest versions under parent that
622            do not belong to the cluster.
623         """
624         
625         # TODO: Use another table to store before=inf results.
626         q = ("select distinct a.key "
627              "from attributes a, versions v, nodes n "
628              "where v.serial = (select max(serial) "
629                               "from versions "
630                               "where node = v.node and mtime < ?) "
631              "and v.cluster != ? "
632              "and v.node in (select node "
633                            "from nodes "
634                            "where parent = ?) "
635              "and a.serial = v.serial "
636              "and n.node = v.node")
637         args = (before, except_cluster, parent)
638         subq, subargs = self._construct_paths(pathq)
639         if subq is not None:
640             q += subq
641             args += subargs
642         self.execute(q, args)
643         return [r[0] for r in self.fetchall()]
644     
645     def latest_version_list(self, parent, prefix='', delimiter=None,
646                             start='', limit=10000, before=inf,
647                             except_cluster=0, pathq=[], filterq=None):
648         """Return a (list of (path, serial) tuples, list of common prefixes)
649            for the current versions of the paths with the given parent,
650            matching the following criteria.
651            
652            The property tuple for a version is returned if all
653            of these conditions are true:
654                 
655                 a. parent matches
656                 
657                 b. path > start
658                 
659                 c. path starts with prefix (and paths in pathq)
660                 
661                 d. version is the max up to before
662                 
663                 e. version is not in cluster
664                 
665                 f. the path does not have the delimiter occuring
666                    after the prefix, or ends with the delimiter
667                 
668                 g. serial matches the attribute filter query.
669                    
670                    A filter query is a comma-separated list of
671                    terms in one of these three forms:
672                    
673                    key
674                        an attribute with this key must exist
675                    
676                    !key
677                        an attribute with this key must not exist
678                    
679                    key ?op value
680                        the attribute with this key satisfies the value
681                        where ?op is one of ==, != <=, >=, <, >.
682            
683            The list of common prefixes includes the prefixes
684            matching up to the first delimiter after prefix,
685            and are reported only once, as "virtual directories".
686            The delimiter is included in the prefixes.
687            
688            If arguments are None, then the corresponding matching rule
689            will always match.
690            
691            Limit applies to the first list of tuples returned.
692         """
693         
694         execute = self.execute
695         
696         if not start or start < prefix:
697             start = strprevling(prefix)
698         nextling = strnextling(prefix)
699         
700         q = ("select distinct n.path, v.serial "
701              "from attributes a, versions v, nodes n "
702              "where v.serial = (select max(serial) "
703                               "from versions "
704                               "where node = v.node and mtime < ?) "
705              "and v.cluster != ? "
706              "and v.node in (select node "
707                            "from nodes "
708                            "where parent = ?) "
709              "and a.serial = v.serial "
710              "and n.node = v.node "
711              "and n.path > ? and n.path < ?")
712         args = [before, except_cluster, parent, start, nextling]
713         
714         subq, subargs = self._construct_paths(pathq)
715         if subq is not None:
716             q += subq
717             args += subargs
718         subq, subargs = self._construct_filters(filterq)
719         if subq is not None:
720             q += subq
721             args += subargs
722         else:
723             q = q.replace("attributes a, ", "")
724             q = q.replace("and a.serial = v.serial ", "")
725         q += " order by n.path"
726         
727         if not delimiter:
728             q += " limit ?"
729             args.append(limit)
730             execute(q, args)
731             return self.fetchall(), ()
732         
733         pfz = len(prefix)
734         dz = len(delimiter)
735         count = 0
736         fetchone = self.fetchone
737         prefixes = []
738         pappend = prefixes.append
739         matches = []
740         mappend = matches.append
741         
742         execute(q, args)
743         while True:
744             props = fetchone()
745             if props is None:
746                 break
747             path, serial = props
748             idx = path.find(delimiter, pfz)
749             
750             if idx < 0:
751                 mappend(props)
752                 count += 1
753                 if count >= limit:
754                     break
755                 continue
756             
757             pf = path[:idx + dz]
758             pappend(pf)
759             if idx + dz == len(path):
760                 mappend(props)
761                 count += 1
762             if count >= limit: 
763                 break
764             
765             args[3] = strnextling(pf) # New start.
766             execute(q, args)
767         
768         return matches, prefixes