Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / lib / sqlite / node.py @ 8b365874

History | View | Annotate | Download (36.8 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from time import time
35
from operator import itemgetter
36
from itertools import groupby
37

    
38
from dbworker import DBWorker
39

    
40
from pithos.backends.filter import parse_filters
41

    
42

    
43
ROOTNODE = 0
44

    
45
(SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM,
46
 CLUSTER) = range(11)
47

    
48
(MATCH_PREFIX, MATCH_EXACT) = range(2)
49

    
50
inf = float('inf')
51

    
52

    
53
def strnextling(prefix):
54
    """Return the first unicode string
55
       greater than but not starting with given prefix.
56
       strnextling('hello') -> 'hellp'
57
    """
58
    if not prefix:
59
        ## all strings start with the null string,
60
        ## therefore we have to approximate strnextling('')
61
        ## with the last unicode character supported by python
62
        ## 0x10ffff for wide (32-bit unicode) python builds
63
        ## 0x00ffff for narrow (16-bit unicode) python builds
64
        ## We will not autodetect. 0xffff is safe enough.
65
        return unichr(0xffff)
66
    s = prefix[:-1]
67
    c = ord(prefix[-1])
68
    if c >= 0xffff:
69
        raise RuntimeError
70
    s += unichr(c + 1)
71
    return s
72

    
73

    
74
def strprevling(prefix):
75
    """Return an approximation of the last unicode string
76
       less than but not starting with given prefix.
77
       strprevling(u'hello') -> u'helln\\xffff'
78
    """
79
    if not prefix:
80
        ## There is no prevling for the null string
81
        return prefix
82
    s = prefix[:-1]
83
    c = ord(prefix[-1])
84
    if c > 0:
85
        s += unichr(c - 1) + unichr(0xffff)
86
    return s
87

    
88

    
89
_propnames = {
90
    'serial': 0,
91
    'node': 1,
92
    'hash': 2,
93
    'size': 3,
94
    'type': 4,
95
    'source': 5,
96
    'mtime': 6,
97
    'muser': 7,
98
    'uuid': 8,
99
    'checksum': 9,
100
    'cluster': 10
101
}
102

    
103

    
104
class Node(DBWorker):
105
    """Nodes store path organization and have multiple versions.
106
       Versions store object history and have multiple attributes.
107
       Attributes store metadata.
108
    """
109

    
110
    # TODO: Provide an interface for included and excluded clusters.
111

    
112
    def __init__(self, **params):
113
        DBWorker.__init__(self, **params)
114
        execute = self.execute
115

    
116
        execute(""" pragma foreign_keys = on """)
117

    
118
        execute(""" create table if not exists nodes
119
                          ( node       integer primary key,
120
                            parent     integer default 0,
121
                            path       text    not null default '',
122
                            latest_version     integer,
123
                            foreign key (parent)
124
                            references nodes(node)
125
                            on update cascade
126
                            on delete cascade ) """)
127
        execute(""" create unique index if not exists idx_nodes_path
128
                    on nodes(path) """)
129
        execute(""" create index if not exists idx_nodes_parent
130
                    on nodes(parent) """)
131

    
132
        execute(""" create table if not exists policy
133
                          ( node   integer,
134
                            key    text,
135
                            value  text,
136
                            primary key (node, key)
137
                            foreign key (node)
138
                            references nodes(node)
139
                            on update cascade
140
                            on delete cascade ) """)
141

    
142
        execute(""" create table if not exists statistics
143
                          ( node       integer,
144
                            population integer not null default 0,
145
                            size       integer not null default 0,
146
                            mtime      integer,
147
                            cluster    integer not null default 0,
148
                            primary key (node, cluster)
149
                            foreign key (node)
150
                            references nodes(node)
151
                            on update cascade
152
                            on delete cascade ) """)
153

    
154
        execute(""" create table if not exists versions
155
                          ( serial     integer primary key,
156
                            node       integer,
157
                            hash       text,
158
                            size       integer not null default 0,
159
                            type       text    not null default '',
160
                            source     integer,
161
                            mtime      integer,
162
                            muser      text    not null default '',
163
                            uuid       text    not null default '',
164
                            checksum   text    not null default '',
165
                            cluster    integer not null default 0,
166
                            foreign key (node)
167
                            references nodes(node)
168
                            on update cascade
169
                            on delete cascade ) """)
170
        execute(""" create index if not exists idx_versions_node_mtime
171
                    on versions(node, mtime) """)
172
        execute(""" create index if not exists idx_versions_node_uuid
173
                    on versions(uuid) """)
174

    
175
        execute(""" create table if not exists attributes
176
                          ( serial integer,
177
                            domain text,
178
                            key    text,
179
                            value  text,
180
                            primary key (serial, domain, key)
181
                            foreign key (serial)
182
                            references versions(serial)
183
                            on update cascade
184
                            on delete cascade ) """)
185
        execute(""" create index if not exists idx_attributes_domain
186
                    on attributes(domain) """)
187

    
188
        wrapper = self.wrapper
189
        wrapper.execute()
190
        try:
191
            q = "insert or ignore into nodes(node, parent) values (?, ?)"
192
            execute(q, (ROOTNODE, ROOTNODE))
193
        finally:
194
            wrapper.commit()
195

    
196
    def node_create(self, parent, path):
197
        """Create a new node from the given properties.
198
           Return the node identifier of the new node.
199
        """
200

    
201
        q = ("insert into nodes (parent, path) "
202
             "values (?, ?)")
203
        props = (parent, path)
204
        return self.execute(q, props).lastrowid
205

    
206
    def node_lookup(self, path, **kwargs):
207
        """Lookup the current node of the given path.
208
           Return None if the path is not found.
209

210
           kwargs is not used: it is passed for conformance
211
        """
212

    
213
        q = "select node from nodes where path = ?"
214
        self.execute(q, (path,))
215
        r = self.fetchone()
216
        if r is not None:
217
            return r[0]
218
        return None
219

    
220
    def node_lookup_bulk(self, paths):
221
        """Lookup the current nodes for the given paths.
222
           Return () if the path is not found.
223
        """
224

    
225
        placeholders = ','.join('?' for path in paths)
226
        q = "select node from nodes where path in (%s)" % placeholders
227
        self.execute(q, paths)
228
        r = self.fetchall()
229
        if r is not None:
230
            return [row[0] for row in r]
231
        return None
232

    
233
    def node_get_properties(self, node):
234
        """Return the node's (parent, path).
235
           Return None if the node is not found.
236
        """
237

    
238
        q = "select parent, path from nodes where node = ?"
239
        self.execute(q, (node,))
240
        return self.fetchone()
241

    
242
    def node_get_versions(self, node, keys=(), propnames=_propnames):
243
        """Return the properties of all versions at node.
244
           If keys is empty, return all properties in the order
245
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
246
        """
247

    
248
        q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
249
             "from versions "
250
             "where node = ?")
