Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / lib / sqlite / node.py @ 9ac201e2

History | View | Annotate | Download (33 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from time import time
35

    
36
from dbworker import DBWorker
37

    
38
from pithos.backends.filter import parse_filters
39

    
40

    
41
ROOTNODE  = 0
42

    
43
( SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM, CLUSTER ) = range(11)
44

    
45
( MATCH_PREFIX, MATCH_EXACT ) = range(2)
46

    
47
inf = float('inf')
48

    
49

    
50
def strnextling(prefix):
51
    """Return the first unicode string
52
       greater than but not starting with given prefix.
53
       strnextling('hello') -> 'hellp'
54
    """
55
    if not prefix:
56
        ## all strings start with the null string,
57
        ## therefore we have to approximate strnextling('')
58
        ## with the last unicode character supported by python
59
        ## 0x10ffff for wide (32-bit unicode) python builds
60
        ## 0x00ffff for narrow (16-bit unicode) python builds
61
        ## We will not autodetect. 0xffff is safe enough.
62
        return unichr(0xffff)
63
    s = prefix[:-1]
64
    c = ord(prefix[-1])
65
    if c >= 0xffff:
66
        raise RuntimeError
67
    s += unichr(c+1)
68
    return s
69

    
70
def strprevling(prefix):
71
    """Return an approximation of the last unicode string
72
       less than but not starting with given prefix.
73
       strprevling(u'hello') -> u'helln\\xffff'
74
    """
75
    if not prefix:
76
        ## There is no prevling for the null string
77
        return prefix
78
    s = prefix[:-1]
79
    c = ord(prefix[-1])
80
    if c > 0:
81
        s += unichr(c-1) + unichr(0xffff)
82
    return s
83

    
84

    
85
_propnames = {
86
    'serial'    : 0,
87
    'node'      : 1,
88
    'hash'      : 2,
89
    'size'      : 3,
90
    'type'      : 4,
91
    'source'    : 5,
92
    'mtime'     : 6,
93
    'muser'     : 7,
94
    'uuid'      : 8,
95
    'checksum'  : 9,
96
    'cluster'   : 10
97
}
98

    
99

    
100
class Node(DBWorker):
101
    """Nodes store path organization and have multiple versions.
102
       Versions store object history and have multiple attributes.
103
       Attributes store metadata.
104
    """
105
    
106
    # TODO: Provide an interface for included and excluded clusters.
107
    
108
    def __init__(self, **params):
109
        DBWorker.__init__(self, **params)
110
        execute = self.execute
111
        
112
        execute(""" pragma foreign_keys = on """)
113
        
114
        execute(""" create table if not exists nodes
115
                          ( node       integer primary key,
116
                            parent     integer default 0,
117
                            path       text    not null default '',
118
                            foreign key (parent)
119
                            references nodes(node)
120
                            on update cascade
121
                            on delete cascade ) """)
122
        execute(""" create unique index if not exists idx_nodes_path
123
                    on nodes(path) """)
124
        
125
        execute(""" create table if not exists policy
126
                          ( node   integer,
127
                            key    text,
128
                            value  text,
129
                            primary key (node, key)
130
                            foreign key (node)
131
                            references nodes(node)
132
                            on update cascade
133
                            on delete cascade ) """)
134
        
135
        execute(""" create table if not exists statistics
136
                          ( node       integer,
137
                            population integer not null default 0,
138
                            size       integer not null default 0,
139
                            mtime      integer,
140
                            cluster    integer not null default 0,
141
                            primary key (node, cluster)
142
                            foreign key (node)
143
                            references nodes(node)
144
                            on update cascade
145
                            on delete cascade ) """)
146
        
147
        execute(""" create table if not exists versions
148
                          ( serial     integer primary key,
149
                            node       integer,
150
                            hash       text,
151
                            size       integer not null default 0,
152
                            type       text    not null default '',
153
                            source     integer,
154
                            mtime      integer,
155
                            muser      text    not null default '',
156
                            uuid       text    not null default '',
157
                            checksum   text    not null default '',
158
                            cluster    integer not null default 0,
159
                            foreign key (node)
160
                            references nodes(node)
161
                            on update cascade
162
                            on delete cascade ) """)
163
        execute(""" create index if not exists idx_versions_node_mtime
164
                    on versions(node, mtime) """)
165
        execute(""" create index if not exists idx_versions_node_uuid
166
                    on versions(uuid) """)
167
        
168
        execute(""" create table if not exists attributes
169
                          ( serial integer,
170
                            domain text,
171
                            key    text,
172
                            value  text,
173
                            primary key (serial, domain, key)
174
                            foreign key (serial)
175
                            references versions(serial)
176
                            on update cascade
177
                            on delete cascade ) """)
178
        
179
        q = "insert or ignore into nodes(node, parent) values (?, ?)"
180
        execute(q, (ROOTNODE, ROOTNODE))
181
    
182
    def node_create(self, parent, path):
183
        """Create a new node from the given properties.
184
           Return the node identifier of the new node.
185
        """
186
        
187
        q = ("insert into nodes (parent, path) "
188
             "values (?, ?)")
189
        props = (parent, path)
190
        return self.execute(q, props).lastrowid
191
    
192
    def node_lookup(self, path):
193
        """Lookup the current node of the given path.
194
           Return None if the path is not found.
195
        """
196
        
197
        q = "select node from nodes where path = ?"
198
        self.execute(q, (path,))
199
        r = self.fetchone()
200
        if r is not None:
201
            return r[0]
202
        return None
203
    
204
    def node_lookup_bulk(self, paths):
205
            """Lookup the current nodes for the given paths.
206
           Return () if the path is not found.
207
        """
208
        
209
        placeholders = ','.join('?' for path in paths)
210
        q = "select node from nodes where path in (%s)" % placeholders
211
        self.execute(q, paths)
212
        r = self.fetchall()
213
        if r is not None:
214
                return [row[0] for row in r]
215
        return None
216
    
217
    def node_get_properties(self, node):
218
        """Return the node's (parent, path).
219
           Return None if the node is not found.
220
        """
221
        
222
        q = "select parent, path from nodes where node = ?"
223
        self.execute(q, (node,))
224
        return self.fetchone()
225
    
226
    def node_get_versions(self, node, keys=(), propnames=_propnames):
227
        """Return the properties of all versions at node.
228
           If keys is empty, return all properties in the order
229
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
230
        """
231
        
232
        q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
233
             "from versions "
234
             "where node = ?")
