Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / lib / sqlite / node.py @ cb787cc4

History | View | Annotate | Download (33.5 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from time import time
35

    
36
from dbworker import DBWorker
37

    
38
from pithos.backends.filter import parse_filters
39

    
40

    
41
# Identifier of the root node of the tree.
ROOTNODE = 0

# Positions of the columns inside a version row, in the order used by the
# version select statements of this module.
(SERIAL, NODE, HASH, SIZE, TYPE, SOURCE,
 MTIME, MUSER, UUID, CHECKSUM, CLUSTER) = range(11)

# Path matching modes understood by the path query builder.
MATCH_PREFIX, MATCH_EXACT = range(2)

# Sentinel meaning "no upper time limit" for the `before` arguments.
inf = float('inf')
49

    
50

    
51
def strnextling(prefix):
    """Return the first unicode string that is greater than, but does
       not start with, the given prefix.

       strnextling('hello') -> 'hellp'

       The empty prefix is a prefix of every string, so its nextling is
       approximated by the single character 0xffff (the last character
       of narrow python builds; wide builds are not autodetected).

       Raise RuntimeError if the last character of the prefix is
       already 0xffff or above.
    """
    if prefix:
        last = ord(prefix[-1])
        if last >= 0xffff:
            raise RuntimeError
        return prefix[:-1] + unichr(last + 1)
    return unichr(0xffff)
70

    
71

    
72
def strprevling(prefix):
    """Return an approximation of the last unicode string that is
       less than, but does not start with, the given prefix.

       strprevling(u'hello') -> u'helln\\uffff'

       The empty string has no prevling and is returned unchanged.
       When the last character is the null character it is simply
       dropped.
    """
    if not prefix:
        return prefix
    head, last = prefix[:-1], ord(prefix[-1])
    if last <= 0:
        return head
    return head + unichr(last - 1) + unichr(0xffff)
85

    
86

    
87
_propnames = {
88
    'serial': 0,
89
    'node': 1,
90
    'hash': 2,
91
    'size': 3,
92
    'type': 4,
93
    'source': 5,
94
    'mtime': 6,
95
    'muser': 7,
96
    'uuid': 8,
97
    'checksum': 9,
98
    'cluster': 10
99
}
100

    
101

    
102
class Node(DBWorker):
103
    """Nodes store path organization and have multiple versions.
104
       Versions store object history and have multiple attributes.
105
       Attributes store metadata.
106
    """
107

    
108
    # TODO: Provide an interface for included and excluded clusters.
109

    
110
    def __init__(self, **params):
        """Initialise the worker and make sure the schema exists.

           Enables foreign keys, creates the nodes, policy, statistics,
           versions and attributes tables together with their indexes
           (only if they do not exist yet) and inserts the root node,
           which is its own parent.
        """
        DBWorker.__init__(self, **params)
        execute = self.execute

        # Statements are executed strictly in this order.
        schema = (
            """ pragma foreign_keys = on """,

            """ create table if not exists nodes
                          ( node       integer primary key,
                            parent     integer default 0,
                            path       text    not null default '',
                            latest_version     integer,
                            foreign key (parent)
                            references nodes(node)
                            on update cascade
                            on delete cascade ) """,
            """ create unique index if not exists idx_nodes_path
                    on nodes(path) """,
            """ create index if not exists idx_nodes_parent
                    on nodes(parent) """,

            """ create table if not exists policy
                          ( node   integer,
                            key    text,
                            value  text,
                            primary key (node, key)
                            foreign key (node)
                            references nodes(node)
                            on update cascade
                            on delete cascade ) """,

            """ create table if not exists statistics
                          ( node       integer,
                            population integer not null default 0,
                            size       integer not null default 0,
                            mtime      integer,
                            cluster    integer not null default 0,
                            primary key (node, cluster)
                            foreign key (node)
                            references nodes(node)
                            on update cascade
                            on delete cascade ) """,

            """ create table if not exists versions
                          ( serial     integer primary key,
                            node       integer,
                            hash       text,
                            size       integer not null default 0,
                            type       text    not null default '',
                            source     integer,
                            mtime      integer,
                            muser      text    not null default '',
                            uuid       text    not null default '',
                            checksum   text    not null default '',
                            cluster    integer not null default 0,
                            foreign key (node)
                            references nodes(node)
                            on update cascade
                            on delete cascade ) """,
            """ create index if not exists idx_versions_node_mtime
                    on versions(node, mtime) """,
            # NOTE: despite its name this index covers the uuid column only.
            """ create index if not exists idx_versions_node_uuid
                    on versions(uuid) """,

            """ create table if not exists attributes
                          ( serial integer,
                            domain text,
                            key    text,
                            value  text,
                            primary key (serial, domain, key)
                            foreign key (serial)
                            references versions(serial)
                            on update cascade
                            on delete cascade ) """,
        )
        for statement in schema:
            execute(statement)

        # The root node is its own parent; "or ignore" keeps this idempotent.
        execute("insert or ignore into nodes(node, parent) values (?, ?)",
                (ROOTNODE, ROOTNODE))
186

    
187
    def node_create(self, parent, path):
188
        """Create a new node from the given properties.
189
           Return the node identifier of the new node.
190
        """
191

    
192
        q = ("insert into nodes (parent, path) "
193
             "values (?, ?)")
194
        props = (parent, path)
195
        return self.execute(q, props).lastrowid
196

    
197
    def node_lookup(self, path):
198
        """Lookup the current node of the given path.
199
           Return None if the path is not found.
200
        """
201

    
202
        q = "select node from nodes where path = ?"
203
        self.execute(q, (path,))
204
        r = self.fetchone()
205
        if r is not None:
206
            return r[0]
207
        return None
208

    
209
    def node_lookup_bulk(self, paths):
210
        """Lookup the current nodes for the given paths.
211
           Return () if the path is not found.
212
        """
213

    
214
        placeholders = ','.join('?' for path in paths)
215
        q = "select node from nodes where path in (%s)" % placeholders
216
        self.execute(q, paths)
217
        r = self.fetchall()
218
        if r is not None:
219
            return [row[0] for row in r]
220
        return None
221

    
222
    def node_get_properties(self, node):
223
        """Return the node's (parent, path).
224
           Return None if the node is not found.
225
        """
226

    
227
        q = "select parent, path from nodes where node = ?"
228
        self.execute(q, (node,))
229
        return self.fetchone()
230

    
231
    def node_get_versions(self, node, keys=(), propnames=_propnames):
        """Return the properties of every version stored at node.

           With empty keys each row is returned whole, in the order
           (serial, node, hash, size, type, source, mtime, muser, uuid,
           checksum, cluster). Otherwise each row is reduced to a list
           with the values of the requested keys, in the order given;
           keys missing from propnames are silently skipped.
        """

        self.execute(
            "select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
            "from versions "
            "where node = ?",
            (node,))
        rows = self.fetchall()
        if rows is None or not keys:
            return rows
        return [[row[propnames[k]] for k in keys if k in propnames]
                for row in rows]
248

    
249
    def node_count_children(self, node):
250
        """Return node's child count."""
251

    
252
        q = "select count(node) from nodes where parent = ? and node != 0"
253
        self.execute(q, (node,))
254
        r = self.fetchone()
255
        if r is None:
256
            return 0
257
        return r[0]
258

    
259
    def node_purge_children(self, parent, before=inf, cluster=0):
260
        """Delete all versions with the specified
261
           parent and cluster, and return
262
           the hashes and size of versions deleted.
263
           Clears out nodes with no remaining versions.
264
        """
265

    
266
        execute = self.execute
267
        q = ("select count(serial), sum(size) from versions "
268
             "where node in (select node "
269
             "from nodes "
270
             "where parent = ?) "
271
             "and cluster = ? "
272
             "and mtime <= ?")
273
        args = (parent, cluster, before)
274
        execute(q, args)
275
        nr, size = self.fetchone()
276
        if not nr:
277
            return (), 0
278
        mtime = time()
279
        self.statistics_update(parent, -nr, -size, mtime, cluster)
280
        self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster)
