Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / lib / sqlite / node.py @ 9d4502a8

History | View | Annotate | Download (33.7 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from time import time
35

    
36
from dbworker import DBWorker
37

    
38
from pithos.backends.filter import parse_filters
39

    
40

    
41
# Identifier used for the root node (it is inserted as its own parent).
ROOTNODE = 0

# Positions of the columns inside a full row of the versions table,
# in the order the queries of this module select them.
SERIAL = 0
NODE = 1
HASH = 2
SIZE = 3
TYPE = 4
SOURCE = 5
MTIME = 6
MUSER = 7
UUID = 8
CHECKSUM = 9
CLUSTER = 10

# Path matching modes.
MATCH_PREFIX = 0
MATCH_EXACT = 1

# Default for the "before" arguments, meaning no upper time bound.
inf = float('inf')
49

    
50

    
51
def strnextling(prefix):
    """Return the first unicode string
       greater than but not starting with given prefix.
       strnextling('hello') -> 'hellp'

       For the empty prefix the single character U+FFFF is returned
       as an approximation (see the comment in the body).
       Raise RuntimeError if the last character of prefix is
       U+FFFF or above, since it cannot be incremented here.
    """
    try:
        to_char = unichr  # Python 2
    except NameError:
        to_char = chr     # Python 3: chr() already returns unicode text
    if not prefix:
        ## all strings start with the null string,
        ## therefore we have to approximate strnextling('')
        ## with the last unicode character supported by python
        ## 0x10ffff for wide (32-bit unicode) python builds
        ## 0x00ffff for narrow (16-bit unicode) python builds
        ## We will not autodetect. 0xffff is safe enough.
        return to_char(0xffff)
    last = ord(prefix[-1])
    if last >= 0xffff:
        raise RuntimeError("strnextling: last character of prefix "
                           "has no successor (code point >= 0xffff)")
    # Same string with its last character replaced by the next code point.
    return prefix[:-1] + to_char(last + 1)
70

    
71

    
72
def strprevling(prefix):
    """Return an approximation of the last unicode string
       less than but not starting with given prefix.
       strprevling(u'hello') -> u'helln\\uffff'

       The empty prefix is returned unchanged. If the last character
       of prefix is U+0000, prefix without that character is returned.
    """
    try:
        to_char = unichr  # Python 2
    except NameError:
        to_char = chr     # Python 3: chr() already returns unicode text
    if not prefix:
        ## There is no prevling for the null string
        return prefix
    head = prefix[:-1]
    last = ord(prefix[-1])
    if last > 0:
        # Step the last character down by one and pad with U+FFFF so the
        # result sorts as late as possible below prefix.
        head += to_char(last - 1) + to_char(0xffff)
    return head
85

    
86

    
87
# Maps each version property name to its position in a full versions row.
_propnames = dict(
    (name, position) for position, name in enumerate((
        'serial',
        'node',
        'hash',
        'size',
        'type',
        'source',
        'mtime',
        'muser',
        'uuid',
        'checksum',
        'cluster',
    ))
)
100

    
101

    
102
class Node(DBWorker):
103
    """Nodes store path organization and have multiple versions.
104
       Versions store object history and have multiple attributes.
105
       Attributes store metadata.
106
    """
107

    
108
    # TODO: Provide an interface for included and excluded clusters.
109

    
110
    def __init__(self, **params):
        """Initialize the worker and make sure the schema exists.

           Enables foreign keys, creates the nodes, policy, statistics,
           versions and attributes tables (and their indexes) when they
           are missing, and inserts the root node if it is not there yet.
        """
        DBWorker.__init__(self, **params)
        run = self.execute

        # Statements are executed strictly in this order.
        schema = (
            """ pragma foreign_keys = on """,

            """ create table if not exists nodes
                          ( node       integer primary key,
                            parent     integer default 0,
                            path       text    not null default '',
                            latest_version     integer,
                            foreign key (parent)
                            references nodes(node)
                            on update cascade
                            on delete cascade ) """,
            """ create unique index if not exists idx_nodes_path
                    on nodes(path) """,
            """ create index if not exists idx_nodes_parent
                    on nodes(parent) """,

            """ create table if not exists policy
                          ( node   integer,
                            key    text,
                            value  text,
                            primary key (node, key)
                            foreign key (node)
                            references nodes(node)
                            on update cascade
                            on delete cascade ) """,

            """ create table if not exists statistics
                          ( node       integer,
                            population integer not null default 0,
                            size       integer not null default 0,
                            mtime      integer,
                            cluster    integer not null default 0,
                            primary key (node, cluster)
                            foreign key (node)
                            references nodes(node)
                            on update cascade
                            on delete cascade ) """,

            """ create table if not exists versions
                          ( serial     integer primary key,
                            node       integer,
                            hash       text,
                            size       integer not null default 0,
                            type       text    not null default '',
                            source     integer,
                            mtime      integer,
                            muser      text    not null default '',
                            uuid       text    not null default '',
                            checksum   text    not null default '',
                            cluster    integer not null default 0,
                            foreign key (node)
                            references nodes(node)
                            on update cascade
                            on delete cascade ) """,
            """ create index if not exists idx_versions_node_mtime
                    on versions(node, mtime) """,
            """ create index if not exists idx_versions_node_uuid
                    on versions(uuid) """,

            """ create table if not exists attributes
                          ( serial integer,
                            domain text,
                            key    text,
                            value  text,
                            primary key (serial, domain, key)
                            foreign key (serial)
                            references versions(serial)
                            on update cascade
                            on delete cascade ) """,
        )
        for statement in schema:
            run(statement)

        # The root node is its own parent; "or ignore" keeps an existing one.
        run("insert or ignore into nodes(node, parent) values (?, ?)",
            (ROOTNODE, ROOTNODE))
186

    
187
    def node_create(self, parent, path):
        """Insert a new node with the given parent and path.
           Return the identifier (row id) of the inserted node.
        """

        cursor = self.execute(
            "insert into nodes (parent, path) values (?, ?)",
            (parent, path))
        return cursor.lastrowid
196

    
197
    def node_lookup(self, path):
        """Return the identifier of the node stored at path,
           or None when no node has that path.
        """

        self.execute("select node from nodes where path = ?", (path,))
        row = self.fetchone()
        return None if row is None else row[0]
208

    
209
    def node_lookup_bulk(self, paths):
        """Lookup the current nodes for the given paths.
           paths may be any iterable of path strings.
           Return a list with the identifiers of the nodes found;
           the list is empty when none of the paths exists.
        """

        # Materialize once: a one-shot iterable would otherwise be consumed
        # while counting placeholders and reach execute() empty.
        paths = tuple(paths)
        if not paths:
            return []
        placeholders = ','.join('?' for _ in paths)
        q = "select node from nodes where path in (%s)" % placeholders
        self.execute(q, paths)
        return [row[0] for row in self.fetchall()]
221

    
222
    def node_get_properties(self, node):
        """Fetch the (parent, path) row of the given node.
           The result is None when the node does not exist.
        """

        self.execute("select parent, path from nodes where node = ?",
                     (node,))
        row = self.fetchone()
        return row
230

    
231
    def node_get_versions(self, node, keys=(), propnames=_propnames):
        """Return the properties of every version stored at node.
           With empty keys each version is returned as the full row
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
           Otherwise each version becomes a list holding only the
           requested properties, in the order of keys; names that are
           not in propnames are skipped.
        """

        self.execute(
            "select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
            "from versions where node = ?", (node,))
        rows = self.fetchall()
        if rows is None or not keys:
            return rows

        selected = []
        for row in rows:
            selected.append(
                [row[propnames[name]] for name in keys if name in propnames])
        return selected
248

    
249
    def node_count_children(self, node):
        """Count the nodes whose parent is node (node 0 itself excluded)."""

        self.execute(
            "select count(node) from nodes where parent = ? and node != 0",
            (node,))
        row = self.fetchone()
        return row[0] if row is not None else 0
258

    
259
    def node_purge_children(self, parent, before=inf, cluster=0):
        """Delete all versions with the specified
           parent and cluster whose mtime is not after before,
           and return a (hashes, size, serials) tuple with
           the hashes, the total size and the serials of the versions deleted.
           Clears out nodes with no remaining versions.
           Return ((), 0, ()) when no version matches.
        """

        execute = self.execute
        # All three statements below target the same set of versions.
        selection = ("where node in (select node "
                     "from nodes "
                     "where parent = ?) "
                     "and cluster = ? "
                     "and mtime <= ?")
        args = (parent, cluster, before)

        execute("select count(serial), sum(size) from versions " + selection,
                args)
        nr, size = self.fetchone()
        if not nr:
            # Same shape as the regular result, so callers can always unpack
            # three values.
            return (), 0, ()
        mtime = time()
        self.statistics_update(parent, -nr, -size, mtime, cluster)
        self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster)

        # Both the hash and the serial of each version are reported back,
        # so both columns must be selected.
        execute("select hash, serial from versions " + selection, args)
        rows = self.fetchall()
        hashes = [row[0] for row in rows]
        serials = [row[1] for row in rows]

        execute("delete from versions " + selection, args)
        # Remove the children of parent that are left without any version.
        q = ("delete from nodes "
             "where node in (select node from nodes n "
             "where (select count(serial) "
             "from versions "
             "where node = n.node) = 0 "
             "and parent = ?)")
        execute(q, (parent,))
        return hashes, size, serials
