Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / lib / sqlite / node.py @ ae6199c5

History | View | Annotate | Download (36.6 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from time import time
35
from operator import itemgetter
36
from itertools import groupby
37

    
38
from dbworker import DBWorker
39

    
40
from pithos.backends.filter import parse_filters
41

    
42

    
43
# Identifier of the root node.
ROOTNODE = 0

# Positional indexes of the columns in a full `versions` row.
(SERIAL, NODE, HASH, SIZE, TYPE,
 SOURCE, MTIME, MUSER, UUID,
 CHECKSUM, CLUSTER) = range(11)

# Path matching modes.
MATCH_PREFIX, MATCH_EXACT = range(2)

# Positive infinity; the default value of the `before` arguments.
inf = float('inf')
51

    
52

    
53
def strnextling(prefix):
    """Return the first unicode string
       greater than but not starting with given prefix.
       strnextling('hello') -> 'hellp'

       For the empty prefix an approximation is returned (see below).
       Raise RuntimeError if the last character of prefix is at or
       beyond U+FFFF and therefore cannot be incremented.
    """
    # unichr exists only on Python 2; chr is its Python 3 equivalent.
    try:
        to_char = unichr
    except NameError:
        to_char = chr

    if not prefix:
        ## all strings start with the null string,
        ## therefore we have to approximate strnextling('')
        ## with the last unicode character supported by python
        ## 0x10ffff for wide (32-bit unicode) python builds
        ## 0x00ffff for narrow (16-bit unicode) python builds
        ## We will not autodetect. 0xffff is safe enough.
        return to_char(0xffff)

    last = ord(prefix[-1])
    if last >= 0xffff:
        raise RuntimeError("strnextling: last character of prefix "
                           "cannot be incremented past U+FFFF")
    return prefix[:-1] + to_char(last + 1)
72

    
73

    
74
def strprevling(prefix):
    """Return an approximation of the last unicode string
       less than but not starting with given prefix.
       strprevling(u'hello') -> u'helln\\uffff'

       The empty prefix is returned unchanged. If the last character
       is U+0000 it is simply dropped.
    """
    # unichr exists only on Python 2; chr is its Python 3 equivalent.
    try:
        to_char = unichr
    except NameError:
        to_char = chr

    if not prefix:
        ## There is no prevling for the null string
        return prefix

    head = prefix[:-1]
    last = ord(prefix[-1])
    if last > 0:
        # Step the last character down by one and pad with U+FFFF so the
        # result sorts after everything that shares the shorter prefix.
        head += to_char(last - 1) + to_char(0xffff)
    return head
87

    
88

    
89
# Map each version property name to its column position
# in a full `versions` row.
_propnames = dict(
    (name, position) for position, name in enumerate((
        'serial', 'node', 'hash', 'size', 'type', 'source',
        'mtime', 'muser', 'uuid', 'checksum', 'cluster',
    ))
)
102

    
103

    
104
class Node(DBWorker):
105
    """Nodes store path organization and have multiple versions.
106
       Versions store object history and have multiple attributes.
107
       Attributes store metadata.
108
    """
109

    
110
    # TODO: Provide an interface for included and excluded clusters.
111

    
112
    def __init__(self, **params):
        """Initialise the worker and make sure the schema exists.
           Creates the nodes, policy, statistics, versions and attributes
           tables together with their indexes when they are missing,
           then inserts the root node unless it is already present.
        """
        DBWorker.__init__(self, **params)
        run = self.execute

        # Statements are executed in the order listed.
        schema = (
            """ pragma foreign_keys = on """,

            """ create table if not exists nodes
                          ( node       integer primary key,
                            parent     integer default 0,
                            path       text    not null default '',
                            latest_version     integer,
                            foreign key (parent)
                            references nodes(node)
                            on update cascade
                            on delete cascade ) """,
            """ create unique index if not exists idx_nodes_path
                    on nodes(path) """,
            """ create index if not exists idx_nodes_parent
                    on nodes(parent) """,

            """ create table if not exists policy
                          ( node   integer,
                            key    text,
                            value  text,
                            primary key (node, key)
                            foreign key (node)
                            references nodes(node)
                            on update cascade
                            on delete cascade ) """,

            """ create table if not exists statistics
                          ( node       integer,
                            population integer not null default 0,
                            size       integer not null default 0,
                            mtime      integer,
                            cluster    integer not null default 0,
                            primary key (node, cluster)
                            foreign key (node)
                            references nodes(node)
                            on update cascade
                            on delete cascade ) """,

            """ create table if not exists versions
                          ( serial     integer primary key,
                            node       integer,
                            hash       text,
                            size       integer not null default 0,
                            type       text    not null default '',
                            source     integer,
                            mtime      integer,
                            muser      text    not null default '',
                            uuid       text    not null default '',
                            checksum   text    not null default '',
                            cluster    integer not null default 0,
                            foreign key (node)
                            references nodes(node)
                            on update cascade
                            on delete cascade ) """,
            """ create index if not exists idx_versions_node_mtime
                    on versions(node, mtime) """,
            """ create index if not exists idx_versions_node_uuid
                    on versions(uuid) """,

            """ create table if not exists attributes
                          ( serial integer,
                            domain text,
                            key    text,
                            value  text,
                            primary key (serial, domain, key)
                            foreign key (serial)
                            references versions(serial)
                            on update cascade
                            on delete cascade ) """,
            """ create index if not exists idx_attributes_domain
                    on attributes(domain) """,
        )
        for statement in schema:
            run(statement)

        # The root node is its own parent.
        # NOTE(review): wrapper.execute() presumably opens the transaction
        # that commit() closes -- confirm against the wrapper class.
        txn = self.wrapper
        txn.execute()
        try:
            run("insert or ignore into nodes(node, parent) values (?, ?)",
                (ROOTNODE, ROOTNODE))
        finally:
            txn.commit()
195

    
196
    def node_create(self, parent, path):
197
        """Create a new node from the given properties.
198
           Return the node identifier of the new node.
199
        """
200

    
201
        q = ("insert into nodes (parent, path) "
202
             "values (?, ?)")
203
        props = (parent, path)
204
        return self.execute(q, props).lastrowid
205

    
206
    def node_lookup(self, path):
207
        """Lookup the current node of the given path.
208
           Return None if the path is not found.
209
        """
210

    
211
        q = "select node from nodes where path = ?"
212
        self.execute(q, (path,))
213
        r = self.fetchone()
214
        if r is not None:
215
            return r[0]
216
        return None
217

    
218
    def node_lookup_bulk(self, paths):
219
        """Lookup the current nodes for the given paths.
220
           Return () if the path is not found.
221
        """
222

    
223
        placeholders = ','.join('?' for path in paths)
224
        q = "select node from nodes where path in (%s)" % placeholders
225
        self.execute(q, paths)
226
        r = self.fetchall()
227
        if r is not None:
228
            return [row[0] for row in r]
229
        return None
230

    
231
    def node_get_properties(self, node):
232
        """Return the node's (parent, path).
233
           Return None if the node is not found.
234
        """
235

    
236
        q = "select parent, path from nodes where node = ?"
237
        self.execute(q, (node,))
238
        return self.fetchone()
239

    
240
    def node_get_versions(self, node, keys=(), propnames=_propnames):
        """Return the properties of all versions at node.
           If keys is empty, return all properties in the order
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
           Otherwise return, for every version, a list with the values
           of the requested keys in the order given; keys that are not
           in propnames are skipped.
        """

        q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
             "from versions "
             "where node = ?")
        self.execute(q, (node,))
        rows = self.fetchall()
        if rows is None or not keys:
            return rows

        # Resolve the requested keys to column positions once,
        # instead of repeating the lookups for every row.
        columns = [propnames[k] for k in keys if k in propnames]
        return [[row[c] for c in columns] for row in rows]
