Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / lib / sqlite / node.py @ 0534576c

History | View | Annotate | Download (38.4 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from time import time
35
from operator import itemgetter
36
from itertools import groupby
37

    
38
from dbworker import DBWorker
39

    
40
from pithos.backends.filter import parse_filters
41

    
42

    
43
ROOTNODE = 0
44

    
45
(SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM,
46
 CLUSTER) = range(11)
47

    
48
(MATCH_PREFIX, MATCH_EXACT) = range(2)
49

    
50
inf = float('inf')
51

    
52

    
53
def strnextling(prefix):
54
    """Return the first unicode string
55
       greater than but not starting with given prefix.
56
       strnextling('hello') -> 'hellp'
57
    """
58
    if not prefix:
59
        ## all strings start with the null string,
60
        ## therefore we have to approximate strnextling('')
61
        ## with the last unicode character supported by python
62
        ## 0x10ffff for wide (32-bit unicode) python builds
63
        ## 0x00ffff for narrow (16-bit unicode) python builds
64
        ## We will not autodetect. 0xffff is safe enough.
65
        return unichr(0xffff)
66
    s = prefix[:-1]
67
    c = ord(prefix[-1])
68
    if c >= 0xffff:
69
        raise RuntimeError
70
    s += unichr(c + 1)
71
    return s
72

    
73

    
74
def strprevling(prefix):
75
    """Return an approximation of the last unicode string
76
       less than but not starting with given prefix.
77
       strprevling(u'hello') -> u'helln\\xffff'
78
    """
79
    if not prefix:
80
        ## There is no prevling for the null string
81
        return prefix
82
    s = prefix[:-1]
83
    c = ord(prefix[-1])
84
    if c > 0:
85
        s += unichr(c - 1) + unichr(0xffff)
86
    return s
87

    
88

    
89
_propnames = {
90
    'serial': 0,
91
    'node': 1,
92
    'hash': 2,
93
    'size': 3,
94
    'type': 4,
95
    'source': 5,
96
    'mtime': 6,
97
    'muser': 7,
98
    'uuid': 8,
99
    'checksum': 9,
100
    'cluster': 10
101
}
102

    
103

    
104
class Node(DBWorker):
105
    """Nodes store path organization and have multiple versions.
106
       Versions store object history and have multiple attributes.
107
       Attributes store metadata.
108
    """
109

    
110
    # TODO: Provide an interface for included and excluded clusters.
111

    
112
    def __init__(self, **params):
113
        DBWorker.__init__(self, **params)
114
        execute = self.execute
115

    
116
        execute(""" pragma foreign_keys = on """)
117

    
118
        execute(""" create table if not exists nodes
119
                          ( node       integer primary key,
120
                            parent     integer default 0,
121
                            path       text    not null default '',
122
                            latest_version     integer,
123
                            foreign key (parent)
124
                            references nodes(node)
125
                            on update cascade
126
                            on delete cascade ) """)
127
        execute(""" create unique index if not exists idx_nodes_path
128
                    on nodes(path) """)
129
        execute(""" create index if not exists idx_nodes_parent
130
                    on nodes(parent) """)
131

    
132
        execute(""" create table if not exists policy
133
                          ( node   integer,
134
                            key    text,
135
                            value  text,
136
                            primary key (node, key)
137
                            foreign key (node)
138
                            references nodes(node)
139
                            on update cascade
140
                            on delete cascade ) """)
141

    
142
        execute(""" create table if not exists statistics
143
                          ( node       integer,
144
                            population integer not null default 0,
145
                            size       integer not null default 0,
146
                            mtime      integer,
147
                            cluster    integer not null default 0,
148
                            primary key (node, cluster)
149
                            foreign key (node)
150
                            references nodes(node)
151
                            on update cascade
152
                            on delete cascade ) """)
153

    
154
        execute(""" create table if not exists versions
155
                          ( serial     integer primary key,
156
                            node       integer,
157
                            hash       text,
158
                            size       integer not null default 0,
159
                            type       text    not null default '',
160
                            source     integer,
161
                            mtime      integer,
162
                            muser      text    not null default '',
163
                            uuid       text    not null default '',
164
                            checksum   text    not null default '',
165
                            cluster    integer not null default 0,
166
                            foreign key (node)
167
                            references nodes(node)
168
                            on update cascade
169
                            on delete cascade ) """)
170
        execute(""" create index if not exists idx_versions_node_mtime
171
                    on versions(node, mtime) """)
172
        execute(""" create index if not exists idx_versions_node_uuid
173
                    on versions(uuid) """)
174

    
175
        execute(""" create table if not exists attributes
176
                          ( serial      integer,
177
                            domain      text,
178
                            key         text,
179
                            value       text,
180
                            node        integer not null    default 0,
181
                            is_latest   boolean not null    default 1,
182
                            primary key (serial, domain, key)
183
                            foreign key (serial)
184
                            references versions(serial)
185
                            on update cascade
186
                            on delete cascade ) """)
187
        execute(""" create index if not exists idx_attributes_domain
188
                    on attributes(domain) """)
189
        execute(""" create index if not exists idx_attributes_serial_node
190
                    on attributes(serial, node) """)
191

    
192
        wrapper = self.wrapper
193
        wrapper.execute()
194
        try:
195
            q = "insert or ignore into nodes(node, parent) values (?, ?)"
196
            execute(q, (ROOTNODE, ROOTNODE))
197
        finally:
198
            wrapper.commit()
199

    
200
    def node_create(self, parent, path):
201
        """Create a new node from the given properties.
202
           Return the node identifier of the new node.
203
        """
204

    
205
        q = ("insert into nodes (parent, path) "
206
             "values (?, ?)")
207
        props = (parent, path)
208
        return self.execute(q, props).lastrowid
209

    
210
    def node_lookup(self, path, for_update=False):
211
        """Lookup the current node of the given path.
212
           Return None if the path is not found.
213

214
           kwargs is not used: it is passed for conformance
215
        """
216

    
217
        q = "select node from nodes where path = ?"
218
        self.execute(q, (path,))
219
        r = self.fetchone()
220
        if r is not None:
221
            return r[0]
222
        return None
223

    
224
    def node_lookup_bulk(self, paths):
225
        """Lookup the current nodes for the given paths.
226
           Return () if the path is not found.
227
        """
228

    
229
        placeholders = ','.join('?' for path in paths)
230
        q = "select node from nodes where path in (%s)" % placeholders
231
        self.execute(q, paths)
232
        r = self.fetchall()
233
        if r is not None:
234
            return [row[0] for row in r]
235
        return None
236

    
237
    def node_get_properties(self, node):
238
        """Return the node's (parent, path).
239
           Return None if the node is not found.
240
        """
241

    
242
        q = "select parent, path from nodes where node = ?"
243
        self.execute(q, (node,))
244
        return self.fetchone()
245

    
246
    def node_get_versions(self, node, keys=(), propnames=_propnames):
247
        """Return the properties of all versions at node.
248
           If keys is empty, return all properties in the order
249
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
250
        """
251

    
252
        q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
253
             "from versions "
254
             "where node = ?")