251
        self.execute(q, (node,))
252
        r = self.fetchall()
253
        if r is None:
254
            return r
255

    
256
        if not keys:
257
            return r
258
        return [[p[propnames[k]] for k in keys if k in propnames] for p in r]
259

    
260
    def node_count_children(self, node):
261
        """Return node's child count."""
262

    
263
        q = "select count(node) from nodes where parent = ? and node != 0"
264
        self.execute(q, (node,))
265
        r = self.fetchone()
266
        if r is None:
267
            return 0
268
        return r[0]
269

    
270
    def node_purge_children(self, parent, before=inf, cluster=0):
271
        """Delete all versions with the specified
272
           parent and cluster, and return
273
           the hashes, the size and the serials of versions deleted.
274
           Clears out nodes with no remaining versions.
275
        """
276

    
277
        execute = self.execute
278
        q = ("select count(serial), sum(size) from versions "
279
             "where node in (select node "
280
             "from nodes "
281
             "where parent = ?) "
282
             "and cluster = ? "
283
             "and mtime <= ?")
284
        args = (parent, cluster, before)
285
        execute(q, args)
286
        nr, size = self.fetchone()
287
        if not nr:
288
            return (), 0, ()
289
        mtime = time()
290
        self.statistics_update(parent, -nr, -size, mtime, cluster)
291
        self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster)
292

    
293
        q = ("select hash, serial from versions "
294
             "where node in (select node "
295
             "from nodes "
296
             "where parent = ?) "
297
             "and cluster = ? "
298
             "and mtime <= ?")
299
        execute(q, args)
300
        hashes = []
