Statistics
| Branch: | Tag: | Revision:

root / pithos / backends / lib / sqlite / node.py @ 3d13f97a

History | View | Annotate | Download (28.3 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from time import time
35

    
36
from dbworker import DBWorker
37

    
38
from pithos.lib.filter import parse_filters
39

    
40

    
41
ROOTNODE  = 0
42

    
43
( SERIAL, NODE, HASH, SIZE, SOURCE, MTIME, MUSER, CLUSTER ) = range(8)
44

    
45
inf = float('inf')
46

    
47

    
48
def strnextling(prefix):
49
    """Return the first unicode string
50
       greater than but not starting with given prefix.
51
       strnextling('hello') -> 'hellp'
52
    """
53
    if not prefix:
54
        ## all strings start with the null string,
55
        ## therefore we have to approximate strnextling('')
56
        ## with the last unicode character supported by python
57
        ## 0x10ffff for wide (32-bit unicode) python builds
58
        ## 0x00ffff for narrow (16-bit unicode) python builds
59
        ## We will not autodetect. 0xffff is safe enough.
60
        return unichr(0xffff)
61
    s = prefix[:-1]
62
    c = ord(prefix[-1])
63
    if c >= 0xffff:
64
        raise RuntimeError
65
    s += unichr(c+1)
66
    return s
67

    
68
def strprevling(prefix):
69
    """Return an approximation of the last unicode string
70
       less than but not starting with given prefix.
71
       strprevling(u'hello') -> u'helln\\xffff'
72
    """
73
    if not prefix:
74
        ## There is no prevling for the null string
75
        return prefix
76
    s = prefix[:-1]
77
    c = ord(prefix[-1])
78
    if c > 0:
79
        s += unichr(c-1) + unichr(0xffff)
80
    return s
81

    
82

    
83
_propnames = {
84
    'serial'    : 0,
85
    'node'      : 1,
86
    'hash'      : 2,
87
    'size'      : 3,
88
    'source'    : 4,
89
    'mtime'     : 5,
90
    'muser'     : 6,
91
    'cluster'   : 7,
92
}
93

    
94

    
95
class Node(DBWorker):
96
    """Nodes store path organization and have multiple versions.
97
       Versions store object history and have multiple attributes.
98
       Attributes store metadata.
99
    """
100
    
101
    # TODO: Provide an interface for included and excluded clusters.
102
    
103
    def __init__(self, **params):
104
        DBWorker.__init__(self, **params)
105
        execute = self.execute
106
        
107
        execute(""" pragma foreign_keys = on """)
108
        
109
        execute(""" create table if not exists nodes
110
                          ( node       integer primary key,
111
                            parent     integer default 0,
112
                            path       text    not null default '',
113
                            foreign key (parent)
114
                            references nodes(node)
115
                            on update cascade
116
                            on delete cascade ) """)
117
        execute(""" create unique index if not exists idx_nodes_path
118
                    on nodes(path) """)
119
        
120
        execute(""" create table if not exists policy
121
                          ( node   integer,
122
                            key    text,
123
                            value  text,
124
                            primary key (node, key)
125
                            foreign key (node)
126
                            references nodes(node)
127
                            on update cascade
128
                            on delete cascade ) """)
129
        
130
        execute(""" create table if not exists statistics
131
                          ( node       integer,
132
                            population integer not null default 0,
133
                            size       integer not null default 0,
134
                            mtime      integer,
135
                            cluster    integer not null default 0,
136
                            primary key (node, cluster)
137
                            foreign key (node)
138
                            references nodes(node)
139
                            on update cascade
140
                            on delete cascade ) """)
141
        
142
        execute(""" create table if not exists versions
143
                          ( serial     integer primary key,
144
                            node       integer,
145
                            hash       text,
146
                            size       integer not null default 0,
147
                            source     integer,
148
                            mtime      integer,
149
                            muser      text    not null default '',
150
                            cluster    integer not null default 0,
151
                            foreign key (node)
152
                            references nodes(node)
153
                            on update cascade
154
                            on delete cascade ) """)
155
        execute(""" create index if not exists idx_versions_node_mtime
156
                    on versions(node, mtime) """)
157
        
158
        execute(""" create table if not exists attributes
159
                          ( serial integer,
160
                            key    text,
161
                            value  text,
162
                            primary key (serial, key)
163
                            foreign key (serial)
164
                            references versions(serial)
165
                            on update cascade
166
                            on delete cascade ) """)
167
        
168
        q = "insert or ignore into nodes(node, parent) values (?, ?)"
169
        execute(q, (ROOTNODE, ROOTNODE))
170
    
171
    def node_create(self, parent, path):
172
        """Create a new node from the given properties.
173
           Return the node identifier of the new node.
174
        """
175
        
176
        q = ("insert into nodes (parent, path) "
177
             "values (?, ?)")
178
        props = (parent, path)
179
        return self.execute(q, props).lastrowid
180
    
181
    def node_lookup(self, path):
182
        """Lookup the current node of the given path.
183
           Return None if the path is not found.
184
        """
185
        
186
        q = "select node from nodes where path = ?"
187
        self.execute(q, (path,))
188
        r = self.fetchone()
189
        if r is not None:
190
            return r[0]
191
        return None
192
    
193
    def node_get_properties(self, node):
194
        """Return the node's (parent, path).
195
           Return None if the node is not found.
196
        """
197
        
198
        q = "select parent, path from nodes where node = ?"
199
        self.execute(q, (node,))
200
        return self.fetchone()
201
    
202
    def node_get_versions(self, node, keys=(), propnames=_propnames):
203
        """Return the properties of all versions at node.
204
           If keys is empty, return all properties in the order
205
           (serial, node, size, source, mtime, muser, cluster).
206
        """
207
        
208
        q = ("select serial, node, hash, size, source, mtime, muser, cluster "
209
             "from versions "
210
             "where node = ?")
211
        self.execute(q, (node,))
212
        r = self.fetchall()
213
        if r is None:
214
            return r
215
        
216
        if not keys:
217
            return r
218
        return [[p[propnames[k]] for k in keys if k in propnames] for p in r]
219
    
220
    def node_count_children(self, node):
221
        """Return node's child count."""
222
        
223
        q = "select count(node) from nodes where parent = ? and node != 0"
224
        self.execute(q, (node,))
225
        r = self.fetchone()
226
        if r is None:
227
            return 0
228
        return r[0]
229
    
230
    def node_purge_children(self, parent, before=inf, cluster=0):
231
        """Delete all versions with the specified
232
           parent and cluster, and return
233
           the hashes of versions deleted.
234
           Clears out nodes with no remaining versions.
235
        """
236
        
237
        execute = self.execute
238
        q = ("select count(serial), sum(size) from versions "
239
             "where node in (select node "
240
                            "from nodes "
241
                            "where parent = ?) "
242
             "and cluster = ? "
243
             "and mtime <= ?")
244
        args = (parent, cluster, before)
245
        execute(q, args)
246
        nr, size = self.fetchone()
247
        if not nr:
248
            return ()
249
        mtime = time()
250
        self.statistics_update(parent, -nr, -size, mtime, cluster)
251
        self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster)