255
        self.execute(q, (node,))
256
        r = self.fetchall()
257
        if r is None:
258
            return r
259

    
260
        if not keys:
261
            return r
262
        return [[p[propnames[k]] for k in keys if k in propnames] for p in r]
263

    
264
    def node_count_children(self, node):
265
        """Return node's child count."""
266

    
267
        q = "select count(node) from nodes where parent = ? and node != 0"
268
        self.execute(q, (node,))
269
        r = self.fetchone()
270
        if r is None:
271
            return 0
272
        return r[0]
273

    
274
    def node_purge_children(self, parent, before=inf, cluster=0,
275
                            update_statistics_ancestors_depth=None):
276
        """Delete all versions with the specified
277
           parent and cluster, and return
278
           the hashes, the size and the serials of versions deleted.
279
           Clears out nodes with no remaining versions.
280
        """
281

    
282
        execute = self.execute
283
        q = ("select count(serial), sum(size) from versions "
284
             "where node in (select node "
285
             "from nodes "
286
             "where parent = ?) "
287
             "and cluster = ? "
288
             "and mtime <= ?")
289
        args = (parent, cluster, before)
290
        execute(q, args)
291
        nr, size = self.fetchone()
292
        if not nr:
293
            return (), 0, ()
294
        mtime = time()
295
        self.statistics_update(parent, -nr, -size, mtime, cluster)
296
        self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster,
297
                                         update_statistics_ancestors_depth)
298

    
299
        q = ("select hash, serial from versions "
300
             "where node in (select node "
301
             "from nodes "
302
             "where parent = ?) "
303
             "and cluster = ? "
304
             "and mtime <= ?")
305
        execute(q, args)
306
        hashes = []
307
        serials = []
308
        for r in self.fetchall():
309
            hashes += [r[0]]
310
            serials += [r[1]]