235
        self.execute(q, (node,))
236
        r = self.fetchall()
237
        if r is None:
238
            return r
239
        
240
        if not keys:
241
            return r
242
        return [[p[propnames[k]] for k in keys if k in propnames] for p in r]
243
    
244
    def node_count_children(self, node):
245
        """Return node's child count."""
246
        
247
        q = "select count(node) from nodes where parent = ? and node != 0"
248
        self.execute(q, (node,))
249
        r = self.fetchone()
250
        if r is None:
251
            return 0
252
        return r[0]
253
    
254
    def node_purge_children(self, parent, before=inf, cluster=0):
255
        """Delete all versions with the specified
256
           parent and cluster, and return
257
           the hashes and size of versions deleted.
258
           Clears out nodes with no remaining versions.
259
        """
260
        
261
        execute = self.execute
262
        q = ("select count(serial), sum(size) from versions "
263
             "where node in (select node "
264
                            "from nodes "
265
                            "where parent = ?) "
266
             "and cluster = ? "
267
             "and mtime <= ?")
268
        args = (parent, cluster, before)
269
        execute(q, args)
270
        nr, size = self.fetchone()
271
        if not nr:
272
            return (), 0
273
        mtime = time()
274
        self.statistics_update(parent, -nr, -size, mtime, cluster)
275
        self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster)
276
        
277
        q = ("select hash from versions "
278
             "where node in (select node "
279
                            "from nodes "
280
                            "where parent = ?) "
281
             "and cluster = ? "
282
             "and mtime <= ?")
283
        execute(q, args)
284
        hashes = [r[0] for r in self.fetchall()]
