Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / lib / sqlite / node.py @ cc412b78

History | View | Annotate | Download (36.3 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from time import time
35
from operator import itemgetter
36
from itertools import groupby
37

    
38
from dbworker import DBWorker
39

    
40
from pithos.backends.filter import parse_filters
41

    
42

    
43
ROOTNODE = 0
44

    
45
(SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM,
46
 CLUSTER) = range(11)
47

    
48
(MATCH_PREFIX, MATCH_EXACT) = range(2)
49

    
50
inf = float('inf')
51

    
52

    
53
def strnextling(prefix):
54
    """Return the first unicode string
55
       greater than but not starting with given prefix.
56
       strnextling('hello') -> 'hellp'
57
    """
58
    if not prefix:
59
        ## all strings start with the null string,
60
        ## therefore we have to approximate strnextling('')
61
        ## with the last unicode character supported by python
62
        ## 0x10ffff for wide (32-bit unicode) python builds
63
        ## 0x00ffff for narrow (16-bit unicode) python builds
64
        ## We will not autodetect. 0xffff is safe enough.
65
        return unichr(0xffff)
66
    s = prefix[:-1]
67
    c = ord(prefix[-1])
68
    if c >= 0xffff:
69
        raise RuntimeError
70
    s += unichr(c + 1)
71
    return s
72

    
73

    
74
def strprevling(prefix):
75
    """Return an approximation of the last unicode string
76
       less than but not starting with given prefix.
77
       strprevling(u'hello') -> u'helln\\xffff'
78
    """
79
    if not prefix:
80
        ## There is no prevling for the null string
81
        return prefix
82
    s = prefix[:-1]
83
    c = ord(prefix[-1])
84
    if c > 0:
85
        s += unichr(c - 1) + unichr(0xffff)
86
    return s
87

    
88

    
89
_propnames = {
90
    'serial': 0,
91
    'node': 1,
92
    'hash': 2,
93
    'size': 3,
94
    'type': 4,
95
    'source': 5,
96
    'mtime': 6,
97
    'muser': 7,
98
    'uuid': 8,
99
    'checksum': 9,
100
    'cluster': 10
101
}
102

    
103

    
104
class Node(DBWorker):
105
    """Nodes store path organization and have multiple versions.
106
       Versions store object history and have multiple attributes.
107
       Attributes store metadata.
108
    """
109

    
110
    # TODO: Provide an interface for included and excluded clusters.
111

    
112
    def __init__(self, **params):
113
        DBWorker.__init__(self, **params)
114
        execute = self.execute
115

    
116
        execute(""" pragma foreign_keys = on """)
117

    
118
        execute(""" create table if not exists nodes
119
                          ( node       integer primary key,
120
                            parent     integer default 0,
121
                            path       text    not null default '',
122
                            latest_version     integer,
123
                            foreign key (parent)
124
                            references nodes(node)
125
                            on update cascade
126
                            on delete cascade ) """)
127
        execute(""" create unique index if not exists idx_nodes_path
128
                    on nodes(path) """)
129
        execute(""" create index if not exists idx_nodes_parent
130
                    on nodes(parent) """)
131

    
132
        execute(""" create table if not exists policy
133
                          ( node   integer,
134
                            key    text,
135
                            value  text,
136
                            primary key (node, key)
137
                            foreign key (node)
138
                            references nodes(node)
139
                            on update cascade
140
                            on delete cascade ) """)
141

    
142
        execute(""" create table if not exists statistics
143
                          ( node       integer,
144
                            population integer not null default 0,
145
                            size       integer not null default 0,
146
                            mtime      integer,
147
                            cluster    integer not null default 0,
148
                            primary key (node, cluster)
149
                            foreign key (node)
150
                            references nodes(node)
151
                            on update cascade
152
                            on delete cascade ) """)
153

    
154
        execute(""" create table if not exists versions
155
                          ( serial     integer primary key,
156
                            node       integer,
157
                            hash       text,
158
                            size       integer not null default 0,
159
                            type       text    not null default '',
160
                            source     integer,
161
                            mtime      integer,
162
                            muser      text    not null default '',
163
                            uuid       text    not null default '',
164
                            checksum   text    not null default '',
165
                            cluster    integer not null default 0,
166
                            foreign key (node)
167
                            references nodes(node)
168
                            on update cascade
169
                            on delete cascade ) """)
170
        execute(""" create index if not exists idx_versions_node_mtime
171
                    on versions(node, mtime) """)
172
        execute(""" create index if not exists idx_versions_node_uuid
173
                    on versions(uuid) """)
174

    
175
        execute(""" create table if not exists attributes
176
                          ( serial integer,
177
                            domain text,
178
                            key    text,
179
                            value  text,
180
                            primary key (serial, domain, key)
181
                            foreign key (serial)
182
                            references versions(serial)
183
                            on update cascade
184
                            on delete cascade ) """)
185
        execute(""" create index if not exists idx_attributes_domain
186
                    on attributes(domain) """)
187

    
188
        wrapper = self.wrapper
189
        wrapper.execute()
190
        try:
191
            q = "insert or ignore into nodes(node, parent) values (?, ?)"
192
            execute(q, (ROOTNODE, ROOTNODE))
193
        finally:
194
            wrapper.commit()
195

    
196
    def node_create(self, parent, path):
197
        """Create a new node from the given properties.
198
           Return the node identifier of the new node.
199
        """
200

    
201
        q = ("insert into nodes (parent, path) "
202
             "values (?, ?)")
203
        props = (parent, path)
204
        return self.execute(q, props).lastrowid
205

    
206
    def node_lookup(self, path):
207
        """Lookup the current node of the given path.
208
           Return None if the path is not found.
209
        """
210

    
211
        q = "select node from nodes where path = ?"
212
        self.execute(q, (path,))
213
        r = self.fetchone()
214
        if r is not None:
215
            return r[0]
216
        return None
217

    
218
    def node_lookup_bulk(self, paths):
219
        """Lookup the current nodes for the given paths.
220
           Return () if the path is not found.
221
        """
222

    
223
        placeholders = ','.join('?' for path in paths)
224
        q = "select node from nodes where path in (%s)" % placeholders
225
        self.execute(q, paths)
226
        r = self.fetchall()
227
        if r is not None:
228
            return [row[0] for row in r]
229
        return None
230

    
231
    def node_get_properties(self, node):
232
        """Return the node's (parent, path).
233
           Return None if the node is not found.
234
        """
235

    
236
        q = "select parent, path from nodes where node = ?"
237
        self.execute(q, (node,))
238
        return self.fetchone()
239

    
240
    def node_get_versions(self, node, keys=(), propnames=_propnames):
241
        """Return the properties of all versions at node.
242
           If keys is empty, return all properties in the order
243
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
244
        """
245

    
246
        q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
247
             "from versions "
248
             "where node = ?")