311

    
312
        q = ("delete from versions "
313
             "where node in (select node "
314
             "from nodes "
315
             "where parent = ?) "
316
             "and cluster = ? "
317
             "and mtime <= ?")
318
        execute(q, args)
319
        q = ("delete from nodes "
320
             "where node in (select node from nodes n "
321
             "where (select count(serial) "
322
             "from versions "
323
             "where node = n.node) = 0 "
324
             "and parent = ?)")
325
        execute(q, (parent,))
326
        return hashes, size, serials
327

    
328
    def node_purge(self, node, before=inf, cluster=0,
329
                   update_statistics_ancestors_depth=None):
330
        """Delete all versions with the specified
331
           node and cluster, and return
332
           the hashes, the size and the serials of versions deleted.
333
           Clears out the node if it has no remaining versions.
334
        """
335

    
336
        execute = self.execute
337
        q = ("select count(serial), sum(size) from versions "
338
             "where node = ? "
339
             "and cluster = ? "
340
             "and mtime <= ?")
341
        args = (node, cluster, before)
342
        execute(q, args)
343
        nr, size = self.fetchone()
344
        if not nr:
345
            return (), 0, ()
346
        mtime = time()
347
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster,
348
                                         update_statistics_ancestors_depth)
349

    
350
        q = ("select hash, serial from versions "
351
             "where node = ? "
352
             "and cluster = ? "
353
             "and mtime <= ?")
354
        execute(q, args)
355
        hashes = []
356
        serials = []
357
        for r in self.fetchall():
358
            hashes += [r[0]]
359
            serials += [r[1]]
360

    
361
        q = ("delete from versions "
362
             "where node = ? "
363
             "and cluster = ? "
364
             "and mtime <= ?")
365
        execute(q, args)
366
        q = ("delete from nodes "
367
             "where node in (select node from nodes n "
368
             "where (select count(serial) "
369
             "from versions "
370
             "where node = n.node) = 0 "
371
             "and node = ?)")
372
        execute(q, (node,))
373
        return hashes, size, serials
374

    
375
    def node_remove(self, node, update_statistics_ancestors_depth=None):
376
        """Remove the node specified.
377
           Return false if the node has children or is not found.
378
        """
379

    
380
        if self.node_count_children(node):
381
            return False
382

    
383
        mtime = time()
384
        q = ("select count(serial), sum(size), cluster "
385
             "from versions "
386
             "where node = ? "
387
             "group by cluster")
388
        self.execute(q, (node,))
389
        for population, size, cluster in self.fetchall():
390
            self.statistics_update_ancestors(
391
                node, -population, -size, mtime, cluster,
392
                update_statistics_ancestors_depth)
393

    
394
        q = "delete from nodes where node = ?"
395
        self.execute(q, (node,))
396
        return True
397

    
398
    def node_accounts(self, accounts=()):
399
        q = ("select path, node from nodes where node != 0 and parent = 0 ")
400
        args = []
401
        if accounts:
402
            placeholders = ','.join('?' for a in accounts)
403
            q += ("and path in (%s)" % placeholders)
404
            args += accounts
405
        return self.execute(q, args).fetchall()
406

    
407
    def node_account_quotas(self):
408
        q = ("select n.path, p.value from nodes n, policy p "
409
             "where n.node != 0 and n.parent = 0 "
410
             "and n.node = p.node and p.key = 'quota'"
411
        )
412
        return dict(self.execute(q).fetchall())
413

    
414
    def node_account_usage(self, account_node, cluster):
415
        select_children = ("select node from nodes where parent = ?")
416
        select_descedents = ("select node from nodes "
417
                             "where parent in (%s) "
418
                             "or node in (%s) ") % ((select_children,)*2)
419
        args = [account_node]*2
420
        q = ("select sum(v.size) from versions v, nodes n "
421
             "where v.node = n.node "
422
             "and n.node in (%s) "
423
             "and v.cluster = ?") % select_descedents
424
        args += [cluster]
425

    
426
        self.execute(q, args)
427
        return self.fetchone()[0]
428

    
429
    def policy_get(self, node):
430
        q = "select key, value from policy where node = ?"
431
        self.execute(q, (node,))
432
        return dict(self.fetchall())
433

    
434
    def policy_set(self, node, policy):
435
        q = "insert or replace into policy (node, key, value) values (?, ?, ?)"
436
        self.executemany(q, ((node, k, v) for k, v in policy.iteritems()))
437

    
438
    def statistics_get(self, node, cluster=0):