310

    
311
    def node_purge(self, node, before=inf, cluster=0):
        """Delete all versions with the specified
           node and cluster whose mtime is not after before,
           and return a (hashes, size, serials) tuple with
           the hashes, the total size and the serials of the versions deleted.
           Clears out the node if it has no remaining versions.
           Return ((), 0, ()) when no version matches.
        """

        execute = self.execute
        # All three statements below target the same set of versions.
        selection = ("where node = ? "
                     "and cluster = ? "
                     "and mtime <= ?")
        args = (node, cluster, before)

        execute("select count(serial), sum(size) from versions " + selection,
                args)
        nr, size = self.fetchone()
        if not nr:
            # Same shape as the regular result, so callers can always unpack
            # three values.
            return (), 0, ()
        mtime = time()
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)

        # Both the hash and the serial of each version are reported back,
        # so both columns must be selected.
        execute("select hash, serial from versions " + selection, args)
        rows = self.fetchall()
        hashes = [row[0] for row in rows]
        serials = [row[1] for row in rows]

        execute("delete from versions " + selection, args)
        # Remove the node itself when no version of it is left.
        q = ("delete from nodes "
             "where node in (select node from nodes n "
             "where (select count(serial) "
             "from versions "
             "where node = n.node) = 0 "
             "and node = ?)")
        execute(q, (node,))
        return hashes, size, serials