249
        self.execute(q, (node,))
250
        r = self.fetchall()
251
        if r is None:
252
            return r
253

    
254
        if not keys:
255
            return r
256
        return [[p[propnames[k]] for k in keys if k in propnames] for p in r]
257

    
258
    def node_count_children(self, node):
259
        """Return node's child count."""
260

    
261
        q = "select count(node) from nodes where parent = ? and node != 0"
262
        self.execute(q, (node,))
263
        r = self.fetchone()
264
        if r is None:
265
            return 0
266
        return r[0]
267

    
268
    def node_purge_children(self, parent, before=inf, cluster=0):
269
        """Delete all versions with the specified
270
           parent and cluster, and return
271
           the hashes, the size and the serials of versions deleted.
272
           Clears out nodes with no remaining versions.
273
        """
274

    
275
        execute = self.execute
276
        q = ("select count(serial), sum(size) from versions "
277
             "where node in (select node "
278
             "from nodes "
279
             "where parent = ?) "
280
             "and cluster = ? "
281
             "and mtime <= ?")
282
        args = (parent, cluster, before)
283
        execute(q, args)
284
        nr, size = self.fetchone()
285
        if not nr:
286
            return (), 0, ()
287
        mtime = time()
288
        self.statistics_update(parent, -nr, -size, mtime, cluster)
289
        self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster)
290

    
291
        q = ("select hash, serial from versions "
292
             "where node in (select node "
293
             "from nodes "
294
             "where parent = ?) "
295
             "and cluster = ? "
296
             "and mtime <= ?")
