Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / lib / sqlite / node.py @ 442bc80c

History | View | Annotate | Download (33.8 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from time import time
35

    
36
from dbworker import DBWorker
37

    
38
from pithos.backends.filter import parse_filters
39

    
40

    
41
ROOTNODE = 0
42

    
43
(SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM,
44
 CLUSTER) = range(11)
45

    
46
(MATCH_PREFIX, MATCH_EXACT) = range(2)
47

    
48
inf = float('inf')
49

    
50

    
51
def strnextling(prefix):
52
    """Return the first unicode string
53
       greater than but not starting with given prefix.
54
       strnextling('hello') -> 'hellp'
55
    """
56
    if not prefix:
57
        ## all strings start with the null string,
58
        ## therefore we have to approximate strnextling('')
59
        ## with the last unicode character supported by python
60
        ## 0x10ffff for wide (32-bit unicode) python builds
61
        ## 0x00ffff for narrow (16-bit unicode) python builds
62
        ## We will not autodetect. 0xffff is safe enough.
63
        return unichr(0xffff)
64
    s = prefix[:-1]
65
    c = ord(prefix[-1])
66
    if c >= 0xffff:
67
        raise RuntimeError
68
    s += unichr(c + 1)
69
    return s
70

    
71

    
72
def strprevling(prefix):
73
    """Return an approximation of the last unicode string
74
       less than but not starting with given prefix.
75
       strprevling(u'hello') -> u'helln\\xffff'
76
    """
77
    if not prefix:
78
        ## There is no prevling for the null string
79
        return prefix
80
    s = prefix[:-1]
81
    c = ord(prefix[-1])
82
    if c > 0:
83
        s += unichr(c - 1) + unichr(0xffff)
84
    return s
85

    
86

    
87
_propnames = {
88
    'serial': 0,
89
    'node': 1,
90
    'hash': 2,
91
    'size': 3,
92
    'type': 4,
93
    'source': 5,
94
    'mtime': 6,
95
    'muser': 7,
96
    'uuid': 8,
97
    'checksum': 9,
98
    'cluster': 10
99
}
100

    
101

    
102
class Node(DBWorker):
103
    """Nodes store path organization and have multiple versions.
104
       Versions store object history and have multiple attributes.
105
       Attributes store metadata.
106
    """
107

    
108
    # TODO: Provide an interface for included and excluded clusters.
109

    
110
    def __init__(self, **params):
111
        DBWorker.__init__(self, **params)
112
        execute = self.execute
113

    
114
        execute(""" pragma foreign_keys = on """)
115

    
116
        execute(""" create table if not exists nodes
117
                          ( node       integer primary key,
118
                            parent     integer default 0,
119
                            path       text    not null default '',
120
                            latest_version     integer,
121
                            foreign key (parent)
122
                            references nodes(node)
123
                            on update cascade
124
                            on delete cascade ) """)
125
        execute(""" create unique index if not exists idx_nodes_path
126
                    on nodes(path) """)
127
        execute(""" create index if not exists idx_nodes_parent
128
                    on nodes(parent) """)
129

    
130
        execute(""" create table if not exists policy
131
                          ( node   integer,
132
                            key    text,
133
                            value  text,
134
                            primary key (node, key)
135
                            foreign key (node)
136
                            references nodes(node)
137
                            on update cascade
138
                            on delete cascade ) """)
139

    
140
        execute(""" create table if not exists statistics
141
                          ( node       integer,
142
                            population integer not null default 0,
143
                            size       integer not null default 0,
144
                            mtime      integer,
145
                            cluster    integer not null default 0,
146
                            primary key (node, cluster)
147
                            foreign key (node)
148
                            references nodes(node)
149
                            on update cascade
150
                            on delete cascade ) """)
151

    
152
        execute(""" create table if not exists versions
153
                          ( serial     integer primary key,
154
                            node       integer,
155
                            hash       text,
156
                            size       integer not null default 0,
157
                            type       text    not null default '',
158
                            source     integer,
159
                            mtime      integer,
160
                            muser      text    not null default '',
161
                            uuid       text    not null default '',
162
                            checksum   text    not null default '',
163
                            cluster    integer not null default 0,
164
                            foreign key (node)
165
                            references nodes(node)
166
                            on update cascade
167
                            on delete cascade ) """)
168
        execute(""" create index if not exists idx_versions_node_mtime
169
                    on versions(node, mtime) """)
170
        execute(""" create index if not exists idx_versions_node_uuid
171
                    on versions(uuid) """)
172

    
173
        execute(""" create table if not exists attributes
174
                          ( serial integer,
175
                            domain text,
176
                            key    text,
177
                            value  text,
178
                            primary key (serial, domain, key)
179
                            foreign key (serial)
180
                            references versions(serial)
181
                            on update cascade
182
                            on delete cascade ) """)
183

    
184
        q = "insert or ignore into nodes(node, parent) values (?, ?)"
185
        execute(q, (ROOTNODE, ROOTNODE))
186

    
187
    def node_create(self, parent, path):
188
        """Create a new node from the given properties.
189
           Return the node identifier of the new node.
190
        """
191

    
192
        q = ("insert into nodes (parent, path) "
193
             "values (?, ?)")
194
        props = (parent, path)
195
        return self.execute(q, props).lastrowid
196

    
197
    def node_lookup(self, path):
198
        """Lookup the current node of the given path.
199
           Return None if the path is not found.
200
        """
201

    
202
        q = "select node from nodes where path = ?"
203
        self.execute(q, (path,))
204
        r = self.fetchone()
205
        if r is not None:
206
            return r[0]
207
        return None
208

    
209
    def node_lookup_bulk(self, paths):
210
        """Lookup the current nodes for the given paths.
211
           Return () if the path is not found.
212
        """
213

    
214
        placeholders = ','.join('?' for path in paths)
215
        q = "select node from nodes where path in (%s)" % placeholders
216
        self.execute(q, paths)
217
        r = self.fetchall()
218
        if r is not None:
219
            return [row[0] for row in r]
220
        return None
221

    
222
    def node_get_properties(self, node):
223
        """Return the node's (parent, path).
224
           Return None if the node is not found.
225
        """
226

    
227
        q = "select parent, path from nodes where node = ?"
228
        self.execute(q, (node,))
229
        return self.fetchone()
230

    
231
    def node_get_versions(self, node, keys=(), propnames=_propnames):
232
        """Return the properties of all versions at node.
233
           If keys is empty, return all properties in the order
234
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
235
        """
236

    
237
        q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
238
             "from versions "
239
             "where node = ?")
