Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / lib / sqlite / node.py @ 5576e6dd

History | View | Annotate | Download (35.4 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from time import time
35
from operator import itemgetter
36
from itertools import groupby
37

    
38
from dbworker import DBWorker
39

    
40
from pithos.backends.filter import parse_filters
41

    
42

    
43
# Identifier of the root node; every ancestor walk ends there.
ROOTNODE = 0

# Column positions inside a full row selected from the versions table.
SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM, \
    CLUSTER = range(11)

# Ways a path query entry may be matched against n.path.
MATCH_PREFIX, MATCH_EXACT = range(2)

# Default of the `before` parameters: no upper bound on mtime.
inf = float('inf')
51

    
52

    
53
def strnextling(prefix):
    """Return the first unicode string
       greater than but not starting with given prefix.
       strnextling('hello') -> 'hellp'

       Raise RuntimeError when the last character of prefix is
       0xffff or above, because it cannot be incremented within
       the range this module relies on.
    """
    try:
        to_char = unichr  # Python 2
    except NameError:
        to_char = chr  # Python 3: chr already returns unicode characters

    if not prefix:
        ## all strings start with the null string,
        ## therefore we have to approximate strnextling('')
        ## with the last unicode character supported by python
        ## 0x10ffff for wide (32-bit unicode) python builds
        ## 0x00ffff for narrow (16-bit unicode) python builds
        ## We will not autodetect. 0xffff is safe enough.
        return to_char(0xffff)

    last = ord(prefix[-1])
    if last >= 0xffff:
        raise RuntimeError("strnextling: the last character of %r "
                           "cannot be incremented" % (prefix,))
    return prefix[:-1] + to_char(last + 1)
72

    
73

    
74
def strprevling(prefix):
    """Return an approximation of the last unicode string
       less than but not starting with given prefix.
       strprevling(u'hello') -> u'helln\\uffff'

       The null string is returned unchanged. When the last character
       has code point 0 it is simply dropped.
    """
    try:
        to_char = unichr  # Python 2
    except NameError:
        to_char = chr  # Python 3: chr already returns unicode characters

    if not prefix:
        ## There is no prevling for the null string
        return prefix

    head = prefix[:-1]
    last = ord(prefix[-1])
    if last > 0:
        # Step the last character down and pad with the highest
        # character used by this module.
        head += to_char(last - 1) + to_char(0xffff)
    return head
87

    
88

    
89
# Maps each version property name to its position in a full versions row.
_propnames = dict(
    (name, position) for position, name in enumerate((
        'serial',
        'node',
        'hash',
        'size',
        'type',
        'source',
        'mtime',
        'muser',
        'uuid',
        'checksum',
        'cluster',
    ))
)
102

    
103

    
104
class Node(DBWorker):
105
    """Nodes store path organization and have multiple versions.
106
       Versions store object history and have multiple attributes.
107
       Attributes store metadata.
108
    """
109

    
110
    # TODO: Provide an interface for included and excluded clusters.
111

    
112
    def __init__(self, **params):
        """Initialize the worker and make sure the schema is in place.

           Creates, if missing, the tables nodes, policy, statistics,
           versions and attributes together with their indexes, then
           inserts the root node (its own parent) unless already there.
        """
        DBWorker.__init__(self, **params)
        execute = self.execute

        # Executed one by one, in this order.
        schema = (
            """ pragma foreign_keys = on """,

            """ create table if not exists nodes
                          ( node       integer primary key,
                            parent     integer default 0,
                            path       text    not null default '',
                            latest_version     integer,
                            foreign key (parent)
                            references nodes(node)
                            on update cascade
                            on delete cascade ) """,
            """ create unique index if not exists idx_nodes_path
                    on nodes(path) """,
            """ create index if not exists idx_nodes_parent
                    on nodes(parent) """,

            """ create table if not exists policy
                          ( node   integer,
                            key    text,
                            value  text,
                            primary key (node, key)
                            foreign key (node)
                            references nodes(node)
                            on update cascade
                            on delete cascade ) """,

            """ create table if not exists statistics
                          ( node       integer,
                            population integer not null default 0,
                            size       integer not null default 0,
                            mtime      integer,
                            cluster    integer not null default 0,
                            primary key (node, cluster)
                            foreign key (node)
                            references nodes(node)
                            on update cascade
                            on delete cascade ) """,

            """ create table if not exists versions
                          ( serial     integer primary key,
                            node       integer,
                            hash       text,
                            size       integer not null default 0,
                            type       text    not null default '',
                            source     integer,
                            mtime      integer,
                            muser      text    not null default '',
                            uuid       text    not null default '',
                            checksum   text    not null default '',
                            cluster    integer not null default 0,
                            foreign key (node)
                            references nodes(node)
                            on update cascade
                            on delete cascade ) """,
            """ create index if not exists idx_versions_node_mtime
                    on versions(node, mtime) """,
            """ create index if not exists idx_versions_node_uuid
                    on versions(uuid) """,

            """ create table if not exists attributes
                          ( serial integer,
                            domain text,
                            key    text,
                            value  text,
                            primary key (serial, domain, key)
                            foreign key (serial)
                            references versions(serial)
                            on update cascade
                            on delete cascade ) """,
            """ create index if not exists idx_attributes_domain
                    on attributes(domain) """,
        )
        for statement in schema:
            execute(statement)

        # NOTE(review): wrapper.execute() presumably opens a transaction
        # that the commit below closes -- confirm against DBWorker.
        wrapper = self.wrapper
        wrapper.execute()
        try:
            execute("insert or ignore into nodes(node, parent) values (?, ?)",
                    (ROOTNODE, ROOTNODE))
        finally:
            wrapper.commit()
195

    
196
    def node_create(self, parent, path):
        """Insert a node with the given parent and path.
           Return the row id that the insert produced for the new node.
        """

        cursor = self.execute("insert into nodes (parent, path) "
                              "values (?, ?)", (parent, path))
        return cursor.lastrowid
205

    
206
    def node_lookup(self, path):
        """Find the node stored under the given path.
           Return its identifier, or None when no node has that path.
        """

        self.execute("select node from nodes where path = ?", (path,))
        row = self.fetchone()
        return None if row is None else row[0]
217

    
218
    def node_lookup_bulk(self, paths):
        """Lookup the current nodes for the given paths.
           Return a list with the identifiers of the nodes found.
           Paths without a node contribute nothing, so the list is
           empty when none of the paths exists. No query is issued
           for an empty set of paths.
        """

        # Materialize first: paths is needed both for building the
        # placeholders and as the query arguments, and may be one-shot.
        paths = list(paths)
        if not paths:
            return []

        placeholders = ','.join('?' for _ in paths)
        q = "select node from nodes where path in (%s)" % placeholders
        self.execute(q, paths)
        r = self.fetchall()
        if r is not None:
            return [row[0] for row in r]
        # Kept for compatibility with a fetchall that reports None.
        return None
230

    
231
    def node_get_properties(self, node):
        """Fetch the (parent, path) row of the given node.
           The result is None when there is no such node.
        """

        query = "select parent, path from nodes where node = ?"
        self.execute(query, (node,))
        row = self.fetchone()
        return row
239

    
240
    def node_get_versions(self, node, keys=(), propnames=_propnames):
        """Return the properties of every version stored at node.
           With empty keys each row holds all properties, ordered as
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
           Otherwise each row is a list with the values of the given
           keys, in that order; keys missing from propnames are skipped.
        """

        self.execute(
            "select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
            "from versions "
            "where node = ?",
            (node,))
        rows = self.fetchall()
        if rows is None or not keys:
            return rows
        return [[row[propnames[k]] for k in keys if k in propnames]
                for row in rows]
257

    
258
    def node_count_children(self, node):
        """Count the nodes whose parent is node (node 0 is never counted)."""

        self.execute(
            "select count(node) from nodes where parent = ? and node != 0",
            (node,))
        row = self.fetchone()
        return row[0] if row is not None else 0
267

    
268
    def node_purge_children(self, parent, before=inf, cluster=0):
        """Purge the versions stored directly under parent.

           Every version that belongs to a child of parent, lies in the
           given cluster and has mtime <= before is deleted. Children
           left without any version are deleted too.
           Return (hashes, size, serials) of the deleted versions,
           or ((), 0, ()) when nothing matches.
        """

        run = self.execute
        # The same selection drives the count, the listing and the delete.
        selection = ("where node in (select node "
                     "from nodes "
                     "where parent = ?) "
                     "and cluster = ? "
                     "and mtime <= ?")
        args = (parent, cluster, before)

        run("select count(serial), sum(size) from versions " + selection,
            args)
        nr, size = self.fetchone()
        if not nr:
            return (), 0, ()

        # Statistics are adjusted before the rows are removed.
        now = time()
        self.statistics_update(parent, -nr, -size, now, cluster)
        self.statistics_update_ancestors(parent, -nr, -size, now, cluster)

        run("select hash, serial from versions " + selection, args)
        hashes = []
        serials = []
        for row in self.fetchall():
            hashes.append(row[0])
            serials.append(row[1])

        run("delete from versions " + selection, args)
        # Remove the children of parent that have no versions left.
        run("delete from nodes "
            "where node in (select node from nodes n "
            "where (select count(serial) "
            "from versions "
            "where node = n.node) = 0 "
            "and parent = ?)",
            (parent,))
        return hashes, size, serials
319

    
320
    def node_purge(self, node, before=inf, cluster=0):
        """Purge the versions of a single node.

           Every version of node that lies in the given cluster and has
           mtime <= before is deleted. The node itself is deleted when
           no version of it remains.
           Return (hashes, size, serials) of the deleted versions,
           or ((), 0, ()) when nothing matches.
        """

        run = self.execute
        # The same selection drives the count, the listing and the delete.
        selection = ("where node = ? "
                     "and cluster = ? "
                     "and mtime <= ?")
        args = (node, cluster, before)

        run("select count(serial), sum(size) from versions " + selection,
            args)
        nr, size = self.fetchone()
        if not nr:
            return (), 0, ()

        # Statistics are adjusted before the rows are removed.
        now = time()
        self.statistics_update_ancestors(node, -nr, -size, now, cluster)

        run("select hash, serial from versions " + selection, args)
        hashes = []
        serials = []
        for row in self.fetchall():
            hashes.append(row[0])
            serials.append(row[1])

        run("delete from versions " + selection, args)
        # Remove the node if it has no versions left.
        run("delete from nodes "
            "where node in (select node from nodes n "
            "where (select count(serial) "
            "from versions "
            "where node = n.node) = 0 "
            "and node = ?)",
            (node,))
        return hashes, size, serials
364

    
365
    def node_remove(self, node):
        """Delete the given node unless it still has children.
           Return False, changing nothing, when children exist;
           return True after the delete has been issued.
        """

        if self.node_count_children(node):
            return False

        # Take the node's versions out of the ancestors' statistics,
        # one cluster at a time.
        now = time()
        self.execute("select count(serial), sum(size), cluster "
                     "from versions "
                     "where node = ? "
                     "group by cluster",
                     (node,))
        per_cluster = self.fetchall()
        for population, size, cluster in per_cluster:
            self.statistics_update_ancestors(
                node, -population, -size, now, cluster)

        self.execute("delete from nodes where node = ?", (node,))
        return True
386

    
387
    def policy_get(self, node):
        """Return the policy of node as a {key: value} dictionary."""
        self.execute("select key, value from policy where node = ?",
                     (node,))
        pairs = self.fetchall()
        return dict(pairs)
391

    
392
    def policy_set(self, node, policy):
        """Store every (key, value) pair of policy for node.

           policy is a mapping. Existing rows with the same
           (node, key) are replaced; other keys are left untouched.
        """
        q = "insert or replace into policy (node, key, value) values (?, ?, ?)"
        # items() instead of iteritems(): works for any mapping and on
        # both Python 2 and Python 3.
        self.executemany(q, ((node, k, v) for k, v in policy.items()))
395

    
396
    def statistics_get(self, node, cluster=0):
        """Read the statistics row kept for (node, cluster).
           Return (population, size, mtime), or None if no row exists.
        """

        self.execute("select population, size, mtime from statistics "
                     "where node = ? and cluster = ?",
                     (node, cluster))
        row = self.fetchone()
        return row
405

    
406
    def statistics_update(self, node, population, size, mtime, cluster=0):
        """Apply a change to the statistics row of (node, cluster).

           population and size are deltas (zero, positive or negative)
           added to the stored values; a missing row counts as (0, 0).
           The resulting population is never stored below zero.
           mtime replaces the stored mtime.
        """

        self.execute("select population, size from statistics "
                     "where node = ? and cluster = ?",
                     (node, cluster))
        current = self.fetchone()
        if current is not None:
            old_population, old_size = current
        else:
            old_population, old_size = (0, 0)

        population = max(population + old_population, 0)
        size = size + old_size
        self.execute(
            "insert or replace into statistics (node, population, size, mtime, cluster) "
            "values (?, ?, ?, ?, ?)",
            (node, population, size, mtime, cluster))
427

    
428
    def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
        """Propagate a statistics change from node up to the root.

           The parent of node receives both population and size; every
           further ancestor receives the size only (population 0).
           The walk stops at node 0 or at a node that cannot be found.
        """

        while node != 0:
            props = self.node_get_properties(node)
            if props is None:
                break
            parent, _path = props
            self.statistics_update(parent, population, size, mtime, cluster)
            # Only the direct parent is credited with the population.
            node, population = parent, 0
444

    
445
    def statistics_latest(self, node, before=inf, except_cluster=0):
        """Compute statistics over the latest versions under node,
           leaving out versions that belong to except_cluster.

           Return (population, size, mtime): population counts the
           direct children, size sums everything whose path starts with
           the node's path minus the node's own size, mtime is the
           latest modification seen.
           Return None when the node or its latest version is missing.
        """

        run = self.execute
        fetch = self.fetchone

        # The node itself; its path is needed for the subtree query.
        node_props = self.node_get_properties(node)
        if node_props is None:
            return None
        _parent, path = node_props

        # The latest version of the node.
        subq, args = self._construct_latest_version_subquery(
            node=node, before=before)
        latest_q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
                    "from versions v "
                    "where serial = %s "
                    "and cluster != ?")
        run(latest_q % subq, args + [except_cluster])
        latest = fetch()
        if latest is None:
            return None
        mtime = latest[MTIME]

        # Both aggregates below differ only in how nodes are selected.
        aggregate = ("select count(serial), sum(size), max(mtime) "
                     "from versions v "
                     "where serial = %s "
                     "and cluster != ? "
                     "and node in (select node "
                     "from nodes ")

        # First level, just under node (get population).
        subq, args = self._construct_latest_version_subquery(
            node=None, before=before)
        run((aggregate + "where parent = ?)") % subq,
            args + [except_cluster, node])
        level = fetch()
        if level is None:
            return None
        count = level[0]
        mtime = max(mtime, level[2])
        if count == 0:
            return (0, 0, mtime)

        # All children (get size and mtime).
        # This is why the full path is stored.
        subq, args = self._construct_latest_version_subquery(
            node=None, before=before)
        run((aggregate + "where path like ? escape '\\')") % subq,
            args + [except_cluster, self.escape_like(path) + '%'])
        subtree = fetch()
        if subtree is None:
            return None
        # The prefix match includes the node itself; take its size out.
        size = subtree[1] - latest[SIZE]
        mtime = max(mtime, subtree[2])
        return (count, size, mtime)