257

    
258
    def node_count_children(self, node):
259
        """Return node's child count."""
260

    
261
        q = "select count(node) from nodes where parent = ? and node != 0"
262
        self.execute(q, (node,))
263
        r = self.fetchone()
264
        if r is None:
265
            return 0
266
        return r[0]
267

    
268
    def node_purge_children(self, parent, before=inf, cluster=0):
        """Delete the versions of parent's direct children that belong
           to cluster and have mtime <= before.
           Return (hashes, size, serials) of the deleted versions,
           or ((), 0, ()) when nothing matches.
           Children left without any version are deleted as well.
        """

        run = self.execute
        # One selection shared by the count, the listing and the delete.
        scope = ("where node in (select node "
                 "from nodes "
                 "where parent = ?) "
                 "and cluster = ? "
                 "and mtime <= ?")
        params = (parent, cluster, before)

        run("select count(serial), sum(size) from versions " + scope, params)
        nr, size = self.fetchone()
        if not nr:
            return (), 0, ()

        # Statistics are adjusted before the rows are actually removed.
        now = time()
        self.statistics_update(parent, -nr, -size, now, cluster)
        self.statistics_update_ancestors(parent, -nr, -size, now, cluster)

        run("select hash, serial from versions " + scope, params)
        hashes = []
        serials = []
        for row in self.fetchall():
            hashes.append(row[0])
            serials.append(row[1])

        run("delete from versions " + scope, params)
        run("delete from nodes "
            "where node in (select node from nodes n "
            "where (select count(serial) "
            "from versions "
            "where node = n.node) = 0 "
            "and parent = ?)",
            (parent,))
        return hashes, size, serials