285
        q = ("delete from versions "
286
             "where node in (select node "
287
                            "from nodes "
288
                            "where parent = ?) "
289
             "and cluster = ? "
290
             "and mtime <= ?")
291
        execute(q, args)
292
        q = ("delete from nodes "
293
             "where node in (select node from nodes n "
294
                            "where (select count(serial) "
295
                                   "from versions "
296
                                   "where node = n.node) = 0 "
297
                            "and parent = ?)")
298
        execute(q, (parent,))
299
        return hashes, size
300
    
301
    def node_purge(self, node, before=inf, cluster=0):
302
        """Delete all versions with the specified
303
           node and cluster, and return
304
           the hashes and size of versions deleted.
305
           Clears out the node if it has no remaining versions.
306
        """
307
        
308
        execute = self.execute
309
        q = ("select count(serial), sum(size) from versions "
310
             "where node = ? "
311
             "and cluster = ? "
312
             "and mtime <= ?")
313
        args = (node, cluster, before)
314
        execute(q, args)
315
        nr, size = self.fetchone()
316
        if not nr:
317
            return (), 0
318
        mtime = time()
319
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
320
        
321
        q = ("select hash from versions "
322
             "where node = ? "
323
             "and cluster = ? "
324
             "and mtime <= ?")
325
        execute(q, args)
326
        hashes = [r[0] for r in self.fetchall()]
327
        q = ("delete from versions "
328
             "where node = ? "
329
             "and cluster = ? "
330
             "and mtime <= ?")
331
        execute(q, args)
332
        q = ("delete from nodes "
333
             "where node in (select node from nodes n "
334
                            "where (select count(serial) "
335
                                   "from versions "
336
                                   "where node = n.node) = 0 "
337
                            "and node = ?)")
338
        execute(q, (node,))
339
        return hashes, size
340
    
341
    def node_remove(self, node):
342
        """Remove the node specified.
343
           Return false if the node has children or is not found.
344
        """
345
        
346
        if self.node_count_children(node):
347
            return False
348
        
349
        mtime = time()
350
        q = ("select count(serial), sum(size), cluster "
351
             "from versions "
352
             "where node = ? "
353
             "group by cluster")
354
        self.execute(q, (node,))
355
        for population, size, cluster in self.fetchall():
356
            self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
357
        
358
        q = "delete from nodes where node = ?"
359
        self.execute(q, (node,))
360
        return True
361
    
362
    def policy_get(self, node):
363
        q = "select key, value from policy where node = ?"
364
        self.execute(q, (node,))
365
        return dict(self.fetchall())
366
    
367
    def policy_set(self, node, policy):
368
        q = "insert or replace into policy (node, key, value) values (?, ?, ?)"
369
        self.executemany(q, ((node, k, v) for k, v in policy.iteritems()))
370
    
371
    def statistics_get(self, node, cluster=0):
372
        """Return population, total size and last mtime
373
           for all versions under node that belong to the cluster.
374
        """
375
        
376
        q = ("select population, size, mtime from statistics "
377
             "where node = ? and cluster = ?")
378
        self.execute(q, (node, cluster))
379
        return self.fetchone()
380
    
381
    def statistics_update(self, node, population, size, mtime, cluster=0):
382
        """Update the statistics of the given node.
383
           Statistics keep track the population, total
384
           size of objects and mtime in the node's namespace.
385
           May be zero or positive or negative numbers.
386
        """
387
        
388
        qs = ("select population, size from statistics "
389
              "where node = ? and cluster = ?")
390
        qu = ("insert or replace into statistics (node, population, size, mtime, cluster) "
391
              "values (?, ?, ?, ?, ?)")
392
        self.execute(qs, (node, cluster))
393
        r = self.fetchone()
394
        if r is None:
395
            prepopulation, presize = (0, 0)
396
        else:
397
            prepopulation, presize = r
398
        population += prepopulation
399
        size += presize
400
        self.execute(qu, (node, population, size, mtime, cluster))
401
    
402
    def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
403
        """Update the statistics of the given node's parent.
404
           Then recursively update all parents up to the root.
405
           Population is not recursive.
406
        """