511

    
512
    def nodes_set_latest_version(self, node, serial):
        """Record serial as the latest version of node."""
        self.execute("update nodes set latest_version = ? where node = ?",
                     (serial, node))
516

    
517
    def version_create(self, node, hash, size, type, source, muser, uuid, checksum, cluster=0):
        """Insert a version of node with the given properties.

           The current time becomes its mtime. The ancestors' statistics
           grow by one version of the given size, and the new version
           is recorded as the latest version of node.
           Return (serial, mtime) of the new version.
        """

        now = time()
        cursor = self.execute(
            "insert into versions (node, hash, size, type, source, mtime, muser, uuid, checksum, cluster) "
            "values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
            (node, hash, size, type, source, now, muser,
             uuid, checksum, cluster))
        serial = cursor.lastrowid

        self.statistics_update_ancestors(node, 1, size, now, cluster)
        self.nodes_set_latest_version(node, serial)
        return serial, now
533

    
534
    def version_lookup(self, node, before=inf, cluster=0, all_props=True):
        """Find the latest version of node, as of before.

           Return its row
           (serial, node, hash, size, type, source, mtime,
            muser, uuid, checksum, cluster)
           or, when all_props is false, a row holding the serial only.
           Return None if that version is not in the given cluster.
        """

        subq, args = self._construct_latest_version_subquery(
            node=node, before=before)
        if all_props:
            columns = "serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster"
        else:
            columns = "serial"
        q = ("select %s "
             "from versions v "
             "where serial = %s "
             "and cluster = ?") % (columns, subq)

        self.execute(q, args + [cluster])
        return self.fetchone()