240
        self.execute(q, (node,))
241
        r = self.fetchall()
242
        if r is None:
243
            return r
244

    
245
        if not keys:
246
            return r
247
        return [[p[propnames[k]] for k in keys if k in propnames] for p in r]
248

    
249
    def node_count_children(self, node):
250
        """Return node's child count."""
251

    
252
        q = "select count(node) from nodes where parent = ? and node != 0"
253
        self.execute(q, (node,))
254
        r = self.fetchone()
255
        if r is None:
256
            return 0
257
        return r[0]
258

    
259
    def node_purge_children(self, parent, before=inf, cluster=0):
260
        """Delete all versions with the specified
261
           parent and cluster, and return
262
           the hashes and size of versions deleted.
263
           Clears out nodes with no remaining versions.
264
        """
265

    
266
        execute = self.execute
267
        q = ("select count(serial), sum(size) from versions "
268
             "where node in (select node "
269
             "from nodes "
270
             "where parent = ?) "
271
             "and cluster = ? "
272
             "and mtime <= ?")
273
        args = (parent, cluster, before)
274
        execute(q, args)
275
        nr, size = self.fetchone()
276
        if not nr:
277
            return (), 0, ()
278
        mtime = time()
279
        self.statistics_update(parent, -nr, -size, mtime, cluster)
280
        self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster)
281

    
282
        q = ("select hash from versions "
283
             "where node in (select node "
284
             "from nodes "
285
             "where parent = ?) "
286
             "and cluster = ? "
287
             "and mtime <= ?")
288
        execute(q, args)
289
        hashes = []
290
        serials = []
291
        for r in self.fetchall():
292
            hashes += [r[0]]
293
            serials += [r[1]]
294
        
295
        q = ("delete from versions "
296
             "where node in (select node "
297
             "from nodes "
298
             "where parent = ?) "
299
             "and cluster = ? "
300
             "and mtime <= ?")
301
        execute(q, args)
302
        q = ("delete from nodes "
303
             "where node in (select node from nodes n "
304
             "where (select count(serial) "
305
             "from versions "
306
             "where node = n.node) = 0 "
307
             "and parent = ?)")
308
        execute(q, (parent,))
309
        return hashes, size, serials