407
        
408
        while True:
409
            if node == 0:
410
                break
411
            props = self.node_get_properties(node)
412
            if props is None:
413
                break
414
            parent, path = props
415
            self.statistics_update(parent, population, size, mtime, cluster)
416
            node = parent
417
            population = 0 # Population isn't recursive
418
    
419
    def statistics_latest(self, node, before=inf, except_cluster=0):
420
        """Return population, total size and last mtime
421
           for all latest versions under node that
422
           do not belong to the cluster.
423
        """
424
        
425
        execute = self.execute
426
        fetchone = self.fetchone
427
        
428
        # The node.
429
        props = self.node_get_properties(node)
430
        if props is None:
431
            return None
432
        parent, path = props
433
        
434
        # The latest version.
435
        q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
436
             "from versions "
437
             "where serial = (select max(serial) "
438
                             "from versions "
439
                             "where node = ? and mtime < ?) "
440
             "and cluster != ?")
441
        execute(q, (node, before, except_cluster))
442
        props = fetchone()
443
        if props is None:
444
            return None
445
        mtime = props[MTIME]
446
        
447
        # First level, just under node (get population).
448
        q = ("select count(serial), sum(size), max(mtime) "
449
             "from versions v "
450
             "where serial = (select max(serial) "
451
                             "from versions "
452
                             "where node = v.node and mtime < ?) "
453
             "and cluster != ? "
454
             "and node in (select node "
455
                          "from nodes "
456
                          "where parent = ?)")
457
        execute(q, (before, except_cluster, node))
458
        r = fetchone()
459
        if r is None:
460
            return None
461
        count = r[0]
462
        mtime = max(mtime, r[2])
463
        if count == 0:
464
            return (0, 0, mtime)
465
        
466
        # All children (get size and mtime).
467
        # This is why the full path is stored.
468
        q = ("select count(serial), sum(size), max(mtime) "
469
             "from versions v "
470
             "where serial = (select max(serial) "
471
                             "from versions "
472
                             "where node = v.node and mtime < ?) "
473
             "and cluster != ? "
474
             "and node in (select node "
475
                          "from nodes "
476
                          "where path like ? escape '\\')")
477
        execute(q, (before, except_cluster, self.escape_like(path) + '%'))
478
        r = fetchone()
479
        if r is None:
480
            return None
481
        size = r[1] - props[SIZE]
482
        mtime = max(mtime, r[2])
483
        return (count, size, mtime)
484
    
485
    def version_create(self, node, hash, size, type, source, muser, uuid, checksum, cluster=0):
486
        """Create a new version from the given properties.
487
           Return the (serial, mtime) of the new version.
488
        """
489
        
490
        q = ("insert into versions (node, hash, size, type, source, mtime, muser, uuid, checksum, cluster) "
491
             "values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
492
        mtime = time()
493
        props = (node, hash, size, type, source, mtime, muser, uuid, checksum, cluster)
494
        serial = self.execute(q, props).lastrowid
495
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)
496
        return serial, mtime
497
    
498
    def version_lookup(self, node, before=inf, cluster=0, all_props=True):
499
        """Lookup the current version of the given node.
500
           Return a list with its properties:
501
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster)
502
           or None if the current version is not found in the given cluster.
503
        """
504
        
505
        q = ("select %s "
506
             "from versions "
507
             "where serial = (select max(serial) "
508
                             "from versions "
509
                             "where node = ? and mtime < ?) "
510
             "and cluster = ?")
511
        if not all_props:
512
            q = q % "serial"
513
        else:
514
            q = q % "serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster"
515
        
516
        self.execute(q, (node, before, cluster))
517
        props = self.fetchone()
518
        if props is not None:
519
            return props
520
        return None
521

    
522
    def version_lookup_bulk(self, nodes, before=inf, cluster=0, all_props=True):
523
        """Lookup the current versions of the given nodes.
524
           Return a list with their properties:
525
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
526
        """
527
        
528
        q = ("select %s "
529
             "from versions "
530
             "where serial in (select max(serial) "
531
                             "from versions "
532
                             "where node in (%s) and mtime < ? group by node) "
533
             "and cluster = ? %s")