301
        serials = []
302
        for r in self.fetchall():
303
            hashes += [r[0]]
304
            serials += [r[1]]
305
        
306
        q = ("delete from versions "
307
             "where node in (select node "
308
             "from nodes "
309
             "where parent = ?) "
310
             "and cluster = ? "
311
             "and mtime <= ?")
312
        execute(q, args)
313
        q = ("delete from nodes "
314
             "where node in (select node from nodes n "
315
             "where (select count(serial) "
316
             "from versions "
317
             "where node = n.node) = 0 "
318
             "and parent = ?)")
319
        execute(q, (parent,))
320
        return hashes, size, serials
321

    
322
    def node_purge(self, node, before=inf, cluster=0):
323
        """Delete all versions with the specified
324
           node and cluster, and return
325
           the hashes, the size and the serials of versions deleted.
326
           Clears out the node if it has no remaining versions.
327
        """
328

    
329
        execute = self.execute
330
        q = ("select count(serial), sum(size) from versions "
331
             "where node = ? "
332
             "and cluster = ? "
333
             "and mtime <= ?")
334
        args = (node, cluster, before)
335
        execute(q, args)
336
        nr, size = self.fetchone()
337
        if not nr:
338
            return (), 0, ()
339
        mtime = time()
340
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
341

    
342
        q = ("select hash, serial from versions "
343
             "where node = ? "
344
             "and cluster = ? "
345
             "and mtime <= ?")
346
        execute(q, args)
347
        hashes = []
348
        serials = []
349
        for r in self.fetchall():
350
            hashes += [r[0]]
351
            serials += [r[1]]
352

    
353
        q = ("delete from versions "
354
             "where node = ? "
355
             "and cluster = ? "
356
             "and mtime <= ?")
357
        execute(q, args)
358
        q = ("delete from nodes "
359
             "where node in (select node from nodes n "
360
             "where (select count(serial) "
361
             "from versions "
362
             "where node = n.node) = 0 "
363
             "and node = ?)")
364
        execute(q, (node,))
365
        return hashes, size, serials
366

    
367
    def node_remove(self, node):
368
        """Remove the node specified.
369
           Return false if the node has children or is not found.
370
        """
371

    
372
        if self.node_count_children(node):
373
            return False
374

    
375
        mtime = time()
376
        q = ("select count(serial), sum(size), cluster "
377
             "from versions "
378
             "where node = ? "
379
             "group by cluster")
380
        self.execute(q, (node,))
381
        for population, size, cluster in self.fetchall():
382
            self.statistics_update_ancestors(
383
                node, -population, -size, mtime, cluster)
384

    
385
        q = "delete from nodes where node = ?"
386
        self.execute(q, (node,))
387
        return True
388

    
389
    def node_accounts(self, accounts=()):
390
        q = ("select path, node from nodes where node != 0 and parent = 0 ")
391
        args = []
392
        if accounts:
393
            placeholders = ','.join('?' for a in accounts)
394
            q += ("and path in (%s)" % placeholders)
395
            args += accounts
396
        return self.execute(q, args).fetchall()
397

    
398
    def node_account_quotas(self):
399
        q = ("select n.path, p.value from nodes n, policy p "
400
             "where n.node != 0 and n.parent = 0 "
401
             "and n.node = p.node and p.key = 'quota'"
402
        )
403
        return dict(self.execute(q).fetchall())
404

    
405
    def node_account_usage(self, account_node, cluster):
406
        select_children = ("select node from nodes where parent = ?")
407
        select_descedents = ("select node from nodes "
408
                             "where parent in (%s) "
409
                             "or node in (%s) ") % ((select_children,)*2)
410
        args = [account_node]*2
411
        q = ("select sum(v.size) from versions v, nodes n "
412
             "where v.node = n.node "
413
             "and n.node in (%s) "
414
             "and v.cluster = ?") % select_descedents
415
        args += [cluster]
416

    
417
        self.execute(q, args)
418
        return self.fetchone()[0]
419

    
420
    def policy_get(self, node):
421
        q = "select key, value from policy where node = ?"
422
        self.execute(q, (node,))
423
        return dict(self.fetchall())
424

    
425
    def policy_set(self, node, policy):
426
        q = "insert or replace into policy (node, key, value) values (?, ?, ?)"