310

    
311
    def node_purge(self, node, before=inf, cluster=0):
312
        """Delete all versions with the specified
313
           node and cluster, and return
314
           the hashes and size of versions deleted.
315
           Clears out the node if it has no remaining versions.
316
        """
317

    
318
        execute = self.execute
319
        q = ("select count(serial), sum(size) from versions "
320
             "where node = ? "
321
             "and cluster = ? "
322
             "and mtime <= ?")
323
        args = (node, cluster, before)
324
        execute(q, args)
325
        nr, size = self.fetchone()
326
        if not nr:
327
            return (), 0, ()
328
        mtime = time()
329
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
330

    
331
        q = ("select hash from versions "
332
             "where node = ? "
333
             "and cluster = ? "
334
             "and mtime <= ?")
335
        execute(q, args)
336
        hashes = []
337
        serials = []
338
        for r in self.fetchall():
339
            hashes += [r[0]]
340
            serials += [r[1]]
341
        
342
        q = ("delete from versions "
343
             "where node = ? "
344
             "and cluster = ? "
345
             "and mtime <= ?")
346
        execute(q, args)
347
        q = ("delete from nodes "
348
             "where node in (select node from nodes n "
349
             "where (select count(serial) "
350
             "from versions "
351
             "where node = n.node) = 0 "
352
             "and node = ?)")
353
        execute(q, (node,))
354
        return hashes, size, serials
355

    
356
    def node_remove(self, node):
357
        """Remove the node specified.
358
           Return false if the node has children or is not found.
359
        """
360

    
361
        if self.node_count_children(node):
362
            return False
363

    
364
        mtime = time()
365
        q = ("select count(serial), sum(size), cluster "
366
             "from versions "
367
             "where node = ? "
368
             "group by cluster")
369
        self.execute(q, (node,))
370
        for population, size, cluster in self.fetchall():
371
            self.statistics_update_ancestors(
372
                node, -population, -size, mtime, cluster)
373

    
374
        q = "delete from nodes where node = ?"
375
        self.execute(q, (node,))
376
        return True
377

    
378
    def policy_get(self, node):
379
        q = "select key, value from policy where node = ?"
380
        self.execute(q, (node,))
381
        return dict(self.fetchall())
382

    
383
    def policy_set(self, node, policy):
384
        q = "insert or replace into policy (node, key, value) values (?, ?, ?)"
385
        self.executemany(q, ((node, k, v) for k, v in policy.iteritems()))
386

    
387
    def statistics_get(self, node, cluster=0):
388
        """Return population, total size and last mtime
389
           for all versions under node that belong to the cluster.
390
        """
391

    
392
        q = ("select population, size, mtime from statistics "
393
             "where node = ? and cluster = ?")
394
        self.execute(q, (node, cluster))
395
        return self.fetchone()
396

    
397
    def statistics_update(self, node, population, size, mtime, cluster=0):
398
        """Update the statistics of the given node.
399
           Statistics keep track the population, total
400
           size of objects and mtime in the node's namespace.
401
           May be zero or positive or negative numbers.
402
        """
403

    
404
        qs = ("select population, size from statistics "
405
              "where node = ? and cluster = ?")
406
        qu = ("insert or replace into statistics (node, population, size, mtime, cluster) "
407
              "values (?, ?, ?, ?, ?)")
408
        self.execute(qs, (node, cluster))
409
        r = self.fetchone()
410
        if r is None:
411
            prepopulation, presize = (0, 0)
412
        else:
413
            prepopulation, presize = r
414
        population += prepopulation
415
        population = max(population, 0)
416
        size += presize
417
        self.execute(qu, (node, population, size, mtime, cluster))
418

    
419
    def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
420
        """Update the statistics of the given node's parent.
421
           Then recursively update all parents up to the root.
422
           Population is not recursive.
423
        """
424

    
425
        while True:
426
            if node == 0:
427
                break
428
            props = self.node_get_properties(node)
429
            if props is None:
430
                break
431
            parent, path = props
432
            self.statistics_update(parent, population, size, mtime, cluster)
433
            node = parent
434
            population = 0  # Population isn't recursive
435

    
436
    def statistics_latest(self, node, before=inf, except_cluster=0):
437
        """Return population, total size and last mtime
438
           for all latest versions under node that
439
           do not belong to the cluster.
440
        """
441

    
442
        execute = self.execute
443
        fetchone = self.fetchone
444

    
445
        # The node.