439
        """Return population, total size and last mtime
440
           for all versions under node that belong to the cluster.
441
        """
442

    
443
        q = ("select population, size, mtime from statistics "
444
             "where node = ? and cluster = ?")
445
        self.execute(q, (node, cluster))
446
        return self.fetchone()
447

    
448
    def statistics_update(self, node, population, size, mtime, cluster=0):
449
        """Update the statistics of the given node.
450
           Statistics keep track the population, total
451
           size of objects and mtime in the node's namespace.
452
           May be zero or positive or negative numbers.
453
        """
454

    
455
        qs = ("select population, size from statistics "
456
              "where node = ? and cluster = ?")
457
        qu = ("insert or replace into statistics (node, population, size, mtime, cluster) "
458
              "values (?, ?, ?, ?, ?)")
459
        self.execute(qs, (node, cluster))
460
        r = self.fetchone()
461
        if r is None:
462
            prepopulation, presize = (0, 0)
463
        else:
464
            prepopulation, presize = r
465
        population += prepopulation
466
        population = max(population, 0)
467
        size += presize
468
        self.execute(qu, (node, population, size, mtime, cluster))
469

    
470
    def statistics_update_ancestors(self, node, population, size, mtime,
471
                                    cluster=0, recursion_depth=None):
472
        """Update the statistics of the given node's parent.
473
           Then recursively update all parents up to the root.
474
           Population is not recursive.
475
        """
476

    
477
        i = 0
478
        while True:
479
            if node == ROOTNODE:
480
                break
481
            if recursion_depth and recursion_depth <= i:
482
                break
483
            props = self.node_get_properties(node)
484
            if props is None:
485
                break
486
            parent, path = props
487
            self.statistics_update(parent, population, size, mtime, cluster)
488
            node = parent
489
            population = 0  # Population isn't recursive
490
            i += 1
491

    
492
    def statistics_latest(self, node, before=inf, except_cluster=0):
493
        """Return population, total size and last mtime
494
           for all latest versions under node that
495
           do not belong to the cluster.
496
        """
497

    
498
        execute = self.execute
499
        fetchone = self.fetchone
500

    
501
        # The node.
502
        props = self.node_get_properties(node)
503
        if props is None:
504
            return None
505
        parent, path = props
506

    
507
        # The latest version.
508
        q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
509
             "from versions v "
510
             "where serial = %s "
511
             "and cluster != ?")
512
        subq, args = self._construct_latest_version_subquery(
513
            node=node, before=before)
514
        execute(q % subq, args + [except_cluster])
515
        props = fetchone()
516
        if props is None:
517
            return None
518
        mtime = props[MTIME]
519

    
520
        # First level, just under node (get population).
521
        q = ("select count(serial), sum(size), max(mtime) "
522
             "from versions v "
523
             "where serial = %s "
524
             "and cluster != ? "
525
             "and node in (select node "
526
             "from nodes "
527
             "where parent = ?)")
528
        subq, args = self._construct_latest_version_subquery(
529
            node=None, before=before)
530
        execute(q % subq, args + [except_cluster, node])
531
        r = fetchone()
532
        if r is None:
533
            return None
534
        count = r[0]
535
        mtime = max(mtime, r[2])
536
        if count == 0:
537
            return (0, 0, mtime)
538

    
539
        # All children (get size and mtime).
540
        # This is why the full path is stored.
541
        q = ("select count(serial), sum(size), max(mtime) "
542
             "from versions v "
543
             "where serial = %s "
544
             "and cluster != ? "
545
             "and node in (select node "
546
             "from nodes "
547
             "where path like ? escape '\\')")
548
        subq, args = self._construct_latest_version_subquery(
549
            node=None, before=before)
550
        execute(
551
            q % subq, args + [except_cluster, self.escape_like(path) + '%'])
552
        r = fetchone()
553
        if r is None:
554
            return None
555
        size = r[1] - props[SIZE]
556
        mtime = max(mtime, r[2])
557
        return (count, size, mtime)
558

    
559
    def nodes_set_latest_version(self, node, serial):
560
        q = ("update nodes set latest_version = ? where node = ?")
561
        props = (serial, node)
562
        self.execute(q, props)
563

    
564
    def version_create(self, node, hash, size, type, source, muser, uuid,
565
                       checksum, cluster=0,
566
                       update_statistics_ancestors_depth=None):
567
        """Create a new version from the given properties.
568
           Return the (serial, mtime) of the new version.
569
        """