558

    
559
    def version_lookup_bulk(self, nodes, before=inf, cluster=0, all_props=True):
        """Lookup the current versions of the given nodes.
           Return a list with their properties:
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster),
           ordered by node. When all_props is false each row holds
           the serial only and no ordering is requested.
           Return () when nodes is empty. The nodes argument is
           never modified and may be any sequence.
        """

        if not nodes:
            return ()
        # Work on a private copy: the subquery helper and the parameter
        # list below must not extend the caller's sequence in place.
        nodes = list(nodes)

        q = ("select %s "
             "from versions "
             "where serial in %s "
             "and cluster = ? %s")
        subq, args = self._construct_latest_versions_subquery(
            nodes=nodes, before=before)
        if not all_props:
            q = q % ("serial", subq, '')
        else:
            q = q % ("serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster", subq, 'order by node')

        # A new list, whatever sequence type the helper handed back.
        args = list(args) + [cluster]
        self.execute(q, args)
        return self.fetchall()
581

    
582
    def version_get_properties(self, serial, keys=(), propnames=_propnames):
        """Return the properties of the version with the given serial.

           With empty keys the whole row is returned, ordered as
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
           Otherwise a list with the values of the given keys, in that
           order; keys missing from propnames are skipped.
           Return None when there is no such version.
        """

        self.execute(
            "select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
            "from versions "
            "where serial = ?",
            (serial,))
        row = self.fetchone()
        if row is None or not keys:
            return row
        return [row[propnames[k]] for k in keys if k in propnames]