446
        props = self.node_get_properties(node)
447
        if props is None:
448
            return None
449
        parent, path = props
450

    
451
        # The latest version.
452
        q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
453
             "from versions v "
454
             "where serial = %s "
455
             "and cluster != ?")
456
        subq, args = self._construct_latest_version_subquery(
457
            node=node, before=before)
458
        execute(q % subq, args + [except_cluster])
459
        props = fetchone()
460
        if props is None:
461
            return None
462
        mtime = props[MTIME]
463

    
464
        # First level, just under node (get population).
465
        q = ("select count(serial), sum(size), max(mtime) "
466
             "from versions v "
467
             "where serial = %s "
468
             "and cluster != ? "
469
             "and node in (select node "
470
             "from nodes "
471
             "where parent = ?)")
472
        subq, args = self._construct_latest_version_subquery(
473
            node=None, before=before)
474
        execute(q % subq, args + [except_cluster, node])
475
        r = fetchone()
476
        if r is None:
477
            return None
478
        count = r[0]
479
        mtime = max(mtime, r[2])
480
        if count == 0:
481
            return (0, 0, mtime)
482

    
483
        # All children (get size and mtime).
484
        # This is why the full path is stored.
485
        q = ("select count(serial), sum(size), max(mtime) "
486
             "from versions v "
487
             "where serial = %s "
488
             "and cluster != ? "
489
             "and node in (select node "
490
             "from nodes "
491
             "where path like ? escape '\\')")
492
        subq, args = self._construct_latest_version_subquery(
493
            node=None, before=before)
494
        execute(
495
            q % subq, args + [except_cluster, self.escape_like(path) + '%'])
496
        r = fetchone()
497
        if r is None:
498
            return None
499
        size = r[1] - props[SIZE]
500
        mtime = max(mtime, r[2])
501
        return (count, size, mtime)
502

    
503
    def nodes_set_latest_version(self, node, serial):
504
        q = ("update nodes set latest_version = ? where node = ?")
505
        props = (serial, node)
506
        self.execute(q, props)
507

    
508
    def version_create(self, node, hash, size, type, source, muser, uuid, checksum, cluster=0):
509
        """Create a new version from the given properties.
510
           Return the (serial, mtime) of the new version.
511
        """
512

    
513
        q = ("insert into versions (node, hash, size, type, source, mtime, muser, uuid, checksum, cluster) "
514
             "values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
515
        mtime = time()
516
        props = (node, hash, size, type, source, mtime, muser,
517
                 uuid, checksum, cluster)
518
        serial = self.execute(q, props).lastrowid
519
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)
520

    
521
        self.nodes_set_latest_version(node, serial)
522

    
523
        return serial, mtime
524

    
525
    def version_lookup(self, node, before=inf, cluster=0, all_props=True):
526
        """Lookup the current version of the given node.
527
           Return a list with its properties:
528
           (serial, node, hash, size, type, source, mtime,
529
            muser, uuid, checksum, cluster)
530
           or None if the current version is not found in the given cluster.
531
        """
532

    
533
        q = ("select %s "
534
             "from versions v "
535
             "where serial = %s "
536
             "and cluster = ?")
537
        subq, args = self._construct_latest_version_subquery(
538
            node=node, before=before)
539
        if not all_props:
540
            q = q % ("serial", subq)
541
        else:
542
            q = q % ("serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster", subq)
543

    
544
        self.execute(q, args + [cluster])
545
        props = self.fetchone()
546
        if props is not None:
547
            return props
548
        return None
549

    
550
    def version_lookup_bulk(self, nodes, before=inf, cluster=0, all_props=True):
551
        """Lookup the current versions of the given nodes.
552
           Return a list with their properties:
553
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
554
        """
555

    
556
        if not nodes:
557
            return ()
558
        q = ("select %s "
559
             "from versions "
560
             "where serial in %s "
561
             "and cluster = ? %s")
562
        subq, args = self._construct_latest_versions_subquery(
563
            nodes=nodes, before=before)
564
        if not all_props:
565
            q = q % ("serial", subq, '')
566
        else:
567
            q = q % ("serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster", subq, 'order by node')
568

    
569
        args += [cluster]
570
        self.execute(q, args)
571
        return self.fetchall()
572

    
573
    def version_get_properties(self, serial, keys=(), propnames=_propnames):