355

    
356
    def node_remove(self, node):
        """Delete the given node.
           Nothing is deleted and False is returned when the node
           still has children; otherwise the statistics of its
           ancestors are reduced per cluster, the node row is
           deleted and True is returned.
        """

        has_children = self.node_count_children(node)
        if has_children:
            return False

        now = time()
        self.execute(
            "select count(serial), sum(size), cluster "
            "from versions where node = ? group by cluster", (node,))
        # One row per cluster: subtract that cluster's totals upwards.
        for row in self.fetchall():
            self.statistics_update_ancestors(
                node, -row[0], -row[1], now, row[2])

        self.execute("delete from nodes where node = ?", (node,))
        return True
377

    
378
    def policy_get(self, node):
        """Return the policy of node as a {key: value} dictionary."""
        self.execute("select key, value from policy where node = ?", (node,))
        policy = {}
        for key, value in self.fetchall():
            policy[key] = value
        return policy
382

    
383
    def policy_set(self, node, policy):
        """Store every (key, value) pair of the policy mapping for node,
           replacing the value of keys that are already set.
        """
        q = "insert or replace into policy (node, key, value) values (?, ?, ?)"
        # items() is available on Python 2 and Python 3 mappings alike,
        # whereas iteritems() exists only on Python 2.
        self.executemany(q, [(node, k, v) for k, v in policy.items()])