427
        self.executemany(q, ((node, k, v) for k, v in policy.iteritems()))
428

    
429
    def statistics_get(self, node, cluster=0):
430
        """Return population, total size and last mtime
431
           for all versions under node that belong to the cluster.
432
        """
433

    
434
        q = ("select population, size, mtime from statistics "
435
             "where node = ? and cluster = ?")
436
        self.execute(q, (node, cluster))
437
        return self.fetchone()
438

    
439
    def statistics_update(self, node, population, size, mtime, cluster=0):
440
        """Update the statistics of the given node.
441
           Statistics keep track the population, total
442
           size of objects and mtime in the node's namespace.
443
           May be zero or positive or negative numbers.
444
        """
445

    
446
        qs = ("select population, size from statistics "
447
              "where node = ? and cluster = ?")
448
        qu = ("insert or replace into statistics (node, population, size, mtime, cluster) "
449
              "values (?, ?, ?, ?, ?)")
450
        self.execute(qs, (node, cluster))
451
        r = self.fetchone()
452
        if r is None:
453
            prepopulation, presize = (0, 0)
454
        else:
455
            prepopulation, presize = r
456
        population += prepopulation
457
        population = max(population, 0)
458
        size += presize
459
        self.execute(qu, (node, population, size, mtime, cluster))
460

    
461
    def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
462
        """Update the statistics of the given node's parent.
463
           Then recursively update all parents up to the root.
464
           Population is not recursive.
465
        """
466

    
467
        while True:
468
            if node == 0:
469
                break
470
            props = self.node_get_properties(node)
471
            if props is None:
472
                break
473
            parent, path = props
474
            self.statistics_update(parent, population, size, mtime, cluster)
475
            node = parent
476
            population = 0  # Population isn't recursive
477

    
478
    def statistics_latest(self, node, before=inf, except_cluster=0):
479
        """Return population, total size and last mtime
480
           for all latest versions under node that
481
           do not belong to the cluster.
482
        """
483

    
484
        execute = self.execute
485
        fetchone = self.fetchone
486

    
487
        # The node.
488
        props = self.node_get_properties(node)
489
        if props is None:
490
            return None
491
        parent, path = props
492

    
493
        # The latest version.
494
        q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
495
             "from versions v "
496
             "where serial = %s "
497
             "and cluster != ?")
498
        subq, args = self._construct_latest_version_subquery(
499
            node=node, before=before)
500
        execute(q % subq, args + [except_cluster])
501
        props = fetchone()
502
        if props is None:
503
            return None
504
        mtime = props[MTIME]
505

    
506
        # First level, just under node (get population).
507
        q = ("select count(serial), sum(size), max(mtime) "
508
             "from versions v "
509
             "where serial = %s "
510
             "and cluster != ? "
511
             "and node in (select node "
512
             "from nodes "
513
             "where parent = ?)")
514
        subq, args = self._construct_latest_version_subquery(
515
            node=None, before=before)
516
        execute(q % subq, args + [except_cluster, node])
517
        r = fetchone()
518
        if r is None:
519
            return None
520
        count = r[0]
521
        mtime = max(mtime, r[2])
522
        if count == 0:
523
            return (0, 0, mtime)
524

    
525
        # All children (get size and mtime).
526
        # This is why the full path is stored.
527
        q = ("select count(serial), sum(size), max(mtime) "
528
             "from versions v "
529
             "where serial = %s "
530
             "and cluster != ? "
531
             "and node in (select node "
532
             "from nodes "
533
             "where path like ? escape '\\')")
534
        subq, args = self._construct_latest_version_subquery(
535
            node=None, before=before)
536
        execute(
537
            q % subq, args + [except_cluster, self.escape_like(path) + '%'])
538
        r = fetchone()
539
        if r is None:
540
            return None
541
        size = r[1] - props[SIZE]
542
        mtime = max(mtime, r[2])
543
        return (count, size, mtime)
544

    
545
    def nodes_set_latest_version(self, node, serial):
546
        q = ("update nodes set latest_version = ? where node = ?")
547
        props = (serial, node)
548
        self.execute(q, props)
549

    
550
    def version_create(self, node, hash, size, type, source, muser, uuid, checksum, cluster=0):
551
        """Create a new version from the given properties.
552
           Return the (serial, mtime) of the new version.
553
        """