252
        
253
        q = ("select hash from versions "
254
             "where node in (select node "
255
                            "from nodes "
256
                            "where parent = ?) "
257
             "and cluster = ? "
258
             "and mtime <= ?")
259
        execute(q, args)
260
        hashes = [r[0] for r in self.fetchall()]
261
        q = ("delete from versions "
262
             "where node in (select node "
263
                            "from nodes "
264
                            "where parent = ?) "
265
             "and cluster = ? "
266
             "and mtime <= ?")
267
        execute(q, args)
268
        q = ("delete from nodes "
269
             "where node in (select node from nodes n "
270
                            "where (select count(serial) "
271
                                   "from versions "
272
                                   "where node = n.node) = 0 "
273
                            "and parent = ?)")
274
        execute(q, (parent,))
275
        return hashes
276
    
277
    def node_purge(self, node, before=inf, cluster=0):
278
        """Delete all versions with the specified
279
           node and cluster, and return
280
           the hashes of versions deleted.
281
           Clears out the node if it has no remaining versions.
282
        """
283
        
284
        execute = self.execute
285
        q = ("select count(serial), sum(size) from versions "
286
             "where node = ? "
287
             "and cluster = ? "
288
             "and mtime <= ?")
289
        args = (node, cluster, before)
290
        execute(q, args)