386

    
387
    def statistics_get(self, node, cluster=0):
        """Fetch the stored (population, size, mtime) statistics
           row of node for the given cluster.
           The result is None when no such row exists.
        """

        self.execute(
            "select population, size, mtime from statistics "
            "where node = ? and cluster = ?", (node, cluster))
        row = self.fetchone()
        return row
396

    
397
    def statistics_update(self, node, population, size, mtime, cluster=0):
        """Add population and size to the statistics row of
           (node, cluster) and store mtime in it.
           A missing row counts as zero population and size.
           The deltas may be zero, positive or negative.
        """

        self.execute(
            "select population, size from statistics "
            "where node = ? and cluster = ?", (node, cluster))
        previous = self.fetchone()
        if previous is None:
            previous = (0, 0)
        # "insert or replace" covers both a new row and an existing one.
        self.execute(
            "insert or replace into statistics (node, population, size, mtime, cluster) "
            "values (?, ?, ?, ?, ?)",
            (node, population + previous[0], size + previous[1],
             mtime, cluster))
417

    
418
    def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
        """Apply the statistics change to the parent of node and
           then to every further ancestor, stopping at node 0 or
           at the first node that cannot be found.
           Only the direct parent receives the population change;
           higher ancestors get a population change of zero.
        """

        current = node
        delta = population
        while current != 0:
            props = self.node_get_properties(current)
            if props is None:
                return
            parent, path = props
            self.statistics_update(parent, delta, size, mtime, cluster)
            current = parent
            delta = 0  # Population isn't recursive
434

    
435
    def statistics_latest(self, node, before=inf, except_cluster=0):
        """Compute (population, size, mtime) over the latest versions
           under node that are outside except_cluster.
           Population counts the direct children only, while size and
           mtime are taken over every node whose path starts with
           the path of node.
           Return None when the node, or a latest version of it
           outside except_cluster, cannot be found.
        """

        run = self.execute
        next_row = self.fetchone

        # The node.
        node_props = self.node_get_properties(node)
        if node_props is None:
            return None
        parent, path = node_props

        # The latest version of the node itself.
        latest, params = self._construct_latest_version_subquery(
            node=node, before=before)
        own_query = (
            "select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
            "from versions v "
            "where serial = %s "
            "and cluster != ?")
        run(own_query % latest, params + [except_cluster])
        own_version = next_row()
        if own_version is None:
            return None
        mtime = own_version[MTIME]

        # First level, just under node (get population).
        latest, params = self._construct_latest_version_subquery(
            node=None, before=before)
        children_query = (
            "select count(serial), sum(size), max(mtime) "
            "from versions v "
            "where serial = %s "
            "and cluster != ? "
            "and node in (select node "
            "from nodes "
            "where parent = ?)")
        run(children_query % latest, params + [except_cluster, node])
        children = next_row()
        if children is None:
            return None
        count = children[0]
        mtime = max(mtime, children[2])
        if count == 0:
            return (0, 0, mtime)

        # All descendants (get size and mtime).
        # This is why the full path is stored.
        latest, params = self._construct_latest_version_subquery(
            node=None, before=before)
        subtree_query = (
            "select count(serial), sum(size), max(mtime) "
            "from versions v "
            "where serial = %s "
            "and cluster != ? "
            "and node in (select node "
            "from nodes "
            "where path like ? escape '\\')")
        pattern = self.escape_like(path) + '%'
        run(subtree_query % latest, params + [except_cluster, pattern])
        subtree = next_row()
        if subtree is None:
            return None
        # The prefix match includes the node itself, so its own size is
        # taken out of the total.
        size = subtree[1] - own_version[SIZE]
        mtime = max(mtime, subtree[2])
        return (count, size, mtime)
501

    
502
    def nodes_set_latest_version(self, node, serial):
        """Record serial as the latest version of node."""
        self.execute("update nodes set latest_version = ? where node = ?",
                     (serial, node))