554

    
555
        q = ("insert into versions (node, hash, size, type, source, mtime, muser, uuid, checksum, cluster) "
556
             "values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
557
        mtime = time()
558
        props = (node, hash, size, type, source, mtime, muser,
559
                 uuid, checksum, cluster)
560
        serial = self.execute(q, props).lastrowid
561
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)
562

    
563
        self.nodes_set_latest_version(node, serial)
564

    
565
        return serial, mtime
566

    
567
    def version_lookup(self, node, before=inf, cluster=0, all_props=True):
568
        """Lookup the current version of the given node.
569
           Return a list with its properties:
570
           (serial, node, hash, size, type, source, mtime,
571
            muser, uuid, checksum, cluster)
572
           or None if the current version is not found in the given cluster.
573
        """
574

    
575
        q = ("select %s "
576
             "from versions v "
577
             "where serial = %s "
578
             "and cluster = ?")
579
        subq, args = self._construct_latest_version_subquery(
580
            node=node, before=before)
581
        if not all_props:
582
            q = q % ("serial", subq)
583
        else:
584
            q = q % ("serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster", subq)
585

    
586
        self.execute(q, args + [cluster])
587
        props = self.fetchone()
588
        if props is not None:
589
            return props
590
        return None
591

    
592
    def version_lookup_bulk(self, nodes, before=inf, cluster=0, all_props=True):
593
        """Lookup the current versions of the given nodes.
594
           Return a list with their properties:
595
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
596
        """
597

    
598
        if not nodes:
599
            return ()
600
        q = ("select %s "
601
             "from versions "
602
             "where serial in %s "
603
             "and cluster = ? %s")
604
        subq, args = self._construct_latest_versions_subquery(
605
            nodes=nodes, before=before)
606
        if not all_props:
607
            q = q % ("serial", subq, '')
608
        else:
609
            q = q % ("serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster", subq, 'order by node')
610

    
611
        args += [cluster]
612
        self.execute(q, args)
613
        return self.fetchall()
614

    
615
    def version_get_properties(self, serial, keys=(), propnames=_propnames):
616
        """Return a sequence of values for the properties of
617
           the version specified by serial and the keys, in the order given.
618
           If keys is empty, return all properties in the order
619
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
620
        """
621

    
622
        q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
623
             "from versions "
624
             "where serial = ?")
625
        self.execute(q, (serial,))
626
        r = self.fetchone()
627
        if r is None:
628
            return r
629

    
630
        if not keys:
631
            return r
632
        return [r[propnames[k]] for k in keys if k in propnames]
633

    
634
    def version_put_property(self, serial, key, value):
635
        """Set value for the property of version specified by key."""
636

    
637
        if key not in _propnames:
638
            return
639
        q = "update versions set %s = ? where serial = ?" % key
640
        self.execute(q, (value, serial))
641

    
642
    def version_recluster(self, serial, cluster):
643
        """Move the version into another cluster."""
644

    
645
        props = self.version_get_properties(serial)
646
        if not props:
647
            return
648
        node = props[NODE]
649
        size = props[SIZE]
650
        oldcluster = props[CLUSTER]
651
        if cluster == oldcluster:
652
            return
653

    
654
        mtime = time()
655
        self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
656
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)
657

    
658
        q = "update versions set cluster = ? where serial = ?"
659
        self.execute(q, (cluster, serial))
660

    
661
    def version_remove(self, serial):
662
        """Remove the serial specified."""
663

    
664
        props = self.version_get_properties(serial)
665
        if not props:
666
            return
667
        node = props[NODE]
668
        hash = props[HASH]
669
        size = props[SIZE]
670
        cluster = props[CLUSTER]
671

    
672
        mtime = time()
673
        self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
674

    
675
        q = "delete from versions where serial = ?"
676
        self.execute(q, (serial,))
677

    
678
        props = self.version_lookup(node, cluster=cluster, all_props=False)
679
        if props:
680
            self.nodes_set_latest_version(node, props[0])
681
        return hash, size
682

    
683
    def attribute_get(self, serial, domain, keys=()):
684
        """Return a list of (key, value) pairs of the version specified by serial.
685
           If keys is empty, return all attributes.
686
           Othwerise, return only those specified.
687
        """
688

    
689
        execute = self.execute