281

    
282
        q = ("select hash from versions "
283
             "where node in (select node "
284
             "from nodes "
285
             "where parent = ?) "
286
             "and cluster = ? "
287
             "and mtime <= ?")
288
        execute(q, args)
289
        hashes = [r[0] for r in self.fetchall()]
290
        q = ("delete from versions "
291
             "where node in (select node "
292
             "from nodes "
293
             "where parent = ?) "
294
             "and cluster = ? "
295
             "and mtime <= ?")
296
        execute(q, args)
297
        q = ("delete from nodes "
298
             "where node in (select node from nodes n "
299
             "where (select count(serial) "
300
             "from versions "
301
             "where node = n.node) = 0 "
302
             "and parent = ?)")
303
        execute(q, (parent,))
304
        return hashes, size
305

    
306
    def node_purge(self, node, before=inf, cluster=0):
307
        """Delete all versions with the specified
308
           node and cluster, and return
309
           the hashes and size of versions deleted.
310
           Clears out the node if it has no remaining versions.
311
        """
312

    
313
        execute = self.execute
314
        q = ("select count(serial), sum(size) from versions "
315
             "where node = ? "
316
             "and cluster = ? "
317
             "and mtime <= ?")
318
        args = (node, cluster, before)
319
        execute(q, args)