297
        execute(q, args)
298
        hashes = []
299
        serials = []
300
        for r in self.fetchall():
301
            hashes += [r[0]]
302
            serials += [r[1]]
303
        
304
        q = ("delete from versions "
305
             "where node in (select node "
306
             "from nodes "
307
             "where parent = ?) "
308
             "and cluster = ? "
309
             "and mtime <= ?")
310
        execute(q, args)
311
        q = ("delete from nodes "
312
             "where node in (select node from nodes n "
313
             "where (select count(serial) "
314
             "from versions "
315
             "where node = n.node) = 0 "
316
             "and parent = ?)")
317
        execute(q, (parent,))
318
        return hashes, size, serials
319

    
320
    def node_purge(self, node, before=inf, cluster=0):
321
        """Delete all versions with the specified
322
           node and cluster, and return
323
           the hashes, the size and the serials of versions deleted.
324
           Clears out the node if it has no remaining versions.
325
        """
326

    
327
        execute = self.execute
328
        q = ("select count(serial), sum(size) from versions "
329
             "where node = ? "
330
             "and cluster = ? "
331
             "and mtime <= ?")
332
        args = (node, cluster, before)
333
        execute(q, args)
334
        nr, size = self.fetchone()
335
        if not nr:
336
            return (), 0, ()
337
        mtime = time()
338
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
339

    
340
        q = ("select hash, serial from versions "
341
             "where node = ? "
342
             "and cluster = ? "
343
             "and mtime <= ?")
344
        execute(q, args)
345
        hashes = []
346
        serials = []
347
        for r in self.fetchall():
348
            hashes += [r[0]]
349
            serials += [r[1]]
350

    
351
        q = ("delete from versions "
352
             "where node = ? "
353
             "and cluster = ? "
354
             "and mtime <= ?")
355
        execute(q, args)
356
        q = ("delete from nodes "
357
             "where node in (select node from nodes n "
358
             "where (select count(serial) "
359
             "from versions "
360
             "where node = n.node) = 0 "
361
             "and node = ?)")
362
        execute(q, (node,))
363
        return hashes, size, serials
364

    
365
    def node_remove(self, node):
366
        """Remove the node specified.
367
           Return false if the node has children or is not found.
368
        """
369

    
370
        if self.node_count_children(node):
371
            return False
372

    
373
        mtime = time()
374
        q = ("select count(serial), sum(size), cluster "
375
             "from versions "
376
             "where node = ? "
377
             "group by cluster")
378
        self.execute(q, (node,))
379
        for population, size, cluster in self.fetchall():
380
            self.statistics_update_ancestors(
381
                node, -population, -size, mtime, cluster)
382

    
383
        q = "delete from nodes where node = ?"
384
        self.execute(q, (node,))
385
        return True
386

    
387
    def node_accounts(self, accounts=()):
388
        q = ("select path, node from nodes where node != 0 and parent == 0 ")
389
        args = []
390
        if accounts:
391
            placeholders = ','.join('?' for a in accounts)
392
            q += ("and path in (%s)" % placeholders)
393
            args += accounts
394
        return self.execute(q, args).fetchall()
395

    
396
    def node_account_usage(self, account_node, cluster):
397
        select_children = ("select node from nodes where parent = ?")
398
        select_descedents = ("select node from nodes "
399
                             "where parent in (%s) "
400
                             "or node in (%s) ") % ((select_children,)*2)
401
        args = [account_node]*2
402
        q = ("select sum(v.size) from versions v, nodes n "
403
             "where v.node = n.node "
404
             "and n.node in (%s) "
405
             "and v.cluster = ?") % select_descedents
406
        args += [cluster]
407

    
408
        self.execute(q, args)
409
        return self.fetchone()[0]
410

    
411
    def policy_get(self, node):
412
        q = "select key, value from policy where node = ?"
413
        self.execute(q, (node,))
414
        return dict(self.fetchall())
415

    
416
    def policy_set(self, node, policy):
417
        q = "insert or replace into policy (node, key, value) values (?, ?, ?)"
418
        self.executemany(q, ((node, k, v) for k, v in policy.iteritems()))
419

    
420
    def statistics_get(self, node, cluster=0):