534
        placeholders = ','.join('?' for node in nodes)
535
        if not all_props:
536
            q = q % ("serial",  placeholders, '')
537
        else:
538
            q = q % ("serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster",  placeholders, 'order by node')
539
        
540
        args = nodes
541
        args.extend((before, cluster))
542
        self.execute(q, args)
543
        return self.fetchall()
544
    
545
    def version_get_properties(self, serial, keys=(), propnames=_propnames):
546
        """Return a sequence of values for the properties of
547
           the version specified by serial and the keys, in the order given.
548
           If keys is empty, return all properties in the order
549
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
550
        """
551
        
552
        q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
553
             "from versions "
554
             "where serial = ?")
555
        self.execute(q, (serial,))
556
        r = self.fetchone()
557
        if r is None:
558
            return r
559
        
560
        if not keys:
561
            return r
562
        return [r[propnames[k]] for k in keys if k in propnames]
563
    
564
    def version_put_property(self, serial, key, value):
565
        """Set value for the property of version specified by key."""
566
        
567
        if key not in _propnames:
568
            return
569
        q = "update versions set %s = ? where serial = ?" % key
570
        self.execute(q, (value, serial))
571
    
572
    def version_recluster(self, serial, cluster):
573
        """Move the version into another cluster."""
574
        
575
        props = self.version_get_properties(serial)
576
        if not props:
577
            return
578
        node = props[NODE]
579
        size = props[SIZE]
580
        oldcluster = props[CLUSTER]
581
        if cluster == oldcluster:
582
            return
583
        
584
        mtime = time()
585
        self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
586
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)
587
        
588
        q = "update versions set cluster = ? where serial = ?"
589
        self.execute(q, (cluster, serial))
590
    
591
    def version_remove(self, serial):
592
        """Remove the serial specified."""
593
        
594
        props = self.version_get_properties(serial)
595
        if not props:
596
            return
597
        node = props[NODE]
598
        hash = props[HASH]
599
        size = props[SIZE]
600
        cluster = props[CLUSTER]
601
        
602
        mtime = time()
603
        self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
604
        
605
        q = "delete from versions where serial = ?"
606
        self.execute(q, (serial,))
607
        return hash, size
608
    
609
    def attribute_get(self, serial, domain, keys=()):
610
        """Return a list of (key, value) pairs of the version specified by serial.
611
           If keys is empty, return all attributes.
612
           Othwerise, return only those specified.
613
        """
614
        
615
        execute = self.execute
616
        if keys:
617
            marks = ','.join('?' for k in keys)
618
            q = ("select key, value from attributes "
619
                 "where key in (%s) and serial = ? and domain = ?" % (marks,))
620
            execute(q, keys + (serial, domain))
621
        else:
622
            q = "select key, value from attributes where serial = ? and domain = ?"
623
            execute(q, (serial, domain))
624
        return self.fetchall()
625
    
626
    def attribute_set(self, serial, domain, items):
627
        """Set the attributes of the version specified by serial.
628
           Receive attributes as an iterable of (key, value) pairs.
629
        """
630
        
631
        q = ("insert or replace into attributes (serial, domain, key, value) "
632
             "values (?, ?, ?, ?)")
633
        self.executemany(q, ((serial, domain, k, v) for k, v in items))
634
    
635
    def attribute_del(self, serial, domain, keys=()):
636
        """Delete attributes of the version specified by serial.
637
           If keys is empty, delete all attributes.
638
           Otherwise delete those specified.
639
        """
640
        
641
        if keys:
642
            q = "delete from attributes where serial = ? and domain = ? and key = ?"
643
            self.executemany(q, ((serial, domain, key) for key in keys))
644
        else:
645
            q = "delete from attributes where serial = ? and domain = ?"
646
            self.execute(q, (serial, domain))
647
    
648
    def attribute_copy(self, source, dest):
649
        q = ("insert or replace into attributes "
650
             "select ?, domain, key, value from attributes "
651
             "where serial = ?")