291
        nr, size = self.fetchone()
292
        if not nr:
293
            return ()
294
        mtime = time()
295
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
296
        
297
        q = ("select hash from versions "
298
             "where node = ? "
299
             "and cluster = ? "
300
             "and mtime <= ?")
301
        execute(q, args)
302
        hashes = [r[0] for r in self.fetchall()]
303
        q = ("delete from versions "
304
             "where node = ? "
305
             "and cluster = ? "
306
             "and mtime <= ?")
307
        execute(q, args)
308
        q = ("delete from nodes "
309
             "where node in (select node from nodes n "
310
                            "where (select count(serial) "
311
                                   "from versions "
312
                                   "where node = n.node) = 0 "
313
                            "and node = ?)")
314
        execute(q, (node,))
315
        return hashes
316
    
317
    def node_remove(self, node):
318
        """Remove the node specified.
319
           Return false if the node has children or is not found.
320
        """
321
        
322
        if self.node_count_children(node):
323
            return False
324
        
325
        mtime = time()
326
        q = ("select count(serial), sum(size), cluster "
327
             "from versions "
328
             "where node = ? "
329
             "group by cluster")
330
        self.execute(q, (node,))
331
        for population, size, cluster in self.fetchall():
332
            self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
333
        
334
        q = "delete from nodes where node = ?"
335
        self.execute(q, (node,))
336
        return True
337
    
338
    def policy_get(self, node):
339
        q = "select key, value from policy where node = ?"
340
        self.execute(q, (node,))
341
        return dict(self.fetchall())
342
    
343
    def policy_set(self, node, policy):
344
        q = "insert or replace into policy (node, key, value) values (?, ?, ?)"
345
        self.executemany(q, ((node, k, v) for k, v in policy.iteritems()))
346
    
347
    def statistics_get(self, node, cluster=0):
348
        """Return population, total size and last mtime
349
           for all versions under node that belong to the cluster.
350
        """
351
        
352
        q = ("select population, size, mtime from statistics "
353
             "where node = ? and cluster = ?")
354
        self.execute(q, (node, cluster))
355
        return self.fetchone()
356
    
357
    def statistics_update(self, node, population, size, mtime, cluster=0):
358
        """Update the statistics of the given node.
359
           Statistics keep track the population, total
360
           size of objects and mtime in the node's namespace.
361
           May be zero or positive or negative numbers.
362
        """
363
        
364
        qs = ("select population, size from statistics "
365
              "where node = ? and cluster = ?")
366
        qu = ("insert or replace into statistics (node, population, size, mtime, cluster) "
367
              "values (?, ?, ?, ?, ?)")
368
        self.execute(qs, (node, cluster))
369
        r = self.fetchone()
370
        if r is None:
371
            prepopulation, presize = (0, 0)
372
        else:
373
            prepopulation, presize = r
374
        population += prepopulation
375
        size += presize
376
        self.execute(qu, (node, population, size, mtime, cluster))
377
    
378
    def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
379
        """Update the statistics of the given node's parent.
380
           Then recursively update all parents up to the root.
381
           Population is not recursive.
382
        """
383
        
384
        while True:
385
            if node == 0:
386
                break
387
            props = self.node_get_properties(node)
388
            if props is None:
389
                break
390
            parent, path = props
391
            self.statistics_update(parent, population, size, mtime, cluster)
392
            node = parent
393
            population = 0 # Population isn't recursive
394
    
395
    def statistics_latest(self, node, before=inf, except_cluster=0):
396
        """Return population, total size and last mtime
397
           for all latest versions under node that
398
           do not belong to the cluster.
399
        """
400
        
401
        execute = self.execute
402
        fetchone = self.fetchone
403
        
404
        # The node.
405
        props = self.node_get_properties(node)
406
        if props is None:
407
            return None
408
        parent, path = props
409
        
410
        # The latest version.