421
        """Return population, total size and last mtime
422
           for all versions under node that belong to the cluster.
423
        """
424

    
425
        q = ("select population, size, mtime from statistics "
426
             "where node = ? and cluster = ?")
427
        self.execute(q, (node, cluster))
428
        return self.fetchone()
429

    
430
    def statistics_update(self, node, population, size, mtime, cluster=0):
431
        """Update the statistics of the given node.
432
           Statistics keep track the population, total
433
           size of objects and mtime in the node's namespace.
434
           May be zero or positive or negative numbers.
435
        """
436

    
437
        qs = ("select population, size from statistics "
438
              "where node = ? and cluster = ?")
439
        qu = ("insert or replace into statistics (node, population, size, mtime, cluster) "
440
              "values (?, ?, ?, ?, ?)")
441
        self.execute(qs, (node, cluster))
442
        r = self.fetchone()
443
        if r is None:
444
            prepopulation, presize = (0, 0)
445
        else:
446
            prepopulation, presize = r
447
        population += prepopulation
448
        population = max(population, 0)
449
        size += presize
450
        self.execute(qu, (node, population, size, mtime, cluster))
451

    
452
    def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
453
        """Update the statistics of the given node's parent.
454
           Then recursively update all parents up to the root.
455
           Population is not recursive.
456
        """
457

    
458
        while True:
459
            if node == 0:
460
                break
461
            props = self.node_get_properties(node)
462
            if props is None:
463
                break
464
            parent, path = props
465
            self.statistics_update(parent, population, size, mtime, cluster)
466
            node = parent
467
            population = 0  # Population isn't recursive
468

    
469
    def statistics_latest(self, node, before=inf, except_cluster=0):
470
        """Return population, total size and last mtime
471
           for all latest versions under node that
472
           do not belong to the cluster.
473
        """
474

    
475
        execute = self.execute
476
        fetchone = self.fetchone
477

    
478
        # The node.
479
        props = self.node_get_properties(node)
480
        if props is None:
481
            return None
482
        parent, path = props
483

    
484
        # The latest version.
485
        q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
486
             "from versions v "
487
             "where serial = %s "
488
             "and cluster != ?")
489
        subq, args = self._construct_latest_version_subquery(
490
            node=node, before=before)
491
        execute(q % subq, args + [except_cluster])
492
        props = fetchone()
493
        if props is None:
494
            return None
495
        mtime = props[MTIME]
496

    
497
        # First level, just under node (get population).
498
        q = ("select count(serial), sum(size), max(mtime) "
499
             "from versions v "
500
             "where serial = %s "
501
             "and cluster != ? "
502
             "and node in (select node "
503
             "from nodes "
504
             "where parent = ?)")
505
        subq, args = self._construct_latest_version_subquery(
506
            node=None, before=before)
507
        execute(q % subq, args + [except_cluster, node])
508
        r = fetchone()
509
        if r is None:
510
            return None
511
        count = r[0]
512
        mtime = max(mtime, r[2])
513
        if count == 0:
514
            return (0, 0, mtime)
515

    
516
        # All children (get size and mtime).
517
        # This is why the full path is stored.
518
        q = ("select count(serial), sum(size), max(mtime) "
519
             "from versions v "
520
             "where serial = %s "
521
             "and cluster != ? "
522
             "and node in (select node "
523
             "from nodes "
524
             "where path like ? escape '\\')")
525
        subq, args = self._construct_latest_version_subquery(
526
            node=None, before=before)
527
        execute(
528
            q % subq, args + [except_cluster, self.escape_like(path) + '%'])
529
        r = fetchone()
530
        if r is None:
531
            return None
532
        size = r[1] - props[SIZE]
533
        mtime = max(mtime, r[2])
534
        return (count, size, mtime)
535

    
536
    def nodes_set_latest_version(self, node, serial):
537
        q = ("update nodes set latest_version = ? where node = ?")
538
        props = (serial, node)
539
        self.execute(q, props)
540

    
541
    def version_create(self, node, hash, size, type, source, muser, uuid, checksum, cluster=0):
542
        """Create a new version from the given properties.
543
           Return the (serial, mtime) of the new version.
544
        """