600

    
601
    def version_put_property(self, serial, key, value):
        """Update one property of the version with the given serial.
           Keys that are not version property names are ignored.
        """

        # key becomes part of the statement text, so only the known
        # column names are let through.
        if key in _propnames:
            self.execute(
                "update versions set %s = ? where serial = ?" % key,
                (value, serial))
608

    
609
    def version_recluster(self, serial, cluster):
        """Move the version with the given serial to another cluster.
           Nothing happens when the version does not exist or already
           belongs to that cluster.
        """

        props = self.version_get_properties(serial)
        if not props:
            return
        node, size, oldcluster = props[NODE], props[SIZE], props[CLUSTER]
        if cluster == oldcluster:
            return

        # Move the version's weight between the clusters' statistics.
        now = time()
        self.statistics_update_ancestors(node, -1, -size, now, oldcluster)
        self.statistics_update_ancestors(node, 1, size, now, cluster)

        self.execute("update versions set cluster = ? where serial = ?",
                     (cluster, serial))
627

    
628
    def version_remove(self, serial):
        """Delete the version with the given serial.
           Return its (hash, size), or None when it does not exist.
        """

        props = self.version_get_properties(serial)
        if not props:
            return
        node, cluster = props[NODE], props[CLUSTER]
        hash, size = props[HASH], props[SIZE]

        now = time()
        self.statistics_update_ancestors(node, -1, -size, now, cluster)

        self.execute("delete from versions where serial = ?", (serial,))

        # NOTE(review): with the default `before`, version_lookup goes
        # through nodes.latest_version, which still names the serial
        # just deleted if that was the latest -- confirm this refresh
        # behaves as intended.
        remaining = self.version_lookup(
            node, cluster=cluster, all_props=False)
        if remaining:
            self.nodes_set_latest_version(node, remaining[0])
        return hash, size