574
        """Return a sequence of values for the properties of
575
           the version specified by serial and the keys, in the order given.
576
           If keys is empty, return all properties in the order
577
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
578
        """
579

    
580
        q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
581
             "from versions "
582
             "where serial = ?")
583
        self.execute(q, (serial,))
584
        r = self.fetchone()
585
        if r is None:
586
            return r
587

    
588
        if not keys:
589
            return r
590
        return [r[propnames[k]] for k in keys if k in propnames]
591

    
592
    def version_put_property(self, serial, key, value):
593
        """Set value for the property of version specified by key."""
594

    
595
        if key not in _propnames:
596
            return
597
        q = "update versions set %s = ? where serial = ?" % key
598
        self.execute(q, (value, serial))
599

    
600
    def version_recluster(self, serial, cluster):
601
        """Move the version into another cluster."""
602

    
603
        props = self.version_get_properties(serial)
604
        if not props:
605
            return
606
        node = props[NODE]
607
        size = props[SIZE]
608
        oldcluster = props[CLUSTER]
609
        if cluster == oldcluster:
610
            return
611

    
612
        mtime = time()
613
        self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
614
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)
615

    
616
        q = "update versions set cluster = ? where serial = ?"
617
        self.execute(q, (cluster, serial))
618

    
619
    def version_remove(self, serial):
620
        """Remove the serial specified."""
621

    
622
        props = self.version_get_properties(serial)
623
        if not props:
624
            return
625
        node = props[NODE]
626
        hash = props[HASH]
627
        size = props[SIZE]
628
        cluster = props[CLUSTER]
629

    
630
        mtime = time()
631
        self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
632

    
633
        q = "delete from versions where serial = ?"
634
        self.execute(q, (serial,))
635

    
636
        props = self.version_lookup(node, cluster=cluster, all_props=False)
637
        if props:
638
            self.nodes_set_latest_version(node, props[0])
639
        return hash, size
640

    
641
    def attribute_get(self, serial, domain, keys=()):
642
        """Return a list of (key, value) pairs of the version specified by serial.
643
           If keys is empty, return all attributes.
644
           Othwerise, return only those specified.
645
        """
646

    
647
        execute = self.execute
648
        if keys:
649
            marks = ','.join('?' for k in keys)
650
            q = ("select key, value from attributes "
651
                 "where key in (%s) and serial = ? and domain = ?" % (marks,))
652
            execute(q, keys + (serial, domain))
653
        else:
654
            q = "select key, value from attributes where serial = ? and domain = ?"
655
            execute(q, (serial, domain))
656
        return self.fetchall()
657

    
658
    def attribute_set(self, serial, domain, items):
659
        """Set the attributes of the version specified by serial.
660
           Receive attributes as an iterable of (key, value) pairs.
661
        """
662

    
663
        q = ("insert or replace into attributes (serial, domain, key, value) "
664
             "values (?, ?, ?, ?)")
665
        self.executemany(q, ((serial, domain, k, v) for k, v in items))
666

    
667
    def attribute_del(self, serial, domain, keys=()):
668
        """Delete attributes of the version specified by serial.
669
           If keys is empty, delete all attributes.
670
           Otherwise delete those specified.
671
        """
672

    
673
        if keys:
674
            q = "delete from attributes where serial = ? and domain = ? and key = ?"
675
            self.executemany(q, ((serial, domain, key) for key in keys))
676
        else:
677
            q = "delete from attributes where serial = ? and domain = ?"
678
            self.execute(q, (serial, domain))
679

    
680
    def attribute_copy(self, source, dest):
681
        q = ("insert or replace into attributes "
682
             "select ?, domain, key, value from attributes "
683
             "where serial = ?")
684
        self.execute(q, (dest, source))
685

    
686
    def _construct_filters(self, domain, filterq):
687
        if not domain or not filterq:
688
            return None, None
689

    
690
        subqlist = []
691
        append = subqlist.append
692
        included, excluded, opers = parse_filters(filterq)
693
        args = []
694

    
695
        if included:
696
            subq = "exists (select 1 from attributes where serial = v.serial and domain = ? and "
697
            subq += "(" + ' or '.join(('key = ?' for x in included)) + ")"
698
            subq += ")"
699
            args += [domain]
700
            args += included
701
            append(subq)
702

    
703
        if excluded:
704
            subq = "not exists (select 1 from attributes where serial = v.serial and domain = ? and "
705
            subq += "(" + ' or '.join(('key = ?' for x in excluded)) + ")"