320
        nr, size = self.fetchone()
321
        if not nr:
322
            return (), 0
323
        mtime = time()
324
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
325

    
326
        q = ("select hash from versions "
327
             "where node = ? "
328
             "and cluster = ? "
329
             "and mtime <= ?")
330
        execute(q, args)
331
        hashes = [r[0] for r in self.fetchall()]
332
        q = ("delete from versions "
333
             "where node = ? "
334
             "and cluster = ? "
335
             "and mtime <= ?")
336
        execute(q, args)
337
        q = ("delete from nodes "
338
             "where node in (select node from nodes n "
339
             "where (select count(serial) "
340
             "from versions "
341
             "where node = n.node) = 0 "
342
             "and node = ?)")
343
        execute(q, (node,))
344
        return hashes, size
345

    
346
    def node_remove(self, node):
347
        """Remove the node specified.
348
           Return false if the node has children or is not found.
349
        """
350

    
351
        if self.node_count_children(node):
352
            return False
353

    
354
        mtime = time()
355
        q = ("select count(serial), sum(size), cluster "
356
             "from versions "
357
             "where node = ? "
358
             "group by cluster")
359
        self.execute(q, (node,))
360
        for population, size, cluster in self.fetchall():
361
            self.statistics_update_ancestors(
362
                node, -population, -size, mtime, cluster)
363

    
364
        q = "delete from nodes where node = ?"
365
        self.execute(q, (node,))
366
        return True
367

    
368
    def policy_get(self, node):
369
        q = "select key, value from policy where node = ?"
370
        self.execute(q, (node,))
371
        return dict(self.fetchall())
372

    
373
    def policy_set(self, node, policy):
374
        q = "insert or replace into policy (node, key, value) values (?, ?, ?)"
375
        self.executemany(q, ((node, k, v) for k, v in policy.iteritems()))
376

    
377
    def statistics_get(self, node, cluster=0):
378
        """Return population, total size and last mtime
379
           for all versions under node that belong to the cluster.
380
        """
381

    
382
        q = ("select population, size, mtime from statistics "
383
             "where node = ? and cluster = ?")
384
        self.execute(q, (node, cluster))
385
        return self.fetchone()