570

    
571
        q = ("insert into versions (node, hash, size, type, source, mtime, muser, uuid, checksum, cluster) "
572
             "values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
573
        mtime = time()
574
        props = (node, hash, size, type, source, mtime, muser,
575
                 uuid, checksum, cluster)
576
        serial = self.execute(q, props).lastrowid
577
        self.statistics_update_ancestors(node, 1, size, mtime, cluster,
578
                                         update_statistics_ancestors_depth)
579

    
580
        self.nodes_set_latest_version(node, serial)
581

    
582
        return serial, mtime
583

    
584
    def version_lookup(self, node, before=inf, cluster=0, all_props=True):
585
        """Lookup the current version of the given node.
586
           Return a list with its properties:
587
           (serial, node, hash, size, type, source, mtime,
588
            muser, uuid, checksum, cluster)
589
           or None if the current version is not found in the given cluster.
590
        """
591

    
592
        q = ("select %s "
593
             "from versions v "
594
             "where serial = %s "
595
             "and cluster = ?")
596
        subq, args = self._construct_latest_version_subquery(
597
            node=node, before=before)
598
        if not all_props:
599
            q = q % ("serial", subq)
600
        else:
601
            q = q % ("serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster", subq)
602

    
603
        self.execute(q, args + [cluster])
604
        props = self.fetchone()
605
        if props is not None:
606
            return props
607
        return None
608

    
609
    def version_lookup_bulk(self, nodes, before=inf, cluster=0, all_props=True):
610
        """Lookup the current versions of the given nodes.
611
           Return a list with their properties:
612
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
613
        """
614

    
615
        if not nodes:
616
            return ()
617
        q = ("select %s "
618
             "from versions "
619
             "where serial in %s "
620
             "and cluster = ? %s")
621
        subq, args = self._construct_latest_versions_subquery(
622
            nodes=nodes, before=before)
623
        if not all_props:
624
            q = q % ("serial", subq, '')
625
        else:
626
            q = q % ("serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster", subq, 'order by node')
627

    
628
        args += [cluster]
629
        self.execute(q, args)
630
        return self.fetchall()
631

    
632
    def version_get_properties(self, serial, keys=(), propnames=_propnames):
633
        """Return a sequence of values for the properties of
634
           the version specified by serial and the keys, in the order given.
635
           If keys is empty, return all properties in the order
636
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
637
        """
638

    
639
        q = ("select serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster "
640
             "from versions "
641
             "where serial = ?")
642
        self.execute(q, (serial,))
643
        r = self.fetchone()
644
        if r is None:
645
            return r
646

    
647
        if not keys:
648
            return r
649
        return [r[propnames[k]] for k in keys if k in propnames]
650

    
651
    def version_put_property(self, serial, key, value):
652
        """Set value for the property of version specified by key."""
653

    
654
        if key not in _propnames:
655
            return
656
        q = "update versions set %s = ? where serial = ?" % key
657
        self.execute(q, (value, serial))
658

    
659
    def version_recluster(self, serial, cluster,
660
                          update_statistics_ancestors_depth=None):
661
        """Move the version into another cluster."""
662

    
663
        props = self.version_get_properties(serial)
664
        if not props:
665
            return
666
        node = props[NODE]
667
        size = props[SIZE]
668
        oldcluster = props[CLUSTER]
669
        if cluster == oldcluster:
670
            return
671

    
672
        mtime = time()
673
        self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster,
674
                                         update_statistics_ancestors_depth)
675
        self.statistics_update_ancestors(node, 1, size, mtime, cluster,
676
                                         update_statistics_ancestors_depth)
677

    
678
        q = "update versions set cluster = ? where serial = ?"
679
        self.execute(q, (cluster, serial))
680

    
681
    def version_remove(self, serial, update_statistics_ancestors_depth=None):
682
        """Remove the serial specified."""
683

    
684
        props = self.version_get_properties(serial)
685
        if not props:
686
            return
687
        node = props[NODE]
688
        hash = props[HASH]
689
        size = props[SIZE]
690
        cluster = props[CLUSTER]
691

    
692
        mtime = time()
693
        self.statistics_update_ancestors(node, -1, -size, mtime, cluster,
694
                                         update_statistics_ancestors_depth)
695

    
696
        q = "delete from versions where serial = ?"
697
        self.execute(q, (serial,))
698

    
699
        props = self.version_lookup(node, cluster=cluster, all_props=False)
700
        if props:
701
            self.nodes_set_latest_version(node, props[0])
702
        return hash, size