690
        if keys:
691
            marks = ','.join('?' for k in keys)
692
            q = ("select key, value from attributes "
693
                 "where key in (%s) and serial = ? and domain = ?" % (marks,))
694
            execute(q, keys + (serial, domain))
695
        else:
696
            q = "select key, value from attributes where serial = ? and domain = ?"
697
            execute(q, (serial, domain))
698
        return self.fetchall()
699

    
700
    def attribute_set(self, serial, domain, items):
701
        """Set the attributes of the version specified by serial.
702
           Receive attributes as an iterable of (key, value) pairs.
703
        """
704

    
705
        q = ("insert or replace into attributes (serial, domain, key, value) "
706
             "values (?, ?, ?, ?)")
707
        self.executemany(q, ((serial, domain, k, v) for k, v in items))
708

    
709
    def attribute_del(self, serial, domain, keys=()):
710
        """Delete attributes of the version specified by serial.
711
           If keys is empty, delete all attributes.
712
           Otherwise delete those specified.
713
        """
714

    
715
        if keys:
716
            q = "delete from attributes where serial = ? and domain = ? and key = ?"
717
            self.executemany(q, ((serial, domain, key) for key in keys))
718
        else:
719
            q = "delete from attributes where serial = ? and domain = ?"
720
            self.execute(q, (serial, domain))
721

    
722
    def attribute_copy(self, source, dest):
723
        q = ("insert or replace into attributes "
724
             "select ?, domain, key, value from attributes "
725
             "where serial = ?")
726
        self.execute(q, (dest, source))
727

    
728
    def _construct_filters(self, domain, filterq):
729
        if not domain or not filterq:
730
            return None, None
731

    
732
        subqlist = []
733
        append = subqlist.append
734
        included, excluded, opers = parse_filters(filterq)
735
        args = []
736

    
737
        if included:
738
            subq = "exists (select 1 from attributes where serial = v.serial and domain = ? and "
739
            subq += "(" + ' or '.join(('key = ?' for x in included)) + ")"
740
            subq += ")"
741
            args += [domain]
742
            args += included
743
            append(subq)
744

    
745
        if excluded:
746
            subq = "not exists (select 1 from attributes where serial = v.serial and domain = ? and "
747
            subq += "(" + ' or '.join(('key = ?' for x in excluded)) + ")"
748
            subq += ")"
749
            args += [domain]
750
            args += excluded
751
            append(subq)
752

    
753
        if opers:
754
            for k, o, v in opers:
755
                subq = "exists (select 1 from attributes where serial = v.serial and domain = ? and "
756
                subq += "key = ? and value %s ?" % (o,)
757
                subq += ")"
758
                args += [domain, k, v]
759
                append(subq)
760

    
761
        if not subqlist:
762
            return None, None
763

    
764
        subq = ' and ' + ' and '.join(subqlist)
765

    
766
        return subq, args
767

    
768
    def _construct_paths(self, pathq):
769
        if not pathq:
770
            return None, None
771

    
772
        subqlist = []
773
        args = []
774
        for path, match in pathq:
775
            if match == MATCH_PREFIX:
776
                subqlist.append("n.path like ? escape '\\'")
777
                args.append(self.escape_like(path) + '%')
778
            elif match == MATCH_EXACT:
779
                subqlist.append("n.path = ?")
780
                args.append(path)
781

    
782
        subq = ' and (' + ' or '.join(subqlist) + ')'
783
        args = tuple(args)
784

    
785
        return subq, args
786

    
787
    def _construct_size(self, sizeq):
788
        if not sizeq or len(sizeq) != 2:
789
            return None, None
790

    
791
        subq = ''
792
        args = []
793
        if sizeq[0]:
794
            subq += " and v.size >= ?"
795
            args += [sizeq[0]]
796
        if sizeq[1]:
797
            subq += " and v.size < ?"
798
            args += [sizeq[1]]
799

    
800
        return subq, args
801

    
802
    def _construct_versions_nodes_latest_version_subquery(self, before=inf):
803
        if before == inf:
804
            q = ("n.latest_version ")
805
            args = []
806
        else:
807
            q = ("(select max(serial) "
808
                 "from versions "
809
                 "where node = v.node and mtime < ?) ")
810
            args = [before]
811
        return q, args