386

    
387
    def statistics_update(self, node, population, size, mtime, cluster=0):
388
        """Update the statistics of the given node.
389
           Statistics keep track the population, total
390
           size of objects and mtime in the node's namespace.
391
           May be zero or positive or negative numbers.
392
        """
393

    
394
        qs = ("select population, size from statistics "
395
              "where node = ? and cluster = ?")
396
        qu = ("insert or replace into statistics (node, population, size, mtime, cluster) "
397
              "values (?, ?, ?, ?, ?)")
398
        self.execute(qs, (node, cluster))
399
        r = self.fetchone()
400
        if r is None:
401
            prepopulation, presize = (0, 0)
402
        else:
403
            prepopulation, presize = r
404
        population += prepopulation
405
        size += presize
406
        self.execute(qu, (node, population, size, mtime, cluster))
407

    
408
    def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
409
        """Update the statistics of the given node's parent.
410
           Then recursively update all parents up to the root.
411
           Population is not recursive.
412
        """
413

    
414
        while True:
415
            if node == 0:
416
                break
417
            props = self.node_get_properties(node)
418
            if props is None:
419
                break
420
            parent, path = props
421
            self.statistics_update(parent, population, size, mtime, cluster)
422
            node = parent
423
            population = 0  # Population isn't recursive
424

    
425
    def statistics_latest(self, node, before=inf, except_cluster=0):
426
        """Return population, total size and last mtime
427
           for all latest versions under node that
428
           do not belong to the cluster.
429
        """
430

    
431
        execute = self.execute
432
        fetchone = self.fetchone
433

    
434
        # The node.
435
        props = self.node_get_properties(node)
436
        if props is None:
437
            return None
438
        parent, path = props
439

    
440
        # The latest version.
441
        q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
442
             "from versions v "
443
             "where serial = %s "
444
             "and cluster != ?")
445
        subq, args = self._construct_latest_version_subquery(
446
            node=node, before=before)
447
        execute(q % subq, args + [except_cluster])
448
        props = fetchone()
449
        if props is None:
450
            return None
451
        mtime = props[MTIME]
452

    
453
        # First level, just under node (get population).
454
        q = ("select count(serial), sum(size), max(mtime) "
455
             "from versions v "
456
             "where serial = %s "
457
             "and cluster != ? "
458
             "and node in (select node "
459
             "from nodes "
460
             "where parent = ?)")
461
        subq, args = self._construct_latest_version_subquery(
462
            node=None, before=before)
463
        execute(q % subq, args + [except_cluster, node])
464
        r = fetchone()
465
        if r is None:
466
            return None
467
        count = r[0]
468
        mtime = max(mtime, r[2])
469
        if count == 0:
470
            return (0, 0, mtime)
471

    
472
        # All children (get size and mtime).
473
        # This is why the full path is stored.
474
        q = ("select count(serial), sum(size), max(mtime) "
475
             "from versions v "
476
             "where serial = %s "
477
             "and cluster != ? "
478
             "and node in (select node "
479
             "from nodes "
480
             "where path like ? escape '\\')")
481
        subq, args = self._construct_latest_version_subquery(
482
            node=None, before=before)
483
        execute(
484
            q % subq, args + [except_cluster, self.escape_like(path) + '%'])
485
        r = fetchone()
486
        if r is None:
487
            return None
488
        size = r[1] - props[SIZE]
489
        mtime = max(mtime, r[2])
490
        return (count, size, mtime)
491

    
492
    def nodes_set_latest_version(self, node, serial):
493
        q = ("update nodes set latest_version = ? where node = ?")
494
        props = (serial, node)
495
        self.execute(q, props)
496

    
497
    def version_create(self, node, hash, size, type, source, muser, uuid, checksum, cluster=0):
498
        """Create a new version from the given properties.
499
           Return the (serial, mtime) of the new version.
500
        """