706
            subq += ")"
707
            args += [domain]
708
            args += excluded
709
            append(subq)
710

    
711
        if opers:
712
            for k, o, v in opers:
713
                subq = "exists (select 1 from attributes where serial = v.serial and domain = ? and "
714
                subq += "key = ? and value %s ?" % (o,)
715
                subq += ")"
716
                args += [domain, k, v]
717
                append(subq)
718

    
719
        if not subqlist:
720
            return None, None
721

    
722
        subq = ' and ' + ' and '.join(subqlist)
723

    
724
        return subq, args
725

    
726
    def _construct_paths(self, pathq):
727
        if not pathq:
728
            return None, None
729

    
730
        subqlist = []
731
        args = []
732
        for path, match in pathq:
733
            if match == MATCH_PREFIX:
734
                subqlist.append("n.path like ? escape '\\'")
735
                args.append(self.escape_like(path) + '%')
736
            elif match == MATCH_EXACT:
737
                subqlist.append("n.path = ?")
738
                args.append(path)
739

    
740
        subq = ' and (' + ' or '.join(subqlist) + ')'
741
        args = tuple(args)
742

    
743
        return subq, args
744

    
745
    def _construct_size(self, sizeq):
746
        if not sizeq or len(sizeq) != 2:
747
            return None, None
748

    
749
        subq = ''
750
        args = []
751
        if sizeq[0]:
752
            subq += " and v.size >= ?"
753
            args += [sizeq[0]]
754
        if sizeq[1]:
755
            subq += " and v.size < ?"
756
            args += [sizeq[1]]
757

    
758
        return subq, args
759

    
760
    def _construct_versions_nodes_latest_version_subquery(self, before=inf):
761
        if before == inf:
762
            q = ("n.latest_version ")
763
            args = []
764
        else:
765
            q = ("(select max(serial) "
766
                 "from versions "
767
                 "where node = v.node and mtime < ?) ")
768
            args = [before]
769
        return q, args
770

    
771
    def _construct_latest_version_subquery(self, node=None, before=inf):
772
        where_cond = "node = v.node"
773
        args = []
774
        if node:
775
            where_cond = "node = ? "
776
            args = [node]
777

    
778
        if before == inf:
779
            q = ("(select latest_version "
780
                 "from nodes "
781
                 "where %s) ")
782
        else:
783
            q = ("(select max(serial) "
784
                 "from versions "
785
                 "where %s and mtime < ?) ")
786
            args += [before]
787
        return q % where_cond, args
788

    
789
    def _construct_latest_versions_subquery(self, nodes=(), before=inf):
790
        where_cond = ""
791
        args = []
792
        if nodes:
793
            where_cond = "node in (%s) " % ','.join('?' for node in nodes)
794
            args = nodes
795

    
796
        if before == inf:
797
            q = ("(select latest_version "
798
                 "from nodes "
799
                 "where %s ) ")
800
        else:
801
            q = ("(select max(serial) "
802
                 "from versions "
803
                 "where %s and mtime < ? group by node) ")
804
            args += [before]
805
        return q % where_cond, args
806

    
807
    def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=[]):
808
        """Return a list with all keys pairs defined
809
           for all latest versions under parent that
810
           do not belong to the cluster.
811
        """
812

    
813
        # TODO: Use another table to store before=inf results.
814
        q = ("select distinct a.key "
815
             "from attributes a, versions v, nodes n "
816
             "where v.serial = %s "
817
             "and v.cluster != ? "
818
             "and v.node in (select node "
819
             "from nodes "
820
             "where parent = ?) "
821
             "and a.serial = v.serial "
822
             "and a.domain = ? "
823
             "and n.node = v.node")
824
        subq, subargs = self._construct_latest_version_subquery(
825
            node=None, before=before)
826
        args = subargs + [except_cluster, parent, domain]
827
        q = q % subq
828
        subq, subargs = self._construct_paths(pathq)
829
        if subq is not None:
830
            q += subq
831
            args += subargs
832
        self.execute(q, args)
833
        return [r[0] for r in self.fetchall()]
834

    
835
    def latest_version_list(self, parent, prefix='', delimiter=None,
836
                            start='', limit=10000, before=inf,
837
                            except_cluster=0, pathq=[], domain=None,
838
                            filterq=[], sizeq=None, all_props=False):