703

    
704
    def attribute_get(self, serial, domain, keys=()):
705
        """Return a list of (key, value) pairs of the version specified by serial.
706
           If keys is empty, return all attributes.
707
           Othwerise, return only those specified.
708
        """
709

    
710
        execute = self.execute
711
        if keys:
712
            marks = ','.join('?' for k in keys)
713
            q = ("select key, value from attributes "
714
                 "where key in (%s) and serial = ? and domain = ?" % (marks,))
715
            execute(q, keys + (serial, domain))
716
        else:
717
            q = "select key, value from attributes where serial = ? and domain = ?"
718
            execute(q, (serial, domain))
719
        return self.fetchall()
720

    
721
    def attribute_set(self, serial, domain, node, items, is_latest=True):
722
        """Set the attributes of the version specified by serial.
723
           Receive attributes as an iterable of (key, value) pairs.
724
        """
725

    
726
        q = ("insert or replace into attributes "
727
             "(serial, domain, node, is_latest, key, value) "
728
             "values (?, ?, ?, ?, ?, ?)")
729
        self.executemany(q, ((serial, domain, node, is_latest, k, v) for
730
            k, v in items))
731

    
732
    def attribute_del(self, serial, domain, keys=()):
733
        """Delete attributes of the version specified by serial.
734
           If keys is empty, delete all attributes.
735
           Otherwise delete those specified.
736
        """
737

    
738
        if keys:
739
            q = "delete from attributes where serial = ? and domain = ? and key = ?"
740
            self.executemany(q, ((serial, domain, key) for key in keys))
741
        else:
742
            q = "delete from attributes where serial = ? and domain = ?"
743
            self.execute(q, (serial, domain))
744

    
745
    def attribute_copy(self, source, dest):
746
        q = ("insert or replace into attributes "
747
             "select ?, domain, node, is_latest, key, value from attributes "
748
             "where serial = ?")
749
        self.execute(q, (dest, source))
750

    
751
    def attribute_unset_is_latest(self, node, exclude):
752
        q = ("update attributes set is_latest = 0 "
753
             "where node = ? and serial != ?")
754
        self.execute(q, (node, exclude))
755

    
756
    def _construct_filters(self, domain, filterq):
757
        if not domain or not filterq:
758
            return None, None
759

    
760
        subqlist = []
761
        append = subqlist.append
762
        included, excluded, opers = parse_filters(filterq)
763
        args = []
764

    
765
        if included:
766
            subq = "exists (select 1 from attributes where serial = v.serial and domain = ? and "
767
            subq += "(" + ' or '.join(('key = ?' for x in included)) + ")"
768
            subq += ")"
769
            args += [domain]
770
            args += included
771
            append(subq)
772

    
773
        if excluded:
774
            subq = "not exists (select 1 from attributes where serial = v.serial and domain = ? and "
775
            subq += "(" + ' or '.join(('key = ?' for x in excluded)) + ")"
776
            subq += ")"
777
            args += [domain]
778
            args += excluded
779
            append(subq)
780

    
781
        if opers:
782
            for k, o, v in opers:
783
                subq = "exists (select 1 from attributes where serial = v.serial and domain = ? and "
784
                subq += "key = ? and value %s ?" % (o,)
785
                subq += ")"
786
                args += [domain, k, v]
787
                append(subq)
788

    
789
        if not subqlist:
790
            return None, None
791

    
792
        subq = ' and ' + ' and '.join(subqlist)
793

    
794
        return subq, args
795

    
796
    def _construct_paths(self, pathq):
797
        if not pathq:
798
            return None, None
799

    
800
        subqlist = []
801
        args = []
802
        for path, match in pathq:
803
            if match == MATCH_PREFIX:
804
                subqlist.append("n.path like ? escape '\\'")
805
                args.append(self.escape_like(path) + '%')
806
            elif match == MATCH_EXACT:
807
                subqlist.append("n.path = ?")
808
                args.append(path)
809

    
810
        subq = ' and (' + ' or '.join(subqlist) + ')'
811
        args = tuple(args)
812

    
813
        return subq, args
814

    
815
    def _construct_size(self, sizeq):
816
        if not sizeq or len(sizeq) != 2:
817
            return None, None
818

    
819
        subq = ''
820
        args = []
821
        if sizeq[0]:
822
            subq += " and v.size >= ?"
823
            args += [sizeq[0]]
824
        if sizeq[1]:
825
            subq += " and v.size < ?"
826
            args += [sizeq[1]]
827

    
828
        return subq, args