501

    
502
        q = ("insert into versions (node, hash, size, type, source, mtime, muser, uuid, checksum, cluster) "
503
             "values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
504
        mtime = time()
505
        props = (node, hash, size, type, source, mtime, muser,
506
                 uuid, checksum, cluster)
507
        serial = self.execute(q, props).lastrowid
508
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)
509

    
510
        self.nodes_set_latest_version(node, serial)
511

    
512
        return serial, mtime
513

    
514
    def version_lookup(self, node, before=inf, cluster=0, all_props=True):
515
        """Lookup the current version of the given node.
516
           Return a list with its properties:
517
           (serial, node, hash, size, type, source, mtime,
518
            muser, uuid, checksum, cluster)
519
           or None if the current version is not found in the given cluster.
520
        """
521

    
522
        q = ("select %s "
523
             "from versions v "
524
             "where serial = %s "
525
             "and cluster = ?")
526
        subq, args = self._construct_latest_version_subquery(
527
            node=node, before=before)
528
        if not all_props:
529
            q = q % ("serial", subq)
530
        else:
531
            q = q % ("serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster", subq)
532

    
533
        self.execute(q, args + [cluster])
534
        props = self.fetchone()
535
        if props is not None:
536
            return props
537
        return None
538

    
539
    def version_lookup_bulk(self, nodes, before=inf, cluster=0, all_props=True):
540
        """Lookup the current versions of the given nodes.
541
           Return a list with their properties:
542
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
543
        """
544

    
545
        if not nodes:
546
            return ()
547
        q = ("select %s "
548
             "from versions "
549
             "where serial in %s "
550
             "and cluster = ? %s")
551
        subq, args = self._construct_latest_versions_subquery(
552
            nodes=nodes, before=before)
553
        if not all_props:
554
            q = q % ("serial", subq, '')
555
        else:
556
            q = q % ("serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster", subq, 'order by node')
557

    
558
        args += [cluster]
559
        self.execute(q, args)
560
        return self.fetchall()
561

    
562
    def version_get_properties(self, serial, keys=(), propnames=_propnames):
        """Return the properties of the version with the given serial.

           With empty keys the whole row is returned, in the order
           (serial, node, hash, size, type, source, mtime, muser, uuid,
           checksum, cluster). Otherwise return a list with the values
           of the requested keys, in the order given; keys missing from
           propnames are silently skipped. Return None if there is no
           such version.
        """

        self.execute(
            "select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
            "from versions "
            "where serial = ?",
            (serial,))
        row = self.fetchone()
        if row is None or not keys:
            return row
        return [row[propnames[k]] for k in keys if k in propnames]
580

    
581
    def version_put_property(self, serial, key, value):
        """Set the property named key of the given version to value.
           Keys that are not known property names are ignored.
        """

        # The key goes into the statement text, so only the known
        # column names are let through.
        if key in _propnames:
            self.execute(
                "update versions set %s = ? where serial = ?" % key,
                (value, serial))
588

    
589
    def version_recluster(self, serial, cluster):
590
        """Move the version into another cluster."""
591

    
592
        props = self.version_get_properties(serial)
593
        if not props:
594
            return
595
        node = props[NODE]
596
        size = props[SIZE]
597
        oldcluster = props[CLUSTER]
598
        if cluster == oldcluster:
599
            return
600

    
601
        mtime = time()
602
        self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
603
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)
604

    
605
        q = "update versions set cluster = ? where serial = ?"
606
        self.execute(q, (cluster, serial))
607

    
608
    def version_remove(self, serial):
609
        """Remove the serial specified."""
610

    
611
        props = self.version_get_properties(serial)
612
        if not props:
613
            return
614
        node = props[NODE]
615
        hash = props[HASH]
616
        size = props[SIZE]
617
        cluster = props[CLUSTER]
618

    
619
        mtime = time()
620
        self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
621

    
622
        q = "delete from versions where serial = ?"