649

    
650
    def attribute_get(self, serial, domain, keys=()):
        """Return a list of (key, value) pairs of the version specified by serial.
           If keys is empty, return all attributes of the domain.
           Otherwise, return only those specified.
           keys may be any iterable (tuple, list, set, generator).
        """

        execute = self.execute
        # Normalize to a tuple so the keys can be joined with the
        # trailing (serial, domain) arguments whatever type came in.
        keys = tuple(keys) if keys else ()
        if keys:
            marks = ','.join('?' for _ in keys)
            q = ("select key, value from attributes "
                 "where key in (%s) and serial = ? and domain = ?" % (marks,))
            execute(q, keys + (serial, domain))
        else:
            q = "select key, value from attributes where serial = ? and domain = ?"
            execute(q, (serial, domain))
        return self.fetchall()
666

    
667
    def attribute_set(self, serial, domain, items):
        """Store attributes for the version with the given serial.
           items is an iterable of (key, value) pairs; a pair replaces
           any attribute already stored under (serial, domain, key).
        """

        rows = ((serial, domain, key, value) for key, value in items)
        self.executemany(
            "insert or replace into attributes (serial, domain, key, value) "
            "values (?, ?, ?, ?)",
            rows)
675

    
676
    def attribute_del(self, serial, domain, keys=()):
        """Remove attributes of the version with the given serial.
           With empty keys the whole domain is removed;
           otherwise only the attributes named by keys.
        """

        if not keys:
            self.execute(
                "delete from attributes where serial = ? and domain = ?",
                (serial, domain))
            return

        self.executemany(
            "delete from attributes where serial = ? and domain = ? and key = ?",
            ((serial, domain, key) for key in keys))