319

    
320
    def node_purge(self, node, before=inf, cluster=0):
        """Delete the versions of node that belong to cluster
           and have mtime <= before.
           Return (hashes, size, serials) of the deleted versions,
           or ((), 0, ()) when nothing matches.
           The node itself is deleted if no version of it remains.
        """

        run = self.execute
        # One selection shared by the count, the listing and the delete.
        scope = ("where node = ? "
                 "and cluster = ? "
                 "and mtime <= ?")
        params = (node, cluster, before)

        run("select count(serial), sum(size) from versions " + scope, params)
        nr, size = self.fetchone()
        if not nr:
            return (), 0, ()

        # Statistics are adjusted before the rows are actually removed.
        now = time()
        self.statistics_update_ancestors(node, -nr, -size, now, cluster)

        run("select hash, serial from versions " + scope, params)
        hashes = []
        serials = []
        for row in self.fetchall():
            hashes.append(row[0])
            serials.append(row[1])

        run("delete from versions " + scope, params)
        run("delete from nodes "
            "where node in (select node from nodes n "
            "where (select count(serial) "
            "from versions "
            "where node = n.node) = 0 "
            "and node = ?)",
            (node,))
        return hashes, size, serials
364

    
365
    def node_remove(self, node):
366
        """Remove the node specified.
367
           Return false if the node has children or is not found.
368
        """
369

    
370
        if self.node_count_children(node):
371
            return False
372

    
373
        mtime = time()
374
        q = ("select count(serial), sum(size), cluster "
375
             "from versions "
376
             "where node = ? "
377
             "group by cluster")
378
        self.execute(q, (node,))
379
        for population, size, cluster in self.fetchall():
380
            self.statistics_update_ancestors(
381
                node, -population, -size, mtime, cluster)
382

    
383
        q = "delete from nodes where node = ?"
384
        self.execute(q, (node,))
385
        return True
386

    
387
    def node_accounts(self, accounts=()):
388
        q = ("select path, node from nodes where node != 0 and parent = 0 ")
389
        args = []
390
        if accounts:
391
            placeholders = ','.join('?' for a in accounts)
392
            q += ("and path in (%s)" % placeholders)
393
            args += accounts
394
        return self.execute(q, args).fetchall()
395

    
396
    def node_account_quotas(self):
397
        q = ("select n.path, p.value from nodes n, policy p "
398
             "where n.node != 0 and n.parent = 0 "
399
             "and n.node = p.node and p.key = 'quota'"
400
        )
401
        return dict(self.execute(q).fetchall())
402

    
403
    def node_account_usage(self, account_node, cluster):
404
        select_children = ("select node from nodes where parent = ?")
405
        select_descedents = ("select node from nodes "
406
                             "where parent in (%s) "
407
                             "or node in (%s) ") % ((select_children,)*2)
408
        args = [account_node]*2
409
        q = ("select sum(v.size) from versions v, nodes n "
410
             "where v.node = n.node "
411
             "and n.node in (%s) "
412
             "and v.cluster = ?") % select_descedents
413
        args += [cluster]
414

    
415
        self.execute(q, args)
416
        return self.fetchone()[0]
417

    
418
    def policy_get(self, node):
419
        q = "select key, value from policy where node = ?"
420
        self.execute(q, (node,))
421
        return dict(self.fetchall())
422

    
423
    def policy_set(self, node, policy):
424
        q = "insert or replace into policy (node, key, value) values (?, ?, ?)"
425
        self.executemany(q, ((node, k, v) for k, v in policy.iteritems()))