829

    
830
    def _construct_versions_nodes_latest_version_subquery(self, before=inf):
831
        if before == inf:
832
            q = ("n.latest_version ")
833
            args = []
834
        else:
835
            q = ("(select max(serial) "
836
                 "from versions "
837
                 "where node = v.node and mtime < ?) ")
838
            args = [before]
839
        return q, args
840

    
841
    def _construct_latest_version_subquery(self, node=None, before=inf):
842
        where_cond = "node = v.node"
843
        args = []
844
        if node:
845
            where_cond = "node = ? "
846
            args = [node]
847

    
848
        if before == inf:
849
            q = ("(select latest_version "
850
                 "from nodes "
851
                 "where %s) ")
852
        else:
853
            q = ("(select max(serial) "
854
                 "from versions "
855
                 "where %s and mtime < ?) ")
856
            args += [before]
857
        return q % where_cond, args
858

    
859
    def _construct_latest_versions_subquery(self, nodes=(), before=inf):
860
        where_cond = ""
861
        args = []
862
        if nodes:
863
            where_cond = "node in (%s) " % ','.join('?' for node in nodes)
864
            args = nodes
865

    
866
        if before == inf:
867
            q = ("(select latest_version "
868
                 "from nodes "
869
                 "where %s ) ")
870
        else:
871
            q = ("(select max(serial) "
872
                 "from versions "
873
                 "where %s and mtime < ? group by node) ")
874
            args += [before]
875
        return q % where_cond, args
876

    
877
    def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=None):
878
        """Return a list with all keys pairs defined
879
           for all latest versions under parent that
880
           do not belong to the cluster.
881
        """
882

    
883
        pathq = pathq or []
884

    
885
        # TODO: Use another table to store before=inf results.
886
        q = ("select distinct a.key "
887
             "from attributes a, versions v, nodes n "
888
             "where v.serial = %s "
889
             "and v.cluster != ? "
890
             "and v.node in (select node "
891
             "from nodes "
892
             "where parent = ?) "
893
             "and a.serial = v.serial "
894
             "and a.domain = ? "
895
             "and n.node = v.node")
896
        subq, subargs = self._construct_latest_version_subquery(
897
            node=None, before=before)
898
        args = subargs + [except_cluster, parent, domain]
899
        q = q % subq
900
        subq, subargs = self._construct_paths(pathq)
901
        if subq is not None:
902
            q += subq
903
            args += subargs
904
        self.execute(q, args)
905
        return [r[0] for r in self.fetchall()]
906

    
907
    def latest_version_list(self, parent, prefix='', delimiter=None,
908
                            start='', limit=10000, before=inf,
909
                            except_cluster=0, pathq=[], domain=None,
910
                            filterq=[], sizeq=None, all_props=False):
911
        """Return a (list of (path, serial) tuples, list of common prefixes)
912
           for the current versions of the paths with the given parent,
913
           matching the following criteria.
914

915
           The property tuple for a version is returned if all
916
           of these conditions are true:
917

918
                a. parent matches
919

920
                b. path > start
921

922
                c. path starts with prefix (and paths in pathq)
923

924
                d. version is the max up to before
925

926
                e. version is not in cluster
927

928
                f. the path does not have the delimiter occuring
929
                   after the prefix, or ends with the delimiter
930

931
                g. serial matches the attribute filter query.
932

933
                   A filter query is a comma-separated list of
934
                   terms in one of these three forms:
935

936
                   key
937
                       an attribute with this key must exist
938

939
                   !key
940
                       an attribute with this key must not exist
941

942
                   key ?op value
943
                       the attribute with this key satisfies the value
944
                       where ?op is one of =, != <=, >=, <, >.
945

946
                h. the size is in the range set by sizeq
947

948
           The list of common prefixes includes the prefixes
949
           matching up to the first delimiter after prefix,
950
           and are reported only once, as "virtual directories".
951
           The delimiter is included in the prefixes.
952

953
           If arguments are None, then the corresponding matching rule
954
           will always match.
955

956
           Limit applies to the first list of tuples returned.
957

958
           If all_props is True, return all properties after path, not just serial.
959
        """
960

    
961
        execute = self.execute
962

    
963
        if not start or start < prefix:
964
            start = strprevling(prefix)
965
        nextling = strnextling(prefix)
966

    
967
        q = ("select distinct n.path, %s "
968
             "from versions v, nodes n "
969
             "where v.serial = %s "
970
             "and v.cluster != ? "
971
             "and v.node in (select node "
972
             "from nodes "
973
             "where parent = ?) "
974
             "and n.node = v.node "
975
             "and n.path > ? and n.path < ?")