839
        """Return a (list of (path, serial) tuples, list of common prefixes)
840
           for the current versions of the paths with the given parent,
841
           matching the following criteria.
842

843
           The property tuple for a version is returned if all
844
           of these conditions are true:
845

846
                a. parent matches
847

848
                b. path > start
849

850
                c. path starts with prefix (and paths in pathq)
851

852
                d. version is the max up to before
853

854
                e. version is not in cluster
855

856
                f. the path does not have the delimiter occuring
857
                   after the prefix, or ends with the delimiter
858

859
                g. serial matches the attribute filter query.
860

861
                   A filter query is a comma-separated list of
862
                   terms in one of these three forms:
863

864
                   key
865
                       an attribute with this key must exist
866

867
                   !key
868
                       an attribute with this key must not exist
869

870
                   key ?op value
871
                       the attribute with this key satisfies the value
872
                       where ?op is one of =, != <=, >=, <, >.
873

874
                h. the size is in the range set by sizeq
875

876
           The list of common prefixes includes the prefixes
877
           matching up to the first delimiter after prefix,
878
           and are reported only once, as "virtual directories".
879
           The delimiter is included in the prefixes.
880

881
           If arguments are None, then the corresponding matching rule
882
           will always match.
883

884
           Limit applies to the first list of tuples returned.
885

886
           If all_props is True, return all properties after path, not just serial.
887
        """
888

    
889
        execute = self.execute
890

    
891
        if not start or start < prefix:
892
            start = strprevling(prefix)
893
        nextling = strnextling(prefix)
894

    
895
        q = ("select distinct n.path, %s "
896
             "from versions v, nodes n "
897
             "where v.serial = %s "
898
             "and v.cluster != ? "
899
             "and v.node in (select node "
900
             "from nodes "
901
             "where parent = ?) "
902
             "and n.node = v.node "
903
             "and n.path > ? and n.path < ?")
904
        subq, args = self._construct_versions_nodes_latest_version_subquery(
905
            before)
906
        if not all_props:
907
            q = q % ("v.serial", subq)
908
        else:
909
            q = q % ("v.serial, v.node, v.hash, v.size, v.type, v.source, v.mtime, v.muser, v.uuid, v.checksum, v.cluster", subq)
910
        args += [except_cluster, parent, start, nextling]
911
        start_index = len(args) - 2
912

    
913
        subq, subargs = self._construct_paths(pathq)
914
        if subq is not None:
915
            q += subq
916
            args += subargs
917
        subq, subargs = self._construct_size(sizeq)
918
        if subq is not None:
919
            q += subq
920
            args += subargs
921
        subq, subargs = self._construct_filters(domain, filterq)
922
        if subq is not None:
923
            q += subq
924
            args += subargs
925
        else:
926
            q = q.replace("attributes a, ", "")
927
            q = q.replace("and a.serial = v.serial ", "")
928
        q += " order by n.path"
929

    
930
        if not delimiter:
931
            q += " limit ?"
932
            args.append(limit)
933
            execute(q, args)
934
            return self.fetchall(), ()
935

    
936
        pfz = len(prefix)
937
        dz = len(delimiter)
938
        count = 0
939
        fetchone = self.fetchone
940
        prefixes = []
941
        pappend = prefixes.append
942
        matches = []
943
        mappend = matches.append
944

    
945
        execute(q, args)
946
        while True:
947
            props = fetchone()
948
            if props is None:
949
                break
950
            path = props[0]
951
            serial = props[1]
952
            idx = path.find(delimiter, pfz)
953

    
954
            if idx < 0:
955
                mappend(props)
956
                count += 1
957
                if count >= limit:
958
                    break
959
                continue
960

    
961
            if idx + dz == len(path):
962
                mappend(props)
963
                count += 1
964
                continue  # Get one more, in case there is a path.
965
            pf = path[:idx + dz]
966
            pappend(pf)
967
            if count >= limit:
968
                break
969

    
970
            args[start_index] = strnextling(pf)  # New start.
971
            execute(q, args)
972

    
973
        return matches, prefixes
974

    
975
    def latest_uuid(self, uuid):
976
        """Return a (path, serial) tuple, for the latest version of the given uuid."""
977

    
978
        q = ("select n.path, v.serial "
979
             "from versions v, nodes n "
980
             "where v.serial = (select max(serial) "
981
             "from versions "
982
             "where uuid = ?) "
983
             "and n.node = v.node")
984
        self.execute(q, (uuid,))
985
        return self.fetchone()