426

    
427
    def statistics_get(self, node, cluster=0):
428
        """Return population, total size and last mtime
429
           for all versions under node that belong to the cluster.
430
        """
431

    
432
        q = ("select population, size, mtime from statistics "
433
             "where node = ? and cluster = ?")
434
        self.execute(q, (node, cluster))
435
        return self.fetchone()
436

    
437
    def statistics_update(self, node, population, size, mtime, cluster=0):
438
        """Update the statistics of the given node.
439
           Statistics keep track the population, total
440
           size of objects and mtime in the node's namespace.
441
           May be zero or positive or negative numbers.
442
        """
443

    
444
        qs = ("select population, size from statistics "
445
              "where node = ? and cluster = ?")
446
        qu = ("insert or replace into statistics (node, population, size, mtime, cluster) "
447
              "values (?, ?, ?, ?, ?)")
448
        self.execute(qs, (node, cluster))
449
        r = self.fetchone()
450
        if r is None:
451
            prepopulation, presize = (0, 0)
452
        else:
453
            prepopulation, presize = r
454
        population += prepopulation
455
        population = max(population, 0)
456
        size += presize
457
        self.execute(qu, (node, population, size, mtime, cluster))
458

    
459
    def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
460
        """Update the statistics of the given node's parent.
461
           Then recursively update all parents up to the root.
462
           Population is not recursive.
463
        """
464

    
465
        while True:
466
            if node == 0:
467
                break
468
            props = self.node_get_properties(node)
469
            if props is None:
470
                break
471
            parent, path = props
472
            self.statistics_update(parent, population, size, mtime, cluster)
473
            node = parent
474
            population = 0  # Population isn't recursive
475

    
476
    def statistics_latest(self, node, before=inf, except_cluster=0):
        """Compute (population, size, mtime) over the latest versions
           under node that are not in except_cluster.
           Population counts the direct children; size and mtime cover
           every node whose path starts with node's path, minus the
           size of node's own latest version.
           Return None if the node or its latest version is not found.
        """

        node_props = self.node_get_properties(node)
        if node_props is None:
            return None
        _parent, path = node_props

        run = self.execute
        first = self.fetchone

        # Latest version of the node itself.
        subq, args = self._construct_latest_version_subquery(
            node=node, before=before)
        run(("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
             "from versions v "
             "where serial = %s "
             "and cluster != ?") % subq,
            args + [except_cluster])
        own = first()
        if own is None:
            return None
        mtime = own[MTIME]

        # Common head of the two aggregate queries below; each one
        # completes the inner "select node from nodes where ..." part.
        aggregate = ("select count(serial), sum(size), max(mtime) "
                     "from versions v "
                     "where serial = %s "
                     "and cluster != ? "
                     "and node in (select node "
                     "from nodes "
                     "where ")

        # Direct children only: this gives the population.
        subq, args = self._construct_latest_version_subquery(
            node=None, before=before)
        run((aggregate + "parent = ?)") % subq,
            args + [except_cluster, node])
        row = first()
        if row is None:
            return None
        count = row[0]
        mtime = max(mtime, row[2])
        if count == 0:
            return (0, 0, mtime)

        # Everything below the node, matched on the stored full path:
        # this gives the size and the mtime.
        subq, args = self._construct_latest_version_subquery(
            node=None, before=before)
        run((aggregate + "path like ? escape '\\')") % subq,
            args + [except_cluster, self.escape_like(path) + '%'])
        row = first()
        if row is None:
            return None
        size = row[1] - own[SIZE]
        mtime = max(mtime, row[2])
        return (count, size, mtime)
542

    
543
    def nodes_set_latest_version(self, node, serial):
544
        q = ("update nodes set latest_version = ? where node = ?")
545
        props = (serial, node)
546
        self.execute(q, props)
547

    
548
    def version_create(self, node, hash, size, type, source, muser, uuid, checksum, cluster=0):
549
        """Create a new version from the given properties.
550
           Return the (serial, mtime) of the new version.
551
        """
552

    
553
        q = ("insert into versions (node, hash, size, type, source, mtime, muser, uuid, checksum, cluster) "
554
             "values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
555
        mtime = time()
556
        props = (node, hash, size, type, source, mtime, muser,
557
                 uuid, checksum, cluster)
558
        serial = self.execute(q, props).lastrowid
559
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)
560

    
561
        self.nodes_set_latest_version(node, serial)