545

    
546
        q = ("insert into versions (node, hash, size, type, source, mtime, muser, uuid, checksum, cluster) "
547
             "values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
548
        mtime = time()
549
        props = (node, hash, size, type, source, mtime, muser,
550
                 uuid, checksum, cluster)
551
        serial = self.execute(q, props).lastrowid
552
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)
553

    
554
        self.nodes_set_latest_version(node, serial)
555

    
556
        return serial, mtime
557

    
558
    def version_lookup(self, node, before=inf, cluster=0, all_props=True):
559
        """Lookup the current version of the given node.
560
           Return a list with its properties:
561
           (serial, node, hash, size, type, source, mtime,
562
            muser, uuid, checksum, cluster)
563
           or None if the current version is not found in the given cluster.
564
        """
565

    
566
        q = ("select %s "
567
             "from versions v "
568
             "where serial = %s "
569
             "and cluster = ?")
570
        subq, args = self._construct_latest_version_subquery(
571
            node=node, before=before)
572
        if not all_props:
573
            q = q % ("serial", subq)
574
        else:
575
            q = q % ("serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster", subq)
576

    
577
        self.execute(q, args + [cluster])
578
        props = self.fetchone()
579
        if props is not None:
580
            return props
581
        return None
582

    
583
    def version_lookup_bulk(self, nodes, before=inf, cluster=0, all_props=True):
584
        """Lookup the current versions of the given nodes.
585
           Return a list with their properties:
586
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
587
        """
588

    
589
        if not nodes:
590
            return ()
591
        q = ("select %s "
592
             "from versions "
593
             "where serial in %s "
594
             "and cluster = ? %s")
595
        subq, args = self._construct_latest_versions_subquery(
596
            nodes=nodes, before=before)
597
        if not all_props:
598
            q = q % ("serial", subq, '')
599
        else:
600
            q = q % ("serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster", subq, 'order by node')
601

    
602
        args += [cluster]
603
        self.execute(q, args)
604
        return self.fetchall()
605

    
606
    def version_get_properties(self, serial, keys=(), propnames=_propnames):
607
        """Return a sequence of values for the properties of
608
           the version specified by serial and the keys, in the order given.
609
           If keys is empty, return all properties in the order
610
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
611
        """
612

    
613
        q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
614
             "from versions "
615
             "where serial = ?")
616
        self.execute(q, (serial,))
617
        r = self.fetchone()
618
        if r is None:
619
            return r
620

    
621
        if not keys:
622
            return r
623
        return [r[propnames[k]] for k in keys if k in propnames]
624

    
625
    def version_put_property(self, serial, key, value):
626
        """Set value for the property of version specified by key."""
627

    
628
        if key not in _propnames:
629
            return
630
        q = "update versions set %s = ? where serial = ?" % key
631
        self.execute(q, (value, serial))
632

    
633
    def version_recluster(self, serial, cluster):
634
        """Move the version into another cluster."""
635

    
636
        props = self.version_get_properties(serial)
637
        if not props:
638
            return
639
        node = props[NODE]
640
        size = props[SIZE]
641
        oldcluster = props[CLUSTER]
642
        if cluster == oldcluster:
643
            return
644

    
645
        mtime = time()
646
        self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
647
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)
648

    
649
        q = "update versions set cluster = ? where serial = ?"
650
        self.execute(q, (cluster, serial))
651

    
652
    def version_remove(self, serial):
653
        """Remove the serial specified."""
654

    
655
        props = self.version_get_properties(serial)
656
        if not props:
657
            return
658
        node = props[NODE]
659
        hash = props[HASH]
660
        size = props[SIZE]
661
        cluster = props[CLUSTER]
662

    
663
        mtime = time()
664
        self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
665

    
666
        q = "delete from versions where serial = ?"
667
        self.execute(q, (serial,))
668

    
669
        props = self.version_lookup(node, cluster=cluster, all_props=False)
670
        if props:
671
            self.nodes_set_latest_version(node, props[0])
672
        return hash, size
673

    
674
    def attribute_get(self, serial, domain, keys=()):
675
        """Return a list of (key, value) pairs of the version specified by serial.
676
           If keys is empty, return all attributes.
677
           Othwerise, return only those specified.
678
        """
679

    
680
        execute = self.execute
681
        if keys:
682
            marks = ','.join('?' for k in keys)
683
            q = ("select key, value from attributes "
684
                 "where key in (%s) and serial = ? and domain = ?" % (marks,))
685
            execute(q, keys + (serial, domain))
686
        else:
687
            q = "select key, value from attributes where serial = ? and domain = ?"
688
            execute(q, (serial, domain))
689
        return self.fetchall()
690

    
691
    def attribute_set(self, serial, domain, items):
692
        """Set the attributes of the version specified by serial.
693
           Receive attributes as an iterable of (key, value) pairs.
694
        """
695

    
696
        q = ("insert or replace into attributes (serial, domain, key, value) "
697
             "values (?, ?, ?, ?)")
698
        self.executemany(q, ((serial, domain, k, v) for k, v in items))
699

    
700
    def attribute_del(self, serial, domain, keys=()):
701
        """Delete attributes of the version specified by serial.
702
           If keys is empty, delete all attributes.
703
           Otherwise delete those specified.
704
        """
705

    
706
        if keys:
707
            q = "delete from attributes where serial = ? and domain = ? and key = ?"
708
            self.executemany(q, ((serial, domain, key) for key in keys))
709
        else:
710
            q = "delete from attributes where serial = ? and domain = ?"
711
            self.execute(q, (serial, domain))
712

    
713
    def attribute_copy(self, source, dest):
714
        q = ("insert or replace into attributes "
715
             "select ?, domain, key, value from attributes "
716
             "where serial = ?")
717
        self.execute(q, (dest, source))
718

    
719
    def _construct_filters(self, domain, filterq):
720
        if not domain or not filterq:
721
            return None, None
722

    
723
        subqlist = []
724
        append = subqlist.append
725
        included, excluded, opers = parse_filters(filterq)
726
        args = []
727

    
728
        if included:
729
            subq = "exists (select 1 from attributes where serial = v.serial and domain = ? and "
730
            subq += "(" + ' or '.join(('key = ?' for x in included)) + ")"
731
            subq += ")"
732
            args += [domain]
733
            args += included
734
            append(subq)
735

    
736
        if excluded:
737
            subq = "not exists (select 1 from attributes where serial = v.serial and domain = ? and "
738
            subq += "(" + ' or '.join(('key = ?' for x in excluded)) + ")"
739
            subq += ")"
740
            args += [domain]
741
            args += excluded
742
            append(subq)
743

    
744
        if opers:
745
            for k, o, v in opers:
746
                subq = "exists (select 1 from attributes where serial = v.serial and domain = ? and "
747
                subq += "key = ? and value %s ?" % (o,)
748
                subq += ")"
749
                args += [domain, k, v]
750
                append(subq)
751

    
752
        if not subqlist:
753
            return None, None
754

    
755
        subq = ' and ' + ' and '.join(subqlist)
756

    
757
        return subq, args
758

    
759
    def _construct_paths(self, pathq):
760
        if not pathq:
761
            return None, None
762

    
763
        subqlist = []
764
        args = []
765
        for path, match in pathq:
766
            if match == MATCH_PREFIX:
767
                subqlist.append("n.path like ? escape '\\'")
768
                args.append(self.escape_like(path) + '%')
769
            elif match == MATCH_EXACT:
770
                subqlist.append("n.path = ?")
771
                args.append(path)
772

    
773
        subq = ' and (' + ' or '.join(subqlist) + ')'
774
        args = tuple(args)
775

    
776
        return subq, args
777

    
778
    def _construct_size(self, sizeq):
779
        if not sizeq or len(sizeq) != 2:
780
            return None, None
781

    
782
        subq = ''
783
        args = []
784
        if sizeq[0]:
785
            subq += " and v.size >= ?"
786
            args += [sizeq[0]]
787
        if sizeq[1]:
788
            subq += " and v.size < ?"
789
            args += [sizeq[1]]
790

    
791
        return subq, args
792

    
793
    def _construct_versions_nodes_latest_version_subquery(self, before=inf):
794
        if before == inf:
795
            q = ("n.latest_version ")
796
            args = []
797
        else:
798
            q = ("(select max(serial) "
799
                 "from versions "
800
                 "where node = v.node and mtime < ?) ")
801
            args = [before]
802
        return q, args