411
        q = ("select serial, node, hash, size, source, mtime, muser, cluster "
412
             "from versions "
413
             "where serial = (select max(serial) "
414
                             "from versions "
415
                             "where node = ? and mtime < ?) "
416
             "and cluster != ?")
417
        execute(q, (node, before, except_cluster))
418
        props = fetchone()
419
        if props is None:
420
            return None
421
        mtime = props[MTIME]
422
        
423
        # First level, just under node (get population).
424
        q = ("select count(serial), sum(size), max(mtime) "
425
             "from versions v "
426
             "where serial = (select max(serial) "
427
                             "from versions "
428
                             "where node = v.node and mtime < ?) "
429
             "and cluster != ? "
430
             "and node in (select node "
431
                          "from nodes "
432
                          "where parent = ?)")
433
        execute(q, (before, except_cluster, node))
434
        r = fetchone()
435
        if r is None:
436
            return None
437
        count = r[0]
438
        mtime = max(mtime, r[2])
439
        if count == 0:
440
            return (0, 0, mtime)
441
        
442
        # All children (get size and mtime).
443
        # XXX: This is why the full path is stored.
444
        q = ("select count(serial), sum(size), max(mtime) "
445
             "from versions v "
446
             "where serial = (select max(serial) "
447
                             "from versions "
448
                             "where node = v.node and mtime < ?) "
449
             "and cluster != ? "
450
             "and node in (select node "
451
                          "from nodes "
452
                          "where path like ? escape '\\')")
453
        execute(q, (before, except_cluster, self.escape_like(path) + '%'))
454
        r = fetchone()
455
        if r is None:
456
            return None
457
        size = r[1] - props[SIZE]
458
        mtime = max(mtime, r[2])
459
        return (count, size, mtime)
460
    
461
    def version_create(self, node, hash, size, source, muser, cluster=0):
462
        """Create a new version from the given properties.
463
           Return the (serial, mtime) of the new version.
464
        """
465
        
466
        q = ("insert into versions (node, hash, size, source, mtime, muser, cluster) "
467
             "values (?, ?, ?, ?, ?, ?, ?)")
468
        mtime = time()
469
        props = (node, hash, size, source, mtime, muser, cluster)
470
        serial = self.execute(q, props).lastrowid
471
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)
472
        return serial, mtime
473
    
474
    def version_lookup(self, node, before=inf, cluster=0):
475
        """Lookup the current version of the given node.
476
           Return a list with its properties:
477
           (serial, node, hash, size, source, mtime, muser, cluster)
478
           or None if the current version is not found in the given cluster.
479
        """
480
        
481
        q = ("select serial, node, hash, size, source, mtime, muser, cluster "
482
             "from versions "
483
             "where serial = (select max(serial) "
484
                             "from versions "
485
                             "where node = ? and mtime < ?) "
486
             "and cluster = ?")
487
        self.execute(q, (node, before, cluster))
488
        props = self.fetchone()
489
        if props is not None:
490
            return props
491
        return None
492
    
493
    def version_get_properties(self, serial, keys=(), propnames=_propnames):
494
        """Return a sequence of values for the properties of
495
           the version specified by serial and the keys, in the order given.
496
           If keys is empty, return all properties in the order
497
           (serial, node, hash, size, source, mtime, muser, cluster).
498
        """
499
        
500
        q = ("select serial, node, hash, size, source, mtime, muser, cluster "
501
             "from versions "
502
             "where serial = ?")
503
        self.execute(q, (serial,))
504
        r = self.fetchone()
505
        if r is None:
506
            return r
507
        
508
        if not keys:
509
            return r
510
        return [r[propnames[k]] for k in keys if k in propnames]
511
    
512
    def version_recluster(self, serial, cluster):
513
        """Move the version into another cluster."""
514
        
515
        props = self.version_get_properties(serial)
516
        if not props:
517
            return
518
        node = props[NODE]
519
        size = props[SIZE]
520
        oldcluster = props[CLUSTER]
521
        if cluster == oldcluster:
522
            return
523
        
524
        mtime = time()
525
        self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
526
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)
527
        
528
        q = "update versions set cluster = ? where serial = ?"
529
        self.execute(q, (cluster, serial))
530
    
531
    def version_remove(self, serial):