506

    
507
    def version_create(self, node, hash, size, type, source, muser, uuid, checksum, cluster=0):
        """Insert a new version row built from the given properties,
           stamped with the current time as its mtime.
           The statistics of the ancestors of node are increased by one
           version of the given size, and the new version is recorded
           as the latest version of node.
           Return the (serial, mtime) of the new version.
        """

        now = time()
        cursor = self.execute(
            "insert into versions (node, hash, size, type, source, mtime, muser, uuid, checksum, cluster) "
            "values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
            (node, hash, size, type, source, now, muser,
             uuid, checksum, cluster))
        new_serial = cursor.lastrowid

        self.statistics_update_ancestors(node, 1, size, now, cluster)
        self.nodes_set_latest_version(node, new_serial)

        return new_serial, now
523

    
524
    def version_lookup(self, node, before=inf, cluster=0, all_props=True):
        """Find the latest version of node (restricted by before).
           With all_props the full row
           (serial, node, hash, size, type, source, mtime,
            muser, uuid, checksum, cluster)
           is returned, otherwise a row holding only the serial.
           The result is None when that version is not in the given cluster.
        """

        if all_props:
            columns = "serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster"
        else:
            columns = "serial"
        latest, params = self._construct_latest_version_subquery(
            node=node, before=before)
        query = ("select %s from versions v "
                 "where serial = %s and cluster = ?") % (columns, latest)

        self.execute(query, params + [cluster])
        return self.fetchone()
548

    
549
    def version_lookup_bulk(self, nodes, before=inf, cluster=0, all_props=True):
        """Lookup the current versions of the given nodes.
           Return a list with their properties:
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster),
           ordered by node, or rows holding only the serial when
           all_props is false.
           Return () when nodes is empty. The nodes argument is not modified.
        """

        if not nodes:
            return ()
        q = ("select %s "
             "from versions "
             "where serial in %s "
             "and cluster = ? %s")
        subq, args = self._construct_latest_versions_subquery(
            nodes=nodes, before=before)
        if not all_props:
            q = q % ("serial", subq, '')
        else:
            q = q % ("serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster", subq, 'order by node')

        # Build a fresh parameter list: args may be the very sequence the
        # caller passed as nodes, which must not be extended in place
        # (and could not be, if it is a tuple).
        self.execute(q, list(args) + [cluster])
        return self.fetchall()
571

    
572
    def version_get_properties(self, serial, keys=(), propnames=_propnames):
        """Return the properties of the version identified by serial.
           With empty keys the full row is returned, in the order
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
           Otherwise a list with the values of the requested keys is
           returned, in the order given; names that are not in
           propnames are skipped.
           The result is None when the version does not exist.
        """

        self.execute(
            "select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
            "from versions where serial = ?", (serial,))
        row = self.fetchone()
        if row is None or not keys:
            return row

        values = []
        for name in keys:
            if name in propnames:
                values.append(row[propnames[name]])
        return values
590

    
591
    def version_put_property(self, serial, key, value):
        """Store value in the column named key of the given version.
           Keys that are not known property names are ignored.
        """

        # The column name is interpolated into the statement, so it is
        # only accepted when it is one of the known property names.
        if key in _propnames:
            statement = "update versions set %s = ? where serial = ?" % key
            self.execute(statement, (value, serial))
598

    
599
    def version_recluster(self, serial, cluster):
        """Move the version into another cluster.
           Nothing happens when the version is not found or already
           belongs to that cluster. Otherwise the ancestor statistics
           lose the version in the old cluster and gain it in the new one.
        """

        props = self.version_get_properties(serial)
        if not props:
            return
        previous = props[CLUSTER]
        if previous == cluster:
            return
        owner = props[NODE]
        nbytes = props[SIZE]

        now = time()
        self.statistics_update_ancestors(owner, -1, -nbytes, now, previous)
        self.statistics_update_ancestors(owner, 1, nbytes, now, cluster)

        self.execute("update versions set cluster = ? where serial = ?",
                     (cluster, serial))