803

    
804
    def _construct_latest_version_subquery(self, node=None, before=inf):
805
        where_cond = "node = v.node"
806
        args = []
807
        if node:
808
            where_cond = "node = ? "
809
            args = [node]
810

    
811
        if before == inf:
812
            q = ("(select latest_version "
813
                 "from nodes "
814
                 "where %s) ")
815
        else:
816
            q = ("(select max(serial) "
817
                 "from versions "
818
                 "where %s and mtime < ?) ")
819
            args += [before]
820
        return q % where_cond, args
821

    
822
    def _construct_latest_versions_subquery(self, nodes=(), before=inf):
823
        where_cond = ""
824
        args = []
825
        if nodes:
826
            where_cond = "node in (%s) " % ','.join('?' for node in nodes)
827
            args = nodes
828

    
829
        if before == inf:
830
            q = ("(select latest_version "
831
                 "from nodes "
832
                 "where %s ) ")
833
        else:
834
            q = ("(select max(serial) "
835
                 "from versions "
836
                 "where %s and mtime < ? group by node) ")
837
            args += [before]
838
        return q % where_cond, args
839

    
840
    def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=None):
841
        """Return a list with all keys pairs defined
842
           for all latest versions under parent that
843
           do not belong to the cluster.
844
        """
845

    
846
        pathq = pathq or []
847

    
848
        # TODO: Use another table to store before=inf results.
849
        q = ("select distinct a.key "
850
             "from attributes a, versions v, nodes n "
851
             "where v.serial = %s "
852
             "and v.cluster != ? "
853
             "and v.node in (select node "
854
             "from nodes "
855
             "where parent = ?) "
856
             "and a.serial = v.serial "
857
             "and a.domain = ? "
858
             "and n.node = v.node")
859
        subq, subargs = self._construct_latest_version_subquery(
860
            node=None, before=before)
861
        args = subargs + [except_cluster, parent, domain]
862
        q = q % subq
863
        subq, subargs = self._construct_paths(pathq)
864
        if subq is not None:
865
            q += subq
866
            args += subargs
867
        self.execute(q, args)
868
        return [r[0] for r in self.fetchall()]
869

    
870
    def latest_version_list(self, parent, prefix='', delimiter=None,
871
                            start='', limit=10000, before=inf,
872
                            except_cluster=0, pathq=[], domain=None,
873
                            filterq=[], sizeq=None, all_props=False):
874
        """Return a (list of (path, serial) tuples, list of common prefixes)
875
           for the current versions of the paths with the given parent,
876
           matching the following criteria.
877

878
           The property tuple for a version is returned if all
879
           of these conditions are true:
880

881
                a. parent matches
882

883
                b. path > start
884

885
                c. path starts with prefix (and paths in pathq)
886

887
                d. version is the max up to before
888

889
                e. version is not in cluster
890

891
                f. the path does not have the delimiter occuring
892
                   after the prefix, or ends with the delimiter
893

894
                g. serial matches the attribute filter query.
895

896
                   A filter query is a comma-separated list of
897
                   terms in one of these three forms:
898

899
                   key
900
                       an attribute with this key must exist
901

902
                   !key
903
                       an attribute with this key must not exist
904

905
                   key ?op value
906
                       the attribute with this key satisfies the value
907
                       where ?op is one of =, != <=, >=, <, >.
908

909
                h. the size is in the range set by sizeq
910

911
           The list of common prefixes includes the prefixes
912
           matching up to the first delimiter after prefix,
913
           and are reported only once, as "virtual directories".
914
           The delimiter is included in the prefixes.
915

916
           If arguments are None, then the corresponding matching rule
917
           will always match.
918

919
           Limit applies to the first list of tuples returned.
920

921
           If all_props is True, return all properties after path, not just serial.
922
        """
923

    
924
        execute = self.execute
925

    
926
        if not start or start < prefix:
927
            start = strprevling(prefix)
928
        nextling = strnextling(prefix)
929

    
930
        q = ("select distinct n.path, %s "
931
             "from versions v, nodes n "
932
             "where v.serial = %s "
933
             "and v.cluster != ? "
934
             "and v.node in (select node "
935
             "from nodes "
936
             "where parent = ?) "
937
             "and n.node = v.node "
938
             "and n.path > ? and n.path < ?")