652
        self.execute(q, (dest, source))
653
    
654
    def _construct_filters(self, domain, filterq):
655
        if not domain or not filterq:
656
            return None, None
657
        
658
        subqlist = []
659
        append = subqlist.append
660
        included, excluded, opers = parse_filters(filterq)
661
        args = []
662
        
663
        if included:
664
            subq = "exists (select 1 from attributes where serial = v.serial and domain = ? and "
665
            subq += "(" + ' or '.join(('key = ?' for x in included)) + ")"
666
            subq += ")"
667
            args += [domain]
668
            args += included
669
            append(subq)
670
        
671
        if excluded:
672
            subq = "not exists (select 1 from attributes where serial = v.serial and domain = ? and "
673
            subq += "(" + ' or '.join(('key = ?' for x in excluded)) + ")"
674
            subq += ")"
675
            args += [domain]
676
            args += excluded
677
            append(subq)
678
        
679
        if opers:
680
            for k, o, v in opers:
681
                subq = "exists (select 1 from attributes where serial = v.serial and domain = ? and "
682
                subq += "key = ? and value %s ?" % (o,)
683
                subq += ")"
684
                args += [domain, k, v]
685
                append(subq)
686
        
687
        if not subqlist:
688
            return None, None
689
        
690
        subq = ' and ' + ' and '.join(subqlist)
691
        
692
        return subq, args
693
    
694
    def _construct_paths(self, pathq):
695
        if not pathq:
696
            return None, None
697
        
698
        subqlist = []
699
        args = []
700
        for path, match in pathq:
701
            if match == MATCH_PREFIX:
702
                subqlist.append("n.path like ? escape '\\'")
703
                args.append(self.escape_like(path) + '%')
704
            elif match == MATCH_EXACT:
705
                subqlist.append("n.path = ?")
706
                args.append(path)
707
        
708
        subq = ' and (' + ' or '.join(subqlist) + ')'
709
        args = tuple(args)
710
        
711
        return subq, args
712
    
713
    def _construct_size(self, sizeq):
714
        if not sizeq or len(sizeq) != 2:
715
            return None, None
716
        
717
        subq = ''
718
        args = []
719
        if sizeq[0]:
720
            subq += " and v.size >= ?"
721
            args += [sizeq[0]]
722
        if sizeq[1]:
723
            subq += " and v.size < ?"
724
            args += [sizeq[1]]
725
        
726
        return subq, args
727
    
728
    def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=[]):
729
        """Return a list with all keys pairs defined
730
           for all latest versions under parent that
731
           do not belong to the cluster.
732
        """
733
        
734
        # TODO: Use another table to store before=inf results.
735
        q = ("select distinct a.key "
736
             "from attributes a, versions v, nodes n "
737
             "where v.serial = (select max(serial) "
738
                               "from versions "
739
                               "where node = v.node and mtime < ?) "
740
             "and v.cluster != ? "
741
             "and v.node in (select node "
742
                            "from nodes "
743
                            "where parent = ?) "
744
             "and a.serial = v.serial "
745
             "and a.domain = ? "
746
             "and n.node = v.node")
747
        args = (before, except_cluster, parent, domain)
748
        subq, subargs = self._construct_paths(pathq)
749
        if subq is not None:
750
            q += subq
751
            args += subargs
752
        self.execute(q, args)
753
        return [r[0] for r in self.fetchall()]
754
    
755
    def latest_version_list(self, parent, prefix='', delimiter=None,
756
                            start='', limit=10000, before=inf,
757
                            except_cluster=0, pathq=[], domain=None,
758
                            filterq=[], sizeq=None, all_props=False):