617

    
618
    def version_remove(self, serial):
        """Delete the version identified by serial.
           Return its (hash, size), or None when it does not exist.
           The ancestor statistics are reduced accordingly, and the
           latest version of the node is set again when the node
           still has a current version in the same cluster.
        """

        removed = self.version_get_properties(serial)
        if not removed:
            return
        owner, digest = removed[NODE], removed[HASH]
        nbytes, group = removed[SIZE], removed[CLUSTER]

        self.statistics_update_ancestors(owner, -1, -nbytes, time(), group)
        self.execute("delete from versions where serial = ?", (serial,))

        current = self.version_lookup(owner, cluster=group, all_props=False)
        if current:
            self.nodes_set_latest_version(owner, current[0])
        return digest, nbytes
639

    
640
    def attribute_get(self, serial, domain, keys=()):
        """Return a list of (key, value) pairs of the version specified by serial.
           If keys is empty, return all attributes.
           Otherwise, return only those specified.
           keys may be any iterable of key names.
        """

        execute = self.execute
        # Normalize to a tuple so lists and other iterables can be
        # concatenated with the remaining query parameters.
        keys = tuple(keys)
        if keys:
            marks = ','.join('?' for k in keys)
            q = ("select key, value from attributes "
                 "where key in (%s) and serial = ? and domain = ?" % (marks,))
            execute(q, keys + (serial, domain))
        else:
            q = "select key, value from attributes where serial = ? and domain = ?"
            execute(q, (serial, domain))
        return self.fetchall()
656

    
657
    def attribute_set(self, serial, domain, items):
        """Store attributes for the version identified by serial.
           items is an iterable of (key, value) pairs; the value of
           an attribute that already exists is replaced.
        """

        rows = ((serial, domain, name, content) for name, content in items)
        self.executemany(
            "insert or replace into attributes (serial, domain, key, value) "
            "values (?, ?, ?, ?)", rows)
665

    
666
    def attribute_del(self, serial, domain, keys=()):
        """Remove attributes of the version identified by serial.
           Without keys every attribute of the domain is removed;
           otherwise only the attributes named in keys.
        """

        if not keys:
            self.execute(
                "delete from attributes where serial = ? and domain = ?",
                (serial, domain))
            return
        doomed = ((serial, domain, name) for name in keys)
        self.executemany(
            "delete from attributes where serial = ? and domain = ? and key = ?",
            doomed)
678

    
679
    def attribute_copy(self, source, dest):
        """Copy every attribute of version source onto version dest,
           replacing attributes dest already has under the same
           domain and key.
        """
        self.execute(
            "insert or replace into attributes "
            "select ?, domain, key, value from attributes where serial = ?",
            (dest, source))
684

    
685
    def _construct_filters(self, domain, filterq):
        """Build the attribute filter conditions for a versions query.
           Return (sql, args), where sql starts with ' and ' and refers
           to the outer versions row through the alias v, or
           (None, None) when there is nothing to filter on.
        """
        if not (domain and filterq):
            return None, None

        included, excluded, opers = parse_filters(filterq)
        prefix = ("select 1 from attributes where serial = v.serial "
                  "and domain = ? and ")
        conditions = []
        args = []

        # At least one of the included keys must be present.
        if included:
            any_key = ' or '.join(['key = ?' for _ in included])
            conditions.append("exists (" + prefix + "(" + any_key + ")" + ")")
            args.append(domain)
            args.extend(included)

        # None of the excluded keys may be present.
        if excluded:
            any_key = ' or '.join(['key = ?' for _ in excluded])
            conditions.append(
                "not exists (" + prefix + "(" + any_key + ")" + ")")
            args.append(domain)
            args.extend(excluded)

        # Every (key, operator, value) comparison must hold.
        if opers:
            for name, operator, operand in opers:
                comparison = "key = ? and value %s ?" % (operator,)
                conditions.append("exists (" + prefix + comparison + ")")
                args.extend([domain, name, operand])

        if not conditions:
            return None, None

        return ' and ' + ' and '.join(conditions), args