939
        subq, args = self._construct_versions_nodes_latest_version_subquery(
940
            before)
941
        if not all_props:
942
            q = q % ("v.serial", subq)
943
        else:
944
            q = q % ("v.serial, v.node, v.hash, v.size, v.type, v.source, v.mtime, v.muser, v.uuid, v.checksum, v.cluster", subq)
945
        args += [except_cluster, parent, start, nextling]
946
        start_index = len(args) - 2
947

    
948
        subq, subargs = self._construct_paths(pathq)
949
        if subq is not None:
950
            q += subq
951
            args += subargs
952
        subq, subargs = self._construct_size(sizeq)
953
        if subq is not None:
954
            q += subq
955
            args += subargs
956
        subq, subargs = self._construct_filters(domain, filterq)
957
        if subq is not None:
958
            q += subq
959
            args += subargs
960
        else:
961
            q = q.replace("attributes a, ", "")
962
            q = q.replace("and a.serial = v.serial ", "")
963
        q += " order by n.path"
964

    
965
        if not delimiter:
966
            q += " limit ?"
967
            args.append(limit)
968
            execute(q, args)
969
            return self.fetchall(), ()
970

    
971
        pfz = len(prefix)
972
        dz = len(delimiter)
973
        count = 0
974
        fetchone = self.fetchone
975
        prefixes = []
976
        pappend = prefixes.append
977
        matches = []
978
        mappend = matches.append
979

    
980
        execute(q, args)
981
        while True:
982
            props = fetchone()
983
            if props is None:
984
                break
985
            path = props[0]
986
            serial = props[1]
987
            idx = path.find(delimiter, pfz)
988

    
989
            if idx < 0:
990
                mappend(props)
991
                count += 1
992
                if count >= limit:
993
                    break
994
                continue
995

    
996
            if idx + dz == len(path):
997
                mappend(props)
998
                count += 1
999
                continue  # Get one more, in case there is a path.
1000
            pf = path[:idx + dz]
1001
            pappend(pf)
1002
            if count >= limit:
1003
                break
1004

    
1005
            args[start_index] = strnextling(pf)  # New start.
1006
            execute(q, args)
1007

    
1008
        return matches, prefixes
1009

    
1010
    def latest_uuid(self, uuid, cluster):
1011
        """Return the latest version of the given uuid and cluster.
1012

1013
        Return a (path, serial) tuple.
1014
        If cluster is None, all clusters are considered.
1015

1016
        """
1017
        if cluster is not None:
1018
            cluster_where = "and cluster = ?"
1019
            args = (uuid, int(cluster))
1020
        else:
1021
            cluster_where = ""
1022
            args = (uuid,)
1023

    
1024
        q = ("select n.path, v.serial "
1025
             "from versions v, nodes n "
1026
             "where v.serial = (select max(serial) "
1027
             "from versions "
1028
             "where uuid = ? %s) "
1029
             "and n.node = v.node") % cluster_where
1030
        self.execute(q, args)
1031
        return self.fetchone()
1032

    
1033
    def domain_object_list(self, domain, cluster=None):
1034
        """Return a list of (path, property list, attribute dictionary)
1035
           for the objects in the specific domain and cluster.
1036
        """
1037

    
1038
        q = ("select n.path, v.serial, v.node, v.hash, "
1039
             "v.size, v.type, v.source, v.mtime, v.muser, "
1040
             "v.uuid, v.checksum, v.cluster, a.key, a.value "
1041
             "from nodes n, versions v, attributes a "
1042
             "where n.node = v.node and "
1043
             "n.latest_version = v.serial and "
1044
             "v.serial = a.serial and "
1045
             "a.domain = ? ")
1046
        args = [domain]
1047
        if cluster != None:
1048
            q += "and v.cluster = ?"
1049
            args += [cluster]
1050

    
1051
        self.execute(q, args)
1052
        rows = self.fetchall()
1053

    
1054
        group_by = itemgetter(slice(12))
1055
        rows.sort(key = group_by)
1056
        groups = groupby(rows, group_by)
1057
        return [(k[0], k[1:], dict([i[12:] for i in data])) \
1058
            for (k, data) in groups]