688

    
689
    def attribute_copy(self, source, dest):
        """Copy every attribute of version source onto version dest,
           replacing attributes dest already has under the same
           (domain, key).
        """
        self.execute("insert or replace into attributes "
                     "select ?, domain, key, value from attributes "
                     "where serial = ?",
                     (dest, source))
694

    
695
    def _construct_filters(self, domain, filterq):
696
        if not domain or not filterq:
697
            return None, None
698

    
699
        subqlist = []
700
        append = subqlist.append
701
        included, excluded, opers = parse_filters(filterq)
702
        args = []
703

    
704
        if included:
705
            subq = "exists (select 1 from attributes where serial = v.serial and domain = ? and "
706
            subq += "(" + ' or '.join(('key = ?' for x in included)) + ")"
707
            subq += ")"
708
            args += [domain]
709
            args += included
710
            append(subq)
711

    
712
        if excluded:
713
            subq = "not exists (select 1 from attributes where serial = v.serial and domain = ? and "
714
            subq += "(" + ' or '.join(('key = ?' for x in excluded)) + ")"
715
            subq += ")"
716
            args += [domain]
717
            args += excluded
718
            append(subq)
719

    
720
        if opers:
721
            for k, o, v in opers:
722
                subq = "exists (select 1 from attributes where serial = v.serial and domain = ? and "
723
                subq += "key = ? and value %s ?" % (o,)
724
                subq += ")"
725
                args += [domain, k, v]
726
                append(subq)
727

    
728
        if not subqlist:
729
            return None, None
730

    
731
        subq = ' and ' + ' and '.join(subqlist)
732

    
733
        return subq, args
734

    
735
    def _construct_paths(self, pathq):
736
        if not pathq:
737
            return None, None
738

    
739
        subqlist = []
740
        args = []
741
        for path, match in pathq:
742
            if match == MATCH_PREFIX:
743
                subqlist.append("n.path like ? escape '\\'")
744
                args.append(self.escape_like(path) + '%')
745
            elif match == MATCH_EXACT:
746
                subqlist.append("n.path = ?")
747
                args.append(path)
748

    
749
        subq = ' and (' + ' or '.join(subqlist) + ')'
750
        args = tuple(args)
751

    
752
        return subq, args
753

    
754
    def _construct_size(self, sizeq):
755
        if not sizeq or len(sizeq) != 2:
756
            return None, None
757

    
758
        subq = ''
759
        args = []
760
        if sizeq[0]:
761
            subq += " and v.size >= ?"
762
            args += [sizeq[0]]
763
        if sizeq[1]:
764
            subq += " and v.size < ?"
765
            args += [sizeq[1]]
766

    
767
        return subq, args
768

    
769
    def _construct_versions_nodes_latest_version_subquery(self, before=inf):
        """Build the sql expression that yields the latest serial for a
           query that joins versions v with nodes n.

           Without a time limit this is the stored n.latest_version;
           otherwise the highest serial of v.node with mtime < before.
           Return (expression, args).
        """
        if before == inf:
            return "n.latest_version ", []
        limited = ("(select max(serial) "
                   "from versions "
                   "where node = v.node and mtime < ?) ")
        return limited, [before]