623
        self.execute(q, (serial,))
624

    
625
        props = self.version_lookup(node, cluster=cluster, all_props=False)
626
        if props:
627
            self.nodes_set_latest_version(node, props[0])
628
        return hash, size
629

    
630
    def attribute_get(self, serial, domain, keys=()):
631
        """Return a list of (key, value) pairs of the version specified by serial.
632
           If keys is empty, return all attributes.
633
           Othwerise, return only those specified.
634
        """
635

    
636
        execute = self.execute
637
        if keys:
638
            marks = ','.join('?' for k in keys)
639
            q = ("select key, value from attributes "
640
                 "where key in (%s) and serial = ? and domain = ?" % (marks,))
641
            execute(q, keys + (serial, domain))
642
        else:
643
            q = "select key, value from attributes where serial = ? and domain = ?"
644
            execute(q, (serial, domain))
645
        return self.fetchall()
646

    
647
    def attribute_set(self, serial, domain, items):
648
        """Set the attributes of the version specified by serial.
649
           Receive attributes as an iterable of (key, value) pairs.
650
        """
651

    
652
        q = ("insert or replace into attributes (serial, domain, key, value) "
653
             "values (?, ?, ?, ?)")
654
        self.executemany(q, ((serial, domain, k, v) for k, v in items))
655

    
656
    def attribute_del(self, serial, domain, keys=()):
657
        """Delete attributes of the version specified by serial.
658
           If keys is empty, delete all attributes.
659
           Otherwise delete those specified.
660
        """
661

    
662
        if keys:
663
            q = "delete from attributes where serial = ? and domain = ? and key = ?"
664
            self.executemany(q, ((serial, domain, key) for key in keys))
665
        else:
666
            q = "delete from attributes where serial = ? and domain = ?"
667
            self.execute(q, (serial, domain))
668

    
669
    def attribute_copy(self, source, dest):
670
        q = ("insert or replace into attributes "
671
             "select ?, domain, key, value from attributes "
672
             "where serial = ?")
673
        self.execute(q, (dest, source))
674

    
675
    def _construct_filters(self, domain, filterq):
676
        if not domain or not filterq:
677
            return None, None
678

    
679
        subqlist = []
680
        append = subqlist.append
681
        included, excluded, opers = parse_filters(filterq)
682
        args = []
683

    
684
        if included:
685
            subq = "exists (select 1 from attributes where serial = v.serial and domain = ? and "
686
            subq += "(" + ' or '.join(('key = ?' for x in included)) + ")"
687
            subq += ")"
688
            args += [domain]
689
            args += included
690
            append(subq)
691

    
692
        if excluded:
693
            subq = "not exists (select 1 from attributes where serial = v.serial and domain = ? and "
694
            subq += "(" + ' or '.join(('key = ?' for x in excluded)) + ")"
695
            subq += ")"
696
            args += [domain]
697
            args += excluded
698
            append(subq)
699

    
700
        if opers:
701
            for k, o, v in opers:
702
                subq = "exists (select 1 from attributes where serial = v.serial and domain = ? and "
703
                subq += "key = ? and value %s ?" % (o,)
704
                subq += ")"
705
                args += [domain, k, v]
706
                append(subq)
707

    
708
        if not subqlist:
709
            return None, None
710

    
711
        subq = ' and ' + ' and '.join(subqlist)
712

    
713
        return subq, args
714

    
715
    def _construct_paths(self, pathq):
716
        if not pathq:
717
            return None, None
718

    
719
        subqlist = []
720
        args = []
721
        for path, match in pathq:
722
            if match == MATCH_PREFIX:
723
                subqlist.append("n.path like ? escape '\\'")
724
                args.append(self.escape_like(path) + '%')
725
            elif match == MATCH_EXACT:
726
                subqlist.append("n.path = ?")
727
                args.append(path)
728

    
729
        subq = ' and (' + ' or '.join(subqlist) + ')'