562

    
563
        return serial, mtime
564

    
565
    def version_lookup(self, node, before=inf, cluster=0, all_props=True):
        """Find the latest version of node (as of `before`).
           Return its row
           (serial, node, hash, size, type, source, mtime,
            muser, uuid, checksum, cluster)
           or only (serial,) when all_props is false.
           Return None if that version is not in the given cluster.
        """

        subq, args = self._construct_latest_version_subquery(
            node=node, before=before)
        if all_props:
            columns = "serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster"
        else:
            columns = "serial"
        q = ("select %s "
             "from versions v "
             "where serial = %s "
             "and cluster = ?") % (columns, subq)

        self.execute(q, args + [cluster])
        return self.fetchone()
589

    
590
    def version_lookup_bulk(self, nodes, before=inf, cluster=0, all_props=True):
        """Lookup the current versions of the given nodes.
           Return a list with their properties:
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster)
           ordered by node, or only (serial,) rows, in no particular
           order, when all_props is false.
           Return () when no nodes are given.
           Only versions that belong to the given cluster are returned.
        """

        # Work on a private copy: the subquery helper may hand the very
        # same sequence back as its argument list, and extending that
        # would modify the caller's list (or fail for a tuple).
        nodes = list(nodes or ())
        if not nodes:
            return ()

        subq, subargs = self._construct_latest_versions_subquery(
            nodes=nodes, before=before)
        if all_props:
            columns = "serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster"
            order = 'order by node'
        else:
            columns = "serial"
            order = ''
        q = ("select %s "
             "from versions "
             "where serial in %s "
             "and cluster = ? %s") % (columns, subq, order)

        self.execute(q, list(subargs) + [cluster])
        return self.fetchall()
612

    
613
    def version_get_properties(self, serial, keys=(), propnames=_propnames):
        """Fetch the properties of the version with the given serial.
           With no keys, return the whole row in the order
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
           With keys, return a list holding the values of those keys
           in the order given; keys not in propnames are skipped.
           Return None if the version does not exist.
        """

        self.execute(
            "select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
            "from versions "
            "where serial = ?",
            (serial,))
        row = self.fetchone()
        if row is None or not keys:
            return row

        values = []
        for key in keys:
            if key in propnames:
                values.append(row[propnames[key]])
        return values
631

    
632
    def version_put_property(self, serial, key, value):
        """Update one property (column) of the version with the given
           serial. Keys that are not known property names are ignored.
        """

        # The key goes into the statement text, so only names listed
        # in _propnames are let through.
        if key in _propnames:
            statement = "update versions set %s = ? where serial = ?" % key
            self.execute(statement, (value, serial))
639

    
640
    def version_recluster(self, serial, cluster):
        """Move the version with the given serial to another cluster,
           shifting its contribution to the ancestors' statistics from
           the old cluster to the new one.
           Nothing happens if the version does not exist or already
           belongs to the cluster.
        """

        props = self.version_get_properties(serial)
        if not props:
            return
        node, size, previous = props[NODE], props[SIZE], props[CLUSTER]
        if previous == cluster:
            return

        now = time()
        self.statistics_update_ancestors(node, -1, -size, now, previous)
        self.statistics_update_ancestors(node, 1, size, now, cluster)

        self.execute("update versions set cluster = ? where serial = ?",
                     (cluster, serial))
658

    
659
    def version_remove(self, serial):
        """Remove the version specified by serial.
           Return the (hash, size) of the removed version,
           or None if there is no such version.

           The ancestors' statistics shrink accordingly and the node's
           latest_version is re-pointed to the newest remaining version
           of the node (or cleared when no version is left).
        """

        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        digest = props[HASH]
        size = props[SIZE]
        cluster = props[CLUSTER]

        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, cluster)

        q = "delete from versions where serial = ?"
        self.execute(q, (serial,))

        # Do not resolve the replacement through version_lookup(): with
        # its default before=inf it finds the version via
        # nodes.latest_version, which may still hold the serial deleted
        # just above, so the lookup would come back empty and the stale
        # pointer would stay in place. Ask the versions table directly.
        q = "select max(serial) from versions where node = ?"
        self.execute(q, (node,))
        row = self.fetchone()
        latest = row[0] if row is not None else None
        self.nodes_set_latest_version(node, latest)
        return digest, size
680

    
681
    def attribute_get(self, serial, domain, keys=()):