779

    
780
    def _construct_latest_version_subquery(self, node=None, before=inf):
        """Build the subquery that yields the latest serial of a node.

           With a node the subquery is bound to it through a parameter;
           otherwise it is correlated with the outer v.node.
           Without a time limit the stored latest_version is read;
           otherwise the highest serial with mtime < before.
           Return (subquery, args) with args a new list.
        """
        # NOTE(review): node 0 is falsy and therefore takes the
        # correlated form -- confirm this is intended for the root.
        if node:
            condition, args = "node = ? ", [node]
        else:
            condition, args = "node = v.node", []

        if before == inf:
            template = ("(select latest_version "
                        "from nodes "
                        "where %s) ")
        else:
            template = ("(select max(serial) "
                        "from versions "
                        "where %s and mtime < ?) ")
            args.append(before)
        return template % condition, args
797

    
798
    def _construct_latest_versions_subquery(self, nodes=(), before=inf):
799
        where_cond = ""
800
        args = []
801
        if nodes:
802
            where_cond = "node in (%s) " % ','.join('?' for node in nodes)
803
            args = nodes
804

    
805
        if before == inf:
806
            q = ("(select latest_version "
807
                 "from nodes "
808
                 "where %s ) ")
809
        else:
810
            q = ("(select max(serial) "
811
                 "from versions "
812
                 "where %s and mtime < ? group by node) ")
813
            args += [before]
814
        return q % where_cond, args
815

    
816
    def latest_attribute_keys(self, parent, domain, before=inf,
                              except_cluster=0, pathq=None):
        """Return the distinct attribute keys defined in ``domain``
           for the latest versions of the nodes under ``parent``
           that do not belong to ``except_cluster``.

           ``before`` selects which version counts as the latest one
           (see _construct_latest_version_subquery) and ``pathq``
           optionally restricts the node paths (see _construct_paths).
        """

        pathq = pathq or []

        # TODO: Use another table to store before=inf results.
        q = ("select distinct a.key "
             "from attributes a, versions v, nodes n "
             "where v.serial = %s "
             "and v.cluster != ? "
             "and v.node in (select node "
             "from nodes "
             "where parent = ?) "
             "and a.serial = v.serial "
             "and a.domain = ? "
             "and n.node = v.node")
        subq, subargs = self._construct_latest_version_subquery(
            node=None, before=before)
        # The subquery is spliced in first, so its parameters come first.
        args = subargs + [except_cluster, parent, domain]
        q = q % subq

        subq, subargs = self._construct_paths(pathq)
        if subq is not None:
            q += subq
            args += subargs

        self.execute(q, args)
        return [r[0] for r in self.fetchall()]