730
        args = tuple(args)
731

    
732
        return subq, args
733

    
734
    def _construct_size(self, sizeq):
735
        if not sizeq or len(sizeq) != 2:
736
            return None, None
737

    
738
        subq = ''
739
        args = []
740
        if sizeq[0]:
741
            subq += " and v.size >= ?"
742
            args += [sizeq[0]]
743
        if sizeq[1]:
744
            subq += " and v.size < ?"
745
            args += [sizeq[1]]
746

    
747
        return subq, args
748

    
749
    def _construct_versions_nodes_latest_version_subquery(self, before=inf):
750
        if before == inf:
751
            q = ("n.latest_version ")
752
            args = []
753
        else:
754
            q = ("(select max(serial) "
755
                 "from versions "
756
                 "where node = v.node and mtime < ?) ")
757
            args = [before]
758
        return q, args
759

    
760
    def _construct_latest_version_subquery(self, node=None, before=inf):
761
        where_cond = "node = v.node"
762
        args = []
763
        if node:
764
            where_cond = "node = ? "
765
            args = [node]
766

    
767
        if before == inf:
768
            q = ("(select latest_version "
769
                 "from nodes "
770
                 "where %s) ")
771
        else:
772
            q = ("(select max(serial) "
773
                 "from versions "
774
                 "where %s and mtime < ?) ")
775
            args += [before]
776
        return q % where_cond, args
777

    
778
    def _construct_latest_versions_subquery(self, nodes=(), before=inf):
779
        where_cond = ""
780
        args = []
781
        if nodes:
782
            where_cond = "node in (%s) " % ','.join('?' for node in nodes)
783
            args = nodes
784

    
785
        if before == inf:
786
            q = ("(select latest_version "
787
                 "from nodes "
788
                 "where %s ) ")
789
        else:
790
            q = ("(select max(serial) "
791
                 "from versions "
792
                 "where %s and mtime < ? group by node) ")
793
            args += [before]
794
        return q % where_cond, args
795

    
796
    def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=[]):
        """Return the distinct attribute keys found under parent.

           Only attributes in the given domain are considered, and
           only those attached to the latest version (up to before)
           of the nodes whose parent is parent, skipping versions
           that belong to except_cluster. The conditions produced by
           _construct_paths(pathq), if any, are appended to the query.

           The result is a list of keys.
        """

        # TODO: Use another table to store before=inf results.
        latest_sql, latest_args = self._construct_latest_version_subquery(
            node=None, before=before)
        sql = ("select distinct a.key "
               "from attributes a, versions v, nodes n "
               "where v.serial = %s "
               "and v.cluster != ? "
               "and v.node in (select node "
               "from nodes "
               "where parent = ?) "
               "and a.serial = v.serial "
               "and a.domain = ? "
               "and n.node = v.node") % latest_sql
        # Placeholder values follow the order of appearance in the SQL:
        # subquery first, then cluster, parent and domain.
        params = latest_args + [except_cluster, parent, domain]

        path_sql, path_args = self._construct_paths(pathq)
        if path_sql is not None:
            sql += path_sql
            params += path_args

        self.execute(sql, params)
        return [row[0] for row in self.fetchall()]