759
        """Return a (list of (path, serial) tuples, list of common prefixes)
760
           for the current versions of the paths with the given parent,
761
           matching the following criteria.
762
           
763
           The property tuple for a version is returned if all
764
           of these conditions are true:
765
                
766
                a. parent matches
767
                
768
                b. path > start
769
                
770
                c. path starts with prefix (and paths in pathq)
771
                
772
                d. version is the max up to before
773
                
774
                e. version is not in cluster
775
                
776
                f. the path does not have the delimiter occuring
777
                   after the prefix, or ends with the delimiter
778
                
779
                g. serial matches the attribute filter query.
780
                   
781
                   A filter query is a comma-separated list of
782
                   terms in one of these three forms:
783
                   
784
                   key
785
                       an attribute with this key must exist
786
                   
787
                   !key
788
                       an attribute with this key must not exist
789
                   
790
                   key ?op value
791
                       the attribute with this key satisfies the value
792
                       where ?op is one of =, != <=, >=, <, >.
793
                
794
                h. the size is in the range set by sizeq
795
           
796
           The list of common prefixes includes the prefixes
797
           matching up to the first delimiter after prefix,
798
           and are reported only once, as "virtual directories".
799
           The delimiter is included in the prefixes.
800
           
801
           If arguments are None, then the corresponding matching rule
802
           will always match.
803
           
804
           Limit applies to the first list of tuples returned.
805
           
806
           If all_props is True, return all properties after path, not just serial.
807
        """
808
        
809
        execute = self.execute
810
        
811
        if not start or start < prefix:
812
            start = strprevling(prefix)
813
        nextling = strnextling(prefix)
814
        
815
        q = ("select distinct n.path, %s "
816
             "from versions v, nodes n "
817
             "where v.serial = (select max(serial) "
818
                               "from versions "
819
                               "where node = v.node and mtime < ?) "
820
             "and v.cluster != ? "
821
             "and v.node in (select node "
822
                            "from nodes "
823
                            "where parent = ?) "
824
             "and n.node = v.node "
825
             "and n.path > ? and n.path < ?")
826
        if not all_props:
827
            q = q % "v.serial"
828
        else:
829
            q = q % "v.serial, v.node, v.hash, v.size, v.type, v.source, v.mtime, v.muser, v.uuid, v.checksum, v.cluster"
830
        args = [before, except_cluster, parent, start, nextling]
831
        
832
        subq, subargs = self._construct_paths(pathq)
833
        if subq is not None:
834
            q += subq
835
            args += subargs
836
        subq, subargs = self._construct_size(sizeq)
837
        if subq is not None:
838
            q += subq
839
            args += subargs
840
        subq, subargs = self._construct_filters(domain, filterq)
841
        if subq is not None:
842
            q += subq
843
            args += subargs
844
        else:
845
            q = q.replace("attributes a, ", "")
846
            q = q.replace("and a.serial = v.serial ", "")
847
        q += " order by n.path"
848
        
849
        if not delimiter:
850
            q += " limit ?"
851
            args.append(limit)
852
            execute(q, args)
853
            return self.fetchall(), ()
854
        
855
        pfz = len(prefix)
856
        dz = len(delimiter)
857
        count = 0
858
        fetchone = self.fetchone
859
        prefixes = []
860
        pappend = prefixes.append
861
        matches = []
862
        mappend = matches.append
863
        
864
        execute(q, args)
865
        while True:
866
            props = fetchone()
867
            if props is None:
868
                break
869
            path = props[0]
870
            serial = props[1]
871
            idx = path.find(delimiter, pfz)
872
            
873
            if idx < 0:
874
                mappend(props)
875
                count += 1
876
                if count >= limit:
877
                    break
878
                continue
879
            
880
            if idx + dz == len(path):
881
                mappend(props)
882
                count += 1
883
                continue # Get one more, in case there is a path.
884
            pf = path[:idx + dz]
885
            pappend(pf)
886
            if count >= limit: 
887
                break
888
            
889
            args[3] = strnextling(pf) # New start.
890
            execute(q, args)
891
        
892
        return matches, prefixes
893
    
894
    def latest_uuid(self, uuid):
895
        """Return a (path, serial) tuple, for the latest version of the given uuid."""
896
        
897
        q = ("select n.path, v.serial "
898
             "from versions v, nodes n "
899
             "where v.serial = (select max(serial) "
900
                               "from versions "
901
                               "where uuid = ?) "
902
             "and n.node = v.node")
903
        self.execute(q, (uuid,))
904
        return self.fetchone()