845

    
846
    def latest_version_list(self, parent, prefix='', delimiter=None,
                            start='', limit=10000, before=inf,
                            except_cluster=0, pathq=None, domain=None,
                            filterq=None, sizeq=None, all_props=False):
        """Return a (list of (path, serial) tuples, list of common prefixes)
           for the current versions of the paths with the given parent,
           matching the following criteria.

           The property tuple for a version is returned if all
           of these conditions are true:

                a. parent matches

                b. path > start

                c. path starts with prefix (and paths in pathq)

                d. version is the max up to before

                e. version is not in cluster

                f. the path does not have the delimiter occurring
                   after the prefix, or ends with the delimiter

                g. serial matches the attribute filter query.

                   A filter query is a comma-separated list of
                   terms in one of these three forms:

                   key
                       an attribute with this key must exist

                   !key
                       an attribute with this key must not exist

                   key ?op value
                       the attribute with this key satisfies the value
                       where ?op is one of =, !=, <=, >=, <, >.

                h. the size is in the range set by sizeq

           The list of common prefixes includes the prefixes
           matching up to the first delimiter after prefix,
           and are reported only once, as "virtual directories".
           The delimiter is included in the prefixes.

           If arguments are None, then the corresponding matching rule
           will always match.

           Limit applies to the first list of tuples returned.

           If all_props is True, return all properties after path,
           not just serial.

           When no delimiter is given, the second item of the result
           is an empty tuple.
        """

        # Normalised here instead of using mutable default arguments.
        pathq = pathq or []
        filterq = filterq or []

        execute = self.execute

        if not start or start < prefix:
            start = strprevling(prefix)
        nextling = strnextling(prefix)

        q = ("select distinct n.path, %s "
             "from versions v, nodes n "
             "where v.serial = %s "
             "and v.cluster != ? "
             "and v.node in (select node "
             "from nodes "
             "where parent = ?) "
             "and n.node = v.node "
             "and n.path > ? and n.path < ?")
        subq, args = self._construct_versions_nodes_latest_version_subquery(
            before)
        if not all_props:
            columns = "v.serial"
        else:
            columns = ("v.serial, v.node, v.hash, v.size, v.type, v.source, "
                       "v.mtime, v.muser, v.uuid, v.checksum, v.cluster")
        q = q % (columns, subq)
        args += [except_cluster, parent, start, nextling]
        # Position of `start` in args; it is overwritten below every
        # time the listing has to jump past a common prefix.
        start_index = len(args) - 2

        subq, subargs = self._construct_paths(pathq)
        if subq is not None:
            q += subq
            args += subargs
        subq, subargs = self._construct_size(sizeq)
        if subq is not None:
            q += subq
            args += subargs
        subq, subargs = self._construct_filters(domain, filterq)
        if subq is not None:
            q += subq
            args += subargs
        q += " order by n.path"

        if not delimiter:
            # Flat listing: the database can apply the limit itself.
            q += " limit ?"
            args.append(limit)
            execute(q, args)
            return self.fetchall(), ()

        pfz = len(prefix)
        dz = len(delimiter)
        count = 0
        fetchone = self.fetchone
        prefixes = []
        matches = []

        execute(q, args)
        while True:
            props = fetchone()
            if props is None:
                break
            path = props[0]
            idx = path.find(delimiter, pfz)

            if idx < 0:
                # No delimiter after the prefix: a plain match.
                matches.append(props)
                count += 1
                if count >= limit:
                    break
                continue

            if idx + dz == len(path):
                # The path ends with the delimiter: it is a match too.
                matches.append(props)
                count += 1
                continue  # Get one more, in case there is a path.

            # The delimiter occurs inside the path: report the common
            # prefix once, then restart the query from a new start
            # (presumably just past every path sharing that prefix --
            # see strnextling).
            pf = path[:idx + dz]
            prefixes.append(pf)
            if count >= limit:
                break

            args[start_index] = strnextling(pf)  # New start.
            execute(q, args)

        return matches, prefixes
985

    
986
    def latest_uuid(self, uuid, cluster):
        """Return the latest version of the given uuid and cluster.

        Return a (path, serial) tuple, as fetched by self.fetchone().
        If cluster is None, all clusters are considered; otherwise
        it is converted with int() before being used in the query.

        """
        if cluster is not None:
            cluster_where = "and cluster = ?"
            args = (uuid, int(cluster))
        else:
            cluster_where = ""
            args = (uuid,)

        q = ("select n.path, v.serial "
             "from versions v, nodes n "
             "where v.serial = (select max(serial) "
             "from versions "
             "where uuid = ? %s) "
             "and n.node = v.node") % cluster_where
        self.execute(q, args)
        return self.fetchone()
1008

    
1009
    def domain_object_list(self, domain, cluster=None):
        """Return a list of (path, property tuple, attribute dictionary)
           for the objects in the specific domain and cluster.

           Only the latest version of each node is considered and only
           versions having at least one attribute in the domain are
           returned.  If cluster is None, all clusters are considered.

           The property tuple holds, in order: serial, node, hash, size,
           type, source, mtime, muser, uuid, checksum, cluster.
        """

        q = ("select n.path, v.serial, v.node, v.hash, "
             "v.size, v.type, v.source, v.mtime, v.muser, "
             "v.uuid, v.checksum, v.cluster, a.key, a.value "
             "from nodes n, versions v, attributes a "
             "where n.node = v.node and "
             "n.latest_version = v.serial and "
             "v.serial = a.serial and "
             "a.domain = ? ")
        args = [domain]
        if cluster is not None:
            q += "and v.cluster = ?"
            args.append(cluster)

        self.execute(q, args)

        # The first 12 columns (path + version properties) identify an
        # object; the remaining two are one (key, value) attribute pair.
        # groupby() only merges adjacent rows, hence the sort on the
        # very same key.
        group_by = itemgetter(slice(12))
        rows = sorted(self.fetchall(), key=group_by)
        return [(k[0], k[1:], dict(i[12:] for i in data))
                for (k, data) in groupby(rows, group_by)]