976
        subq, args = self._construct_versions_nodes_latest_version_subquery(
977
            before)
978
        if not all_props:
979
            q = q % ("v.serial", subq)
980
        else:
981
            q = q % ("v.serial, v.node, v.hash, v.size, v.type, v.source, v.mtime, v.muser, v.uuid, v.checksum, v.cluster", subq)
982
        args += [except_cluster, parent, start, nextling]
983
        start_index = len(args) - 2
984

    
985
        subq, subargs = self._construct_paths(pathq)
986
        if subq is not None:
987
            q += subq
988
            args += subargs
989
        subq, subargs = self._construct_size(sizeq)
990
        if subq is not None:
991
            q += subq
992
            args += subargs
993
        subq, subargs = self._construct_filters(domain, filterq)
994
        if subq is not None:
995
            q += subq
996
            args += subargs
997
        else:
998
            q = q.replace("attributes a, ", "")
999
            q = q.replace("and a.serial = v.serial ", "")
1000
        q += " order by n.path"
1001

    
1002
        if not delimiter:
1003
            q += " limit ?"
1004
            args.append(limit)
1005
            execute(q, args)
1006
            return self.fetchall(), ()
1007

    
1008
        pfz = len(prefix)
1009
        dz = len(delimiter)
1010
        count = 0
1011
        fetchone = self.fetchone
1012
        prefixes = []
1013
        pappend = prefixes.append
1014
        matches = []
1015
        mappend = matches.append
1016

    
1017
        execute(q, args)
1018
        while True:
1019
            props = fetchone()
1020
            if props is None:
1021
                break
1022
            path = props[0]
1023
            serial = props[1]
1024
            idx = path.find(delimiter, pfz)
1025

    
1026
            if idx < 0:
1027
                mappend(props)
1028
                count += 1
1029
                if count >= limit:
1030
                    break
1031
                continue
1032

    
1033
            if idx + dz == len(path):
1034
                mappend(props)
1035
                count += 1
1036
                continue  # Get one more, in case there is a path.
1037
            pf = path[:idx + dz]
1038
            pappend(pf)
1039
            if count >= limit:
1040
                break
1041

    
1042
            args[start_index] = strnextling(pf)  # New start.
1043
            execute(q, args)
1044

    
1045
        return matches, prefixes
1046

    
1047
    def latest_uuid(self, uuid, cluster):
1048
        """Return the latest version of the given uuid and cluster.
1049

1050
        Return a (path, serial) tuple.
1051
        If cluster is None, all clusters are considered.
1052

1053
        """
1054
        if cluster is not None:
1055
            cluster_where = "and cluster = ?"
1056
            args = (uuid, int(cluster))
1057
        else:
1058
            cluster_where = ""
1059
            args = (uuid,)
1060

    
1061
        q = ("select n.path, v.serial "
1062
             "from versions v, nodes n "
1063
             "where v.serial = (select max(serial) "
1064
             "from versions "
1065
             "where uuid = ? %s) "
1066
             "and n.node = v.node") % cluster_where
1067
        self.execute(q, args)
1068
        return self.fetchone()
1069

    
1070
    def domain_object_list(self, domain, paths, cluster=None):
1071
        """Return a list of (path, property list, attribute dictionary)
1072
           for the objects in the specific domain and cluster.
1073
        """
1074

    
1075
        q = ("select n.path, v.serial, v.node, v.hash, "
1076
             "v.size, v.type, v.source, v.mtime, v.muser, "
1077
             "v.uuid, v.checksum, v.cluster, a.key, a.value "
1078
             "from nodes n, versions v, attributes a "
1079
             "where v.serial = a.serial and "
1080
             "a.domain = ? and "
1081
             "a.node = n.node and "
1082
             "a.is_latest = 1 and "
1083
             "n.path in (%s)") % ','.join('?' for _ in paths)
1084
        args = [domain]
1085
        map(args.append, paths)
1086
        if cluster != None:
1087
            q += "and v.cluster = ?"
1088
            args += [cluster]
1089

    
1090
        self.execute(q, args)
1091
        rows = self.fetchall()
1092

    
1093
        group_by = itemgetter(slice(12))
1094
        rows.sort(key = group_by)
1095
        groups = groupby(rows, group_by)
1096
        return [(k[0], k[1:], dict([i[12:] for i in data])) \
1097
            for (k, data) in groups]