724

    
725
    def _construct_paths(self, pathq):
        """Build the path conditions for a query over nodes aliased n.
           pathq is a sequence of (path, match) pairs, match being
           MATCH_PREFIX or MATCH_EXACT.
           Return (sql, args) with the conditions or-ed together,
           or (None, None) when pathq is empty.
        """
        if not pathq:
            return None, None

        conditions = []
        values = []
        for path, match in pathq:
            if match == MATCH_PREFIX:
                conditions.append("n.path like ? escape '\\'")
                values.append(self.escape_like(path) + '%')
            elif match == MATCH_EXACT:
                conditions.append("n.path = ?")
                values.append(path)

        return ' and (' + ' or '.join(conditions) + ')', tuple(values)
743

    
744
    def _construct_size(self, sizeq):
745
        if not sizeq or len(sizeq) != 2:
746
            return None, None
747

    
748
        subq = ''
749
        args = []
750
        if sizeq[0]:
751
            subq += " and v.size >= ?"
752
            args += [sizeq[0]]
753
        if sizeq[1]:
754
            subq += " and v.size < ?"
755
            args += [sizeq[1]]
756

    
757
        return subq, args
758

    
759
    def _construct_versions_nodes_latest_version_subquery(self, before=inf):
760
        if before == inf:
761
            q = ("n.latest_version ")
762
            args = []
763
        else:
764
            q = ("(select max(serial) "
765
                 "from versions "
766
                 "where node = v.node and mtime < ?) ")
767
            args = [before]
768
        return q, args
769

    
770
    def _construct_latest_version_subquery(self, node=None, before=inf):
771
        where_cond = "node = v.node"
772
        args = []
773
        if node:
774
            where_cond = "node = ? "
775
            args = [node]
776

    
777
        if before == inf:
778
            q = ("(select latest_version "
779
                 "from nodes "
780
                 "where %s) ")
781
        else:
782
            q = ("(select max(serial) "
783
                 "from versions "
784
                 "where %s and mtime < ?) ")
785
            args += [before]
786
        return q % where_cond, args
787

    
788
    def _construct_latest_versions_subquery(self, nodes=(), before=inf):
789
        where_cond = ""
790
        args = []
791
        if nodes:
792
            where_cond = "node in (%s) " % ','.join('?' for node in nodes)
793
            args = nodes
794

    
795
        if before == inf:
796
            q = ("(select latest_version "
797
                 "from nodes "
798
                 "where %s ) ")
799
        else:
800
            q = ("(select max(serial) "
801
                 "from versions "
802
                 "where %s and mtime < ? group by node) ")
803
            args += [before]
804
        return q % where_cond, args
805

    
806
    def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=[]):
        """Return the distinct attribute keys found under ``parent``.

           Only attributes in ``domain`` are considered, taken from
           the latest version (up to ``before``) of every node whose
           parent is ``parent``, skipping versions that belong to
           ``except_cluster``. ``pathq`` is handed to
           ``_construct_paths`` to further restrict the paths.
        """

        # TODO: Use another table to store before=inf results.
        version_subq, version_args = self._construct_latest_version_subquery(
            node=None, before=before)
        query = ("select distinct a.key "
                 "from attributes a, versions v, nodes n "
                 "where v.serial = %s "
                 "and v.cluster != ? "
                 "and v.node in (select node "
                 "from nodes "
                 "where parent = ?) "
                 "and a.serial = v.serial "
                 "and a.domain = ? "
                 "and n.node = v.node") % version_subq
        # Placeholder order: version subquery first, then the fixed ones.
        params = version_args + [except_cluster, parent, domain]

        path_subq, path_args = self._construct_paths(pathq)
        if path_subq is not None:
            query += path_subq
            params += path_args

        self.execute(query, params)
        return [row[0] for row in self.fetchall()]