812

    
813
    def _construct_latest_version_subquery(self, node=None, before=inf):
814
        where_cond = "node = v.node"
815
        args = []
816
        if node:
817
            where_cond = "node = ? "
818
            args = [node]
819

    
820
        if before == inf:
821
            q = ("(select latest_version "
822
                 "from nodes "
823
                 "where %s) ")
824
        else:
825
            q = ("(select max(serial) "
826
                 "from versions "
827
                 "where %s and mtime < ?) ")
828
            args += [before]
829
        return q % where_cond, args
830

    
831
    def _construct_latest_versions_subquery(self, nodes=(), before=inf):
832
        where_cond = ""
833
        args = []
834
        if nodes:
835
            where_cond = "node in (%s) " % ','.join('?' for node in nodes)
836
            args = nodes
837

    
838
        if before == inf:
839
            q = ("(select latest_version "
840
                 "from nodes "
841
                 "where %s ) ")
842
        else:
843
            q = ("(select max(serial) "
844
                 "from versions "
845
                 "where %s and mtime < ? group by node) ")
846
            args += [before]
847
        return q % where_cond, args
848

    
849
    def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=None):
850
        """Return a list with all keys pairs defined
851
           for all latest versions under parent that
852
           do not belong to the cluster.
853
        """
854

    
855
        pathq = pathq or []
856

    
857
        # TODO: Use another table to store before=inf results.
858
        q = ("select distinct a.key "
859
             "from attributes a, versions v, nodes n "
860
             "where v.serial = %s "
861
             "and v.cluster != ? "
862
             "and v.node in (select node "
863
             "from nodes "
864
             "where parent = ?) "
865
             "and a.serial = v.serial "
866
             "and a.domain = ? "
867
             "and n.node = v.node")
868
        subq, subargs = self._construct_latest_version_subquery(
869
            node=None, before=before)
870
        args = subargs + [except_cluster, parent, domain]
871
        q = q % subq
872
        subq, subargs = self._construct_paths(pathq)
873
        if subq is not None:
874
            q += subq
875
            args += subargs
876
        self.execute(q, args)
877
        return [r[0] for r in self.fetchall()]
878

    
879
    def latest_version_list(self, parent, prefix='', delimiter=None,
880
                            start='', limit=10000, before=inf,
881
                            except_cluster=0, pathq=[], domain=None,
882
                            filterq=[], sizeq=None, all_props=False):
883
        """Return a (list of (path, serial) tuples, list of common prefixes)
884
           for the current versions of the paths with the given parent,
885
           matching the following criteria.
886

887
           The property tuple for a version is returned if all
888
           of these conditions are true:
889

890
                a. parent matches
891

892
                b. path > start
893

894
                c. path starts with prefix (and paths in pathq)
895

896
                d. version is the max up to before
897

898
                e. version is not in cluster
899

900
                f. the path does not have the delimiter occuring
901
                   after the prefix, or ends with the delimiter
902

903
                g. serial matches the attribute filter query.
904

905
                   A filter query is a comma-separated list of
906
                   terms in one of these three forms:
907

908
                   key
909
                       an attribute with this key must exist
910

911
                   !key
912
                       an attribute with this key must not exist
913

914
                   key ?op value
915
                       the attribute with this key satisfies the value
916
                       where ?op is one of =, != <=, >=, <, >.
917

918
                h. the size is in the range set by sizeq
919

920
           The list of common prefixes includes the prefixes
921
           matching up to the first delimiter after prefix,
922
           and are reported only once, as "virtual directories".
923
           The delimiter is included in the prefixes.
924

925
           If arguments are None, then the corresponding matching rule
926
           will always match.
927

928
           Limit applies to the first list of tuples returned.
929

930
           If all_props is True, return all properties after path, not just serial.
931
        """
932

    
933
        execute = self.execute
934

    
935
        if not start or start < prefix:
936
            start = strprevling(prefix)
937
        nextling = strnextling(prefix)
938

    
939
        q = ("select distinct n.path, %s "
940
             "from versions v, nodes n "
941
             "where v.serial = %s "
942
             "and v.cluster != ? "
943
             "and v.node in (select node "
944
             "from nodes "
945
             "where parent = ?) "
946
             "and n.node = v.node "
947
             "and n.path > ? and n.path < ?")