682
        """Return a list of (key, value) pairs of the version specified by serial.
683
           If keys is empty, return all attributes.
684
           Othwerise, return only those specified.
685
        """
686

    
687
        execute = self.execute
688
        if keys:
689
            marks = ','.join('?' for k in keys)
690
            q = ("select key, value from attributes "
691
                 "where key in (%s) and serial = ? and domain = ?" % (marks,))
692
            execute(q, keys + (serial, domain))
693
        else:
694
            q = "select key, value from attributes where serial = ? and domain = ?"
695
            execute(q, (serial, domain))
696
        return self.fetchall()
697

    
698
    def attribute_set(self, serial, domain, items):
699
        """Set the attributes of the version specified by serial.
700
           Receive attributes as an iterable of (key, value) pairs.
701
        """
702

    
703
        q = ("insert or replace into attributes (serial, domain, key, value) "
704
             "values (?, ?, ?, ?)")
705
        self.executemany(q, ((serial, domain, k, v) for k, v in items))
706

    
707
    def attribute_del(self, serial, domain, keys=()):
708
        """Delete attributes of the version specified by serial.
709
           If keys is empty, delete all attributes.
710
           Otherwise delete those specified.
711
        """
712

    
713
        if keys:
714
            q = "delete from attributes where serial = ? and domain = ? and key = ?"
715
            self.executemany(q, ((serial, domain, key) for key in keys))
716
        else:
717
            q = "delete from attributes where serial = ? and domain = ?"
718
            self.execute(q, (serial, domain))
719

    
720
    def attribute_copy(self, source, dest):
721
        q = ("insert or replace into attributes "
722
             "select ?, domain, key, value from attributes "
723
             "where serial = ?")
724
        self.execute(q, (dest, source))
725

    
726
    def _construct_filters(self, domain, filterq):
        """Translate attribute filters into an SQL fragment.
           Return (subq, args) where subq starts with ' and ' and is
           meant to be appended to a query over `versions v`.
           Return (None, None) if domain or filterq is empty or if
           the filters yield no condition.
        """
        if not (domain and filterq):
            return None, None

        included, excluded, opers = parse_filters(filterq)
        clauses = []
        args = []
        # Common head of every attribute sub-select.
        attr_match = ("exists (select 1 from attributes "
                      "where serial = v.serial and domain = ? and ")

        # Key presence (included) and key absence (excluded).
        for negate, keys in ((False, included), (True, excluded)):
            if not keys:
                continue
            any_key = ' or '.join('key = ?' for _ in keys)
            head = "not " + attr_match if negate else attr_match
            clauses.append(head + "(" + any_key + ")" + ")")
            args += [domain]
            args += keys

        # Value comparisons.
        # NOTE(review): the operator text is placed in the SQL as is;
        # presumably parse_filters only yields comparison operators --
        # confirm.
        for key, oper, value in opers or ():
            comparison = "key = ? and value %s ?" % (oper,)
            clauses.append(attr_match + comparison + ")")
            args += [domain, key, value]

        if not clauses:
            return None, None

        return ' and ' + ' and '.join(clauses), args
765

    
766
    def _construct_paths(self, pathq):
        """Translate (path, match) pairs into an SQL fragment over
           `nodes n` that accepts any of the given paths.
           MATCH_PREFIX pairs match by path prefix, MATCH_EXACT pairs
           by equality; other match values are ignored.
           Return (subq, args) with args as a tuple,
           or (None, None) if pathq is empty.
        """
        if not pathq:
            return None, None

        clauses = []
        params = []
        for path, match in pathq:
            if match == MATCH_EXACT:
                clauses.append("n.path = ?")
                params.append(path)
            elif match == MATCH_PREFIX:
                clauses.append("n.path like ? escape '\\'")
                params.append(self.escape_like(path) + '%')

        subq = ' and (' + ' or '.join(clauses) + ')'
        return subq, tuple(params)
784

    
785
    def _construct_size(self, sizeq):
786
        if not sizeq or len(sizeq) != 2:
787
            return None, None
788

    
789
        subq = ''
790
        args = []
791
        if sizeq[0]:
792
            subq += " and v.size >= ?"
793
            args += [sizeq[0]]
794
        if sizeq[1]:
795
            subq += " and v.size < ?"
796
            args += [sizeq[1]]
797

    
798
        return subq, args