823

    
824
    def latest_version_list(self, parent, prefix='', delimiter=None,
                            start='', limit=10000, before=inf,
                            except_cluster=0, pathq=[], domain=None,
                            filterq=[], sizeq=None, all_props=False):
        """List the current versions of the paths under parent.

           Return a (matches, prefixes) pair: matches holds
           (path, serial) tuples and prefixes holds the common
           prefixes ("virtual directories").

           A version is reported in matches when all of these hold:

                a. its node has the given parent

                b. path > start

                c. path starts with prefix (and matches pathq)

                d. it is the latest version up to before

                e. it does not belong to except_cluster

                f. the path has no delimiter occurring after the
                   prefix, or it ends with the delimiter

                g. the serial satisfies the attribute filter query.

                   A filter query is a comma-separated list of
                   terms in one of these three forms:

                   key
                       an attribute with this key must exist

                   !key
                       an attribute with this key must not exist

                   key ?op value
                       the attribute with this key satisfies the value
                       where ?op is one of =, != <=, >=, <, >.

                h. the size lies in the range given by sizeq

           A common prefix is the part of a path up to and including
           the first delimiter found after prefix. Each one is
           reported only once.

           If arguments are None, then the corresponding matching rule
           will always match.

           Limit applies to the first list returned (matches).

           If all_props is True, every tuple carries all version
           properties after path, not just the serial.
        """

        # The lower bound can never precede the prefix itself.
        if not start or start < prefix:
            start = strprevling(prefix)
        upper_bound = strnextling(prefix)

        if all_props:
            columns = "v.serial, v.node, v.hash, v.size, v.type, v.source, v.mtime, v.muser, v.uuid, v.checksum, v.cluster"
        else:
            columns = "v.serial"

        latest_sql, params = (
            self._construct_versions_nodes_latest_version_subquery(before))
        sql = ("select distinct n.path, %s "
               "from versions v, nodes n "
               "where v.serial = %s "
               "and v.cluster != ? "
               "and v.node in (select node "
               "from nodes "
               "where parent = ?) "
               "and n.node = v.node "
               "and n.path > ? and n.path < ?") % (columns, latest_sql)
        params.extend([except_cluster, parent, start, upper_bound])
        # Position of the value bound to "n.path > ?"; it is rewritten
        # below in order to jump past every reported prefix.
        start_index = len(params) - 2

        clause, clause_args = self._construct_paths(pathq)
        if clause is not None:
            sql += clause
            params.extend(clause_args)

        clause, clause_args = self._construct_size(sizeq)
        if clause is not None:
            sql += clause
            params.extend(clause_args)

        clause, clause_args = self._construct_filters(domain, filterq)
        if clause is not None:
            sql += clause
            params.extend(clause_args)
        else:
            # Without filters, strip any join against the attributes
            # table from the statement.
            sql = sql.replace("attributes a, ", "")
            sql = sql.replace("and a.serial = v.serial ", "")
        sql += " order by n.path"

        if not delimiter:
            # Flat listing: the database applies the limit.
            params.append(limit)
            self.execute(sql + " limit ?", params)
            return self.fetchall(), ()

        prefix_len = len(prefix)
        delim_len = len(delimiter)
        matches = []
        prefixes = []
        found = 0

        self.execute(sql, params)
        while True:
            row = self.fetchone()
            if row is None:
                break
            path = row[0]
            cut = path.find(delimiter, prefix_len)

            if cut < 0:
                # No delimiter after the prefix: a plain match.
                matches.append(row)
                found += 1
                if found >= limit:
                    break
                continue

            if cut + delim_len == len(path):
                # The path ends with the delimiter: it is a match too,
                # but the limit is not checked here, so that one more
                # row is fetched in case there is a path below it.
                matches.append(row)
                found += 1
                continue

            common = path[:cut + delim_len]
            prefixes.append(common)
            if found >= limit:
                break

            # Restart the query right after everything sharing this
            # prefix, so that it is reported only once.
            params[start_index] = strnextling(common)
            self.execute(sql, params)

        return matches, prefixes
963

    
964
    def latest_uuid(self, uuid):
        """Look up the latest version carrying the given uuid.

           The version with the highest serial among those having
           this uuid is selected, and the first result row, holding
           (path, serial), is returned as given by self.fetchone().
        """

        sql = ("select n.path, v.serial "
               "from versions v, nodes n "
               "where v.serial = "
               "(select max(serial) from versions where uuid = ?) "
               "and n.node = v.node")
        params = (uuid,)
        self.execute(sql, params)
        return self.fetchone()