948
        subq, args = self._construct_versions_nodes_latest_version_subquery(
949
            before)
950
        if not all_props:
951
            q = q % ("v.serial", subq)
952
        else:
953
            q = q % ("v.serial, v.node, v.hash, v.size, v.type, v.source, v.mtime, v.muser, v.uuid, v.checksum, v.cluster", subq)
954
        args += [except_cluster, parent, start, nextling]
955
        start_index = len(args) - 2
956

    
957
        subq, subargs = self._construct_paths(pathq)
958
        if subq is not None:
959
            q += subq
960
            args += subargs
961
        subq, subargs = self._construct_size(sizeq)
962
        if subq is not None:
963
            q += subq
964
            args += subargs
965
        subq, subargs = self._construct_filters(domain, filterq)
966
        if subq is not None:
967
            q += subq
968
            args += subargs
969
        else:
970
            q = q.replace("attributes a, ", "")
971
            q = q.replace("and a.serial = v.serial ", "")
972
        q += " order by n.path"
973

    
974
        if not delimiter:
975
            q += " limit ?"
976
            args.append(limit)
977
            execute(q, args)
978
            return self.fetchall(), ()
979

    
980
        pfz = len(prefix)
981
        dz = len(delimiter)
982
        count = 0
983
        fetchone = self.fetchone
984
        prefixes = []
985
        pappend = prefixes.append
986
        matches = []
987
        mappend = matches.append
988

    
989
        execute(q, args)
990
        while True:
991
            props = fetchone()
992
            if props is None:
993
                break
994
            path = props[0]
995
            serial = props[1]
996
            idx = path.find(delimiter, pfz)
997

    
998
            if idx < 0:
999
                mappend(props)
1000
                count += 1
1001
                if count >= limit:
1002
                    break
1003
                continue
1004

    
1005
            if idx + dz == len(path):
1006
                mappend(props)
1007
                count += 1
1008
                continue  # Get one more, in case there is a path.
1009
            pf = path[:idx + dz]
1010
            pappend(pf)
1011
            if count >= limit:
1012
                break
1013

    
1014
            args[start_index] = strnextling(pf)  # New start.
1015
            execute(q, args)
1016

    
1017
        return matches, prefixes
1018

    
1019
    def latest_uuid(self, uuid, cluster):
1020
        """Return the latest version of the given uuid and cluster.
1021

1022
        Return a (path, serial) tuple.
1023
        If cluster is None, all clusters are considered.
1024

1025
        """
1026
        if cluster is not None:
1027
            cluster_where = "and cluster = ?"
1028
            args = (uuid, int(cluster))
1029
        else:
1030
            cluster_where = ""
1031
            args = (uuid,)
1032

    
1033
        q = ("select n.path, v.serial "
1034
             "from versions v, nodes n "
1035
             "where v.serial = (select max(serial) "
1036
             "from versions "
1037
             "where uuid = ? %s) "
1038
             "and n.node = v.node") % cluster_where
1039
        self.execute(q, args)
1040
        return self.fetchone()
1041

    
1042
    def domain_object_list(self, domain, paths, cluster=None):
1043
        """Return a list of (path, property list, attribute dictionary)
1044
           for the objects in the specific domain and cluster.
1045
        """
1046

    
1047
        q = ("select n.path, v.serial, v.node, v.hash, "
1048
             "v.size, v.type, v.source, v.mtime, v.muser, "
1049
             "v.uuid, v.checksum, v.cluster, a.key, a.value "
1050
             "from nodes n, versions v, attributes a "
1051
             "where n.node = v.node and "
1052
             "n.latest_version = v.serial and "
1053
             "v.serial = a.serial and "
1054
             "a.domain = ? and "
1055
             "n.path in (%s)" % ','.join(['?' for _ in range(len(paths))))
1056
        args = [domain]
1057
        map(args.append, paths)
1058
        if cluster != None:
1059
            q += "and v.cluster = ?"
1060
            args += [cluster]
1061

    
1062
        self.execute(q, args)
1063
        rows = self.fetchall()
1064

    
1065
        group_by = itemgetter(slice(12))
1066
        rows.sort(key = group_by)
1067
        groups = groupby(rows, group_by)
1068
        return [(k[0], k[1:], dict([i[12:] for i in data])) \
1069
            for (k, data) in groups]