833

    
834
    def latest_version_list(self, parent, prefix='', delimiter=None,
                            start='', limit=10000, before=inf,
                            except_cluster=0, pathq=[], domain=None,
                            filterq=[], sizeq=None, all_props=False):
        """Return a (list of (path, serial) tuples, list of common prefixes)
           for the current versions of the paths with the given parent,
           matching the following criteria.

           A version's property tuple is returned when all of these hold:

                a. parent matches

                b. path > start

                c. path starts with prefix (and paths in pathq)

                d. version is the max up to before

                e. version is not in cluster

                f. the path does not have the delimiter occurring
                   after the prefix, or ends with the delimiter

                g. serial matches the attribute filter query.

                   A filter query is a comma-separated list of
                   terms in one of these three forms:

                   key
                       an attribute with this key must exist

                   !key
                       an attribute with this key must not exist

                   key ?op value
                       the attribute with this key satisfies the value
                       where ?op is one of =, != <=, >=, <, >.

                h. the size is in the range set by sizeq

           The list of common prefixes holds the prefixes matching up
           to the first delimiter after prefix; each is reported only
           once, as a "virtual directory", and includes the delimiter.

           If arguments are None, then the corresponding matching rule
           will always match.

           Limit applies to the first list of tuples returned.

           If all_props is True, return all properties after path, not just serial.
        """

        if not start or start < prefix:
            start = strprevling(prefix)
        upper_bound = strnextling(prefix)

        version_subq, params = \
            self._construct_versions_nodes_latest_version_subquery(before)
        if all_props:
            columns = "v.serial, v.node, v.hash, v.size, v.type, v.source, v.mtime, v.muser, v.uuid, v.checksum, v.cluster"
        else:
            columns = "v.serial"
        query = ("select distinct n.path, %s "
                 "from versions v, nodes n "
                 "where v.serial = %s "
                 "and v.cluster != ? "
                 "and v.node in (select node "
                 "from nodes "
                 "where parent = ?) "
                 "and n.node = v.node "
                 "and n.path > ? and n.path < ?") % (columns, version_subq)
        params += [except_cluster, parent, start, upper_bound]
        # Position of the "n.path > ?" argument, rewritten while scanning.
        start_index = len(params) - 2

        path_subq, path_args = self._construct_paths(pathq)
        if path_subq is not None:
            query += path_subq
            params += path_args

        size_subq, size_args = self._construct_size(sizeq)
        if size_subq is not None:
            query += size_subq
            params += size_args

        filter_subq, filter_args = self._construct_filters(domain, filterq)
        if filter_subq is None:
            query = query.replace("attributes a, ", "")
            query = query.replace("and a.serial = v.serial ", "")
        else:
            query += filter_subq
            params += filter_args
        query += " order by n.path"

        if not delimiter:
            # Flat listing: the database applies the limit.
            query += " limit ?"
            params.append(limit)
            self.execute(query, params)
            return self.fetchall(), ()

        prefix_len = len(prefix)
        delim_len = len(delimiter)
        matches = []
        prefixes = []
        count = 0

        self.execute(query, params)
        while True:
            row = self.fetchone()
            if row is None:
                break
            path = row[0]
            cut = path.find(delimiter, prefix_len)

            if cut >= 0 and cut + delim_len != len(path):
                # Delimiter occurs inside the path: report the common
                # prefix once and restart the query just past it.
                common = path[:cut + delim_len]
                prefixes.append(common)
                if count >= limit:
                    break
                params[start_index] = strnextling(common)  # New start.
                self.execute(query, params)
                continue

            matches.append(row)
            count += 1
            # A path ending with the delimiter never stops the scan:
            # get one more, in case there is a path.
            if cut < 0 and count >= limit:
                break

        return matches, prefixes
973

    
974
    def latest_uuid(self, uuid):
        """Return a (path, serial) tuple for the newest version with this uuid.

           The newest version is the one with the highest serial among
           the versions carrying ``uuid``. The result is whatever
           ``fetchone`` yields for that query.
        """

        query = ' '.join((
            "select n.path, v.serial",
            "from versions v, nodes n",
            "where v.serial = (select max(serial)",
            "from versions",
            "where uuid = ?)",
            "and n.node = v.node",
        ))
        self.execute(query, (uuid,))
        return self.fetchone()