532
        """Remove the serial specified."""
533
        
534
        props = self.version_get_properties(serial)
535
        if not props:
536
            return
537
        node = props[NODE]
538
        hash = props[HASH]
539
        size = props[SIZE]
540
        cluster = props[CLUSTER]
541
        
542
        mtime = time()
543
        self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
544
        
545
        q = "delete from versions where serial = ?"
546
        self.execute(q, (serial,))
547
        return hash
548
    
549
    def attribute_get(self, serial, keys=()):
550
        """Return a list of (key, value) pairs of the version specified by serial.
551
           If keys is empty, return all attributes.
552
           Othwerise, return only those specified.
553
        """
554
        
555
        execute = self.execute
556
        if keys:
557
            marks = ','.join('?' for k in keys)
558
            q = ("select key, value from attributes "
559
                 "where key in (%s) and serial = ?" % (marks,))
560
            execute(q, keys + (serial,))
561
        else:
562
            q = "select key, value from attributes where serial = ?"
563
            execute(q, (serial,))
564
        return self.fetchall()
565
    
566
    def attribute_set(self, serial, items):
567
        """Set the attributes of the version specified by serial.
568
           Receive attributes as an iterable of (key, value) pairs.
569
        """
570
        
571
        q = ("insert or replace into attributes (serial, key, value) "
572
             "values (?, ?, ?)")
573
        self.executemany(q, ((serial, k, v) for k, v in items))
574
    
575
    def attribute_del(self, serial, keys=()):
576
        """Delete attributes of the version specified by serial.
577
           If keys is empty, delete all attributes.
578
           Otherwise delete those specified.
579
        """
580
        
581
        if keys:
582
            q = "delete from attributes where serial = ? and key = ?"
583
            self.executemany(q, ((serial, key) for key in keys))
584
        else:
585
            q = "delete from attributes where serial = ?"
586
            self.execute(q, (serial,))
587
    
588
    def attribute_copy(self, source, dest):
589
        q = ("insert or replace into attributes "
590
             "select ?, key, value from attributes "
591
             "where serial = ?")
592
        self.execute(q, (dest, source))
593
    
594
    def _construct_filters(self, filterq):
595
        if not filterq:
596
            return None, None
597
        
598
        subqlist = []
599
        append = subqlist.append
600
        included, excluded, opers = parse_filters(filterq)
601
        args = []
602
        
603
        if included:
604
            subq = "a.key in ("
605
            subq += ','.join(('?' for x in included)) + ")"
606
            args += included
607
            append(subq)
608
        
609
        if excluded:
610
            subq = "a.key not in ("
611
            subq += ','.join(('?' for x in excluded)) + ")"
612
            args += excluded
613
            append(subq)
614
        
615
        if opers:
616
            t = (("(a.key = ? and a.value %s ?)" % (o,)) for k, o, v in opers)
617
            subq = "(" + ' or '.join(t) + ")"
618
            for k, o, v in opers:
619
                args += (k, v)
620
            append(subq)
621
        
622
        if not subqlist:
623
            return None, None
624
        
625
        subq = ' and ' + ' and '.join(subqlist)
626
        return subq, args
627
    
628
    def _construct_paths(self, pathq):
629
        if not pathq:
630
            return None, None
631
        
632
        subq = " and ("
633
        subq += ' or '.join(("n.path like ? escape '\\'" for x in pathq))
634
        subq += ")"
635
        args = tuple([self.escape_like(x) + '%' for x in pathq])
636
        
637
        return subq, args
638
    
639
    def latest_attribute_keys(self, parent, before=inf, except_cluster=0, pathq=[]):
640
        """Return a list with all keys pairs defined
641
           for all latest versions under parent that
642
           do not belong to the cluster.
643
        """
644
        
645
        # TODO: Use another table to store before=inf results.
646
        q = ("select distinct a.key "
647
             "from attributes a, versions v, nodes n "
648
             "where v.serial = (select max(serial) "
649
                              "from versions "
650
                              "where node = v.node and mtime < ?) "
651
             "and v.cluster != ? "
652
             "and v.node in (select node "
653
                           "from nodes "
654
                           "where parent = ?) "
655
             "and a.serial = v.serial "
656
             "and n.node = v.node")