799

    
800
    def _construct_versions_nodes_latest_version_subquery(self, before=inf):
801
        if before == inf:
802
            q = ("n.latest_version ")
803
            args = []
804
        else:
805
            q = ("(select max(serial) "
806
                 "from versions "
807
                 "where node = v.node and mtime < ?) ")
808
            args = [before]
809
        return q, args
810

    
811
    def _construct_latest_version_subquery(self, node=None, before=inf):
812
        where_cond = "node = v.node"
813
        args = []
814
        if node:
815
            where_cond = "node = ? "
816
            args = [node]
817

    
818
        if before == inf:
819
            q = ("(select latest_version "
820
                 "from nodes "
821
                 "where %s) ")
822
        else:
823
            q = ("(select max(serial) "
824
                 "from versions "
825
                 "where %s and mtime < ?) ")
826
            args += [before]
827
        return q % where_cond, args
828

    
829
    def _construct_latest_versions_subquery(self, nodes=(), before=inf):
830
        where_cond = ""
831
        args = []
832
        if nodes:
833
            where_cond = "node in (%s) " % ','.join('?' for node in nodes)
834
            args = nodes
835

    
836
        if before == inf:
837
            q = ("(select latest_version "
838
                 "from nodes "
839
                 "where %s ) ")
840
        else:
841
            q = ("(select max(serial) "
842
                 "from versions "
843
                 "where %s and mtime < ? group by node) ")
844
            args += [before]
845
        return q % where_cond, args
846

    
847
    def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=None):
        """Return the distinct attribute keys found under parent.

        Only the latest version (as of before) of each node whose parent
        is parent is considered, versions in except_cluster are left
        out, and only attributes in domain are reported.  pathq, when
        given, further restricts the node paths (see _construct_paths).

        Return a list of keys.
        """

        # TODO: Use another table to store before=inf results.
        template = ("select distinct a.key "
                    "from attributes a, versions v, nodes n "
                    "where v.serial = %s "
                    "and v.cluster != ? "
                    "and v.node in (select node "
                    "from nodes "
                    "where parent = ?) "
                    "and a.serial = v.serial "
                    "and a.domain = ? "
                    "and n.node = v.node")
        version_sql, version_args = self._construct_latest_version_subquery(
            node=None, before=before)
        query = template % version_sql
        # Arguments follow placeholder order: subquery first, then the rest.
        params = version_args + [except_cluster, parent, domain]

        path_sql, path_args = self._construct_paths(pathq or [])
        if path_sql is not None:
            query += path_sql
            params += path_args

        self.execute(query, params)
        return [row[0] for row in self.fetchall()]
876

    
877
    def latest_version_list(self, parent, prefix='', delimiter=None,
                            start='', limit=10000, before=inf,
                            except_cluster=0, pathq=[], domain=None,
                            filterq=[], sizeq=None, all_props=False):
        """Return a (list of (path, serial) tuples, list of common prefixes)
           for the current versions of the paths with the given parent,
           matching the following criteria.

           The property tuple for a version is returned if all
           of these conditions are true:

                a. parent matches

                b. path > start

                c. path starts with prefix (and paths in pathq)

                d. version is the max up to before

                e. version is not in cluster

                f. the path does not have the delimiter occurring
                   after the prefix, or ends with the delimiter

                g. serial matches the attribute filter query.

                   A filter query is a comma-separated list of
                   terms in one of these three forms:

                   key
                       an attribute with this key must exist

                   !key
                       an attribute with this key must not exist

                   key ?op value
                       the attribute with this key satisfies the value
                       where ?op is one of =, !=, <=, >=, <, >.

                h. the size is in the range set by sizeq

           The list of common prefixes includes the prefixes
           matching up to the first delimiter after prefix,
           and are reported only once, as "virtual directories".
           The delimiter is included in the prefixes.

           If arguments are None, then the corresponding matching rule
           will always match.

           Limit applies to the first list of tuples returned.

           If all_props is True, return all properties after path, not just serial.
        """

        execute = self.execute

        if not start or start < prefix:
            start = strprevling(prefix)
        nextling = strnextling(prefix)

        q = ("select distinct n.path, %s "
             "from versions v, nodes n "
             "where v.serial = %s "
             "and v.cluster != ? "
             "and v.node in (select node "
             "from nodes "
             "where parent = ?) "
             "and n.node = v.node "
             "and n.path > ? and n.path < ?")
        subq, args = self._construct_versions_nodes_latest_version_subquery(
            before)
        if not all_props:
            q = q % ("v.serial", subq)
        else:
            q = q % ("v.serial, v.node, v.hash, v.size, v.type, v.source, v.mtime, v.muser, v.uuid, v.checksum, v.cluster", subq)
        args += [except_cluster, parent, start, nextling]
        # Position of the `start` bound in args (start and nextling are the
        # last two entries at this point); it is overwritten below whenever
        # the listing has to resume past a common prefix.
        start_index = len(args) - 2

        # Optional restrictions; each helper returns (None, None) when it
        # has nothing to add.
        subq, subargs = self._construct_paths(pathq)
        if subq is not None:
            q += subq
            args += subargs
        subq, subargs = self._construct_size(sizeq)
        if subq is not None:
            q += subq
            args += subargs
        subq, subargs = self._construct_filters(domain, filterq)
        if subq is not None:
            q += subq
            args += subargs
        q += " order by n.path"

        if not delimiter:
            # No prefix folding needed: let the database apply the limit.
            q += " limit ?"
            args.append(limit)
            execute(q, args)
            return self.fetchall(), ()

        pfz = len(prefix)
        dz = len(delimiter)
        count = 0
        fetchone = self.fetchone
        prefixes = []
        pappend = prefixes.append
        matches = []
        mappend = matches.append

        execute(q, args)
        while True:
            props = fetchone()
            if props is None:
                break
            path = props[0]
            idx = path.find(delimiter, pfz)

            if idx < 0:
                # No delimiter after the prefix: a plain match.
                mappend(props)
                count += 1
                if count >= limit:
                    break
                continue

            if idx + dz == len(path):
                # The path ends with the delimiter: still a match.
                mappend(props)
                count += 1
                continue  # Get one more, in case there is a path.
            # The delimiter occurs inside the path: report the common
            # prefix (delimiter included) instead of the path itself.
            pf = path[:idx + dz]
            pappend(pf)
            if count >= limit:
                break

            # Re-run the query with a new lower bound derived from the
            # prefix, so that it is reported only once.
            args[start_index] = strnextling(pf)  # New start.
            execute(q, args)

        return matches, prefixes
1016

    
1017
    def latest_uuid(self, uuid, cluster):
        """Look up the most recent version carrying the given uuid.

        The version with the highest serial among those with this uuid
        is selected.  When cluster is not None the search is limited to
        that cluster (converted with int()); when it is None, versions
        in every cluster are considered.

        Return the (path, serial) row produced by fetchone().
        """
        params = (uuid,)
        cluster_where = ""
        if cluster is not None:
            cluster_where = "and cluster = ?"
            params += (int(cluster),)

        template = ("select n.path, v.serial "
                    "from versions v, nodes n "
                    "where v.serial = (select max(serial) "
                    "from versions "
                    "where uuid = ? %s) "
                    "and n.node = v.node")
        self.execute(template % cluster_where, params)
        return self.fetchone()
1039

    
1040
    def domain_object_list(self, domain, cluster=None):
        """Return a list of (path, property tuple, attribute dictionary)
           for the objects in the specific domain and cluster.

           Only the latest version of each node is considered, and only
           versions having at least one attribute in domain appear.
           The property tuple holds (serial, node, hash, size, type,
           source, mtime, muser, uuid, checksum, cluster); the
           dictionary maps the attribute keys of that version to their
           values.  If cluster is None, all clusters are considered.
        """

        q = ("select n.path, v.serial, v.node, v.hash, "
             "v.size, v.type, v.source, v.mtime, v.muser, "
             "v.uuid, v.checksum, v.cluster, a.key, a.value "
             "from nodes n, versions v, attributes a "
             "where n.node = v.node and "
             "n.latest_version = v.serial and "
             "v.serial = a.serial and "
             "a.domain = ? ")
        args = [domain]
        if cluster is not None:
            q += "and v.cluster = ?"
            args += [cluster]

        self.execute(q, args)

        # The first 12 columns identify a version; the last two are one of
        # its attributes.  groupby only merges adjacent rows, so the rows
        # are sorted on the same key first.
        version_key = itemgetter(slice(12))
        rows = sorted(self.fetchall(), key=version_key)
        return [(key[0], key[1:], dict(row[12:] for row in group))
                for key, group in groupby(rows, version_key)]