657
        args = (before, except_cluster, parent)
658
        subq, subargs = self._construct_paths(pathq)
659
        if subq is not None:
660
            q += subq
661
            args += subargs
662
        self.execute(q, args)
663
        return [r[0] for r in self.fetchall()]
664
    
665
    def latest_version_list(self, parent, prefix='', delimiter=None,
666
                            start='', limit=10000, before=inf,
667
                            except_cluster=0, pathq=[], filterq=[]):
668
        """Return a (list of (path, serial) tuples, list of common prefixes)
669
           for the current versions of the paths with the given parent,
670
           matching the following criteria.
671
           
672
           The property tuple for a version is returned if all
673
           of these conditions are true:
674
                
675
                a. parent matches
676
                
677
                b. path > start
678
                
679
                c. path starts with prefix (and paths in pathq)
680
                
681
                d. version is the max up to before
682
                
683
                e. version is not in cluster
684
                
685
                f. the path does not have the delimiter occuring
686
                   after the prefix, or ends with the delimiter
687
                
688
                g. serial matches the attribute filter query.
689
                   
690
                   A filter query is a comma-separated list of
691
                   terms in one of these three forms:
692
                   
693
                   key
694
                       an attribute with this key must exist
695
                   
696
                   !key
697
                       an attribute with this key must not exist
698
                   
699
                   key ?op value
700
                       the attribute with this key satisfies the value
701
                       where ?op is one of =, != <=, >=, <, >.
702
           
703
           The list of common prefixes includes the prefixes
704
           matching up to the first delimiter after prefix,
705
           and are reported only once, as "virtual directories".
706
           The delimiter is included in the prefixes.
707
           
708
           If arguments are None, then the corresponding matching rule
709
           will always match.
710
           
711
           Limit applies to the first list of tuples returned.
712
        """
713
        
714
        execute = self.execute
715
        
716
        if not start or start < prefix:
717
            start = strprevling(prefix)
718
        nextling = strnextling(prefix)
719
        
720
        q = ("select distinct n.path, v.serial "
721
             "from attributes a, versions v, nodes n "
722
             "where v.serial = (select max(serial) "
723
                              "from versions "
724
                              "where node = v.node and mtime < ?) "
725
             "and v.cluster != ? "
726
             "and v.node in (select node "
727
                           "from nodes "
728
                           "where parent = ?) "
729
             "and a.serial = v.serial "
730
             "and n.node = v.node "
731
             "and n.path > ? and n.path < ?")
732
        args = [before, except_cluster, parent, start, nextling]
733
        
734
        subq, subargs = self._construct_paths(pathq)
735
        if subq is not None:
736
            q += subq
737
            args += subargs
738
        subq, subargs = self._construct_filters(filterq)
739
        if subq is not None:
740
            q += subq
741
            args += subargs
742
        else:
743
            q = q.replace("attributes a, ", "")
744
            q = q.replace("and a.serial = v.serial ", "")
745
        q += " order by n.path"
746
        
747
        if not delimiter:
748
            q += " limit ?"
749
            args.append(limit)
750
            execute(q, args)
751
            return self.fetchall(), ()
752
        
753
        pfz = len(prefix)
754
        dz = len(delimiter)
755
        count = 0
756
        fetchone = self.fetchone
757
        prefixes = []
758
        pappend = prefixes.append
759
        matches = []
760
        mappend = matches.append
761
        
762
        execute(q, args)
763
        while True:
764
            props = fetchone()
765
            if props is None:
766
                break
767
            path, serial = props
768
            idx = path.find(delimiter, pfz)
769
            
770
            if idx < 0:
771
                mappend(props)
772
                count += 1
773
                if count >= limit:
774
                    break
775
                continue
776
            
777
            if idx + dz == len(path):
778
                mappend(props)
779
                count += 1
780
                continue # Get one more, in case there is a path.
781
            pf = path[:idx + dz]
782
            pappend(pf)
783
            if count >= limit: 
784
                break
785
            
786
            args[3] = strnextling(pf) # New start.
787
            execute(q, args)
788
        
789
        return matches, prefixes