Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / lib / sqlite / node.py @ 2c2513fc

History | View | Annotate | Download (39.6 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from time import time
35
from operator import itemgetter
36
from itertools import groupby
37

    
38
from dbworker import DBWorker
39

    
40
from pithos.backends.filter import parse_filters
41

    
42

    
43
ROOTNODE = 0
44

    
45
(SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM,
46
 CLUSTER) = range(11)
47

    
48
(MATCH_PREFIX, MATCH_EXACT) = range(2)
49

    
50
inf = float('inf')
51

    
52

    
53
def strnextling(prefix):
54
    """Return the first unicode string
55
       greater than but not starting with given prefix.
56
       strnextling('hello') -> 'hellp'
57
    """
58
    if not prefix:
59
        ## all strings start with the null string,
60
        ## therefore we have to approximate strnextling('')
61
        ## with the last unicode character supported by python
62
        ## 0x10ffff for wide (32-bit unicode) python builds
63
        ## 0x00ffff for narrow (16-bit unicode) python builds
64
        ## We will not autodetect. 0xffff is safe enough.
65
        return unichr(0xffff)
66
    s = prefix[:-1]
67
    c = ord(prefix[-1])
68
    if c >= 0xffff:
69
        raise RuntimeError
70
    s += unichr(c + 1)
71
    return s
72

    
73

    
74
def strprevling(prefix):
75
    """Return an approximation of the last unicode string
76
       less than but not starting with given prefix.
77
       strprevling(u'hello') -> u'helln\\xffff'
78
    """
79
    if not prefix:
80
        ## There is no prevling for the null string
81
        return prefix
82
    s = prefix[:-1]
83
    c = ord(prefix[-1])
84
    if c > 0:
85
        s += unichr(c - 1) + unichr(0xffff)
86
    return s
87

    
88

    
89
_propnames = {
90
    'serial': 0,
91
    'node': 1,
92
    'hash': 2,
93
    'size': 3,
94
    'type': 4,
95
    'source': 5,
96
    'mtime': 6,
97
    'muser': 7,
98
    'uuid': 8,
99
    'checksum': 9,
100
    'cluster': 10
101
}
102

    
103

    
104
class Node(DBWorker):
105
    """Nodes store path organization and have multiple versions.
106
       Versions store object history and have multiple attributes.
107
       Attributes store metadata.
108
    """
109

    
110
    # TODO: Provide an interface for included and excluded clusters.
111

    
112
    def __init__(self, **params):
113
        DBWorker.__init__(self, **params)
114
        execute = self.execute
115

    
116
        execute(""" pragma foreign_keys = on """)
117

    
118
        execute(""" create table if not exists nodes
119
                          ( node       integer primary key,
120
                            parent     integer default 0,
121
                            path       text    not null default '',
122
                            latest_version     integer,
123
                            foreign key (parent)
124
                            references nodes(node)
125
                            on update cascade
126
                            on delete cascade ) """)
127
        execute(""" create unique index if not exists idx_nodes_path
128
                    on nodes(path) """)
129
        execute(""" create index if not exists idx_nodes_parent
130
                    on nodes(parent) """)
131

    
132
        execute(""" create table if not exists policy
133
                          ( node   integer,
134
                            key    text,
135
                            value  text,
136
                            primary key (node, key)
137
                            foreign key (node)
138
                            references nodes(node)
139
                            on update cascade
140
                            on delete cascade ) """)
141

    
142
        execute(""" create table if not exists statistics
143
                          ( node       integer,
144
                            population integer not null default 0,
145
                            size       integer not null default 0,
146
                            mtime      integer,
147
                            cluster    integer not null default 0,
148
                            primary key (node, cluster)
149
                            foreign key (node)
150
                            references nodes(node)
151
                            on update cascade
152
                            on delete cascade ) """)
153

    
154
        execute(""" create table if not exists versions
155
                          ( serial     integer primary key,
156
                            node       integer,
157
                            hash       text,
158
                            size       integer not null default 0,
159
                            type       text    not null default '',
160
                            source     integer,
161
                            mtime      integer,
162
                            muser      text    not null default '',
163
                            uuid       text    not null default '',
164
                            checksum   text    not null default '',
165
                            cluster    integer not null default 0,
166
                            foreign key (node)
167
                            references nodes(node)
168
                            on update cascade
169
                            on delete cascade ) """)
170
        execute(""" create index if not exists idx_versions_node_mtime
171
                    on versions(node, mtime) """)
172
        execute(""" create index if not exists idx_versions_node_uuid
173
                    on versions(uuid) """)
174

    
175
        execute(""" create table if not exists attributes
176
                          ( serial      integer,
177
                            domain      text,
178
                            key         text,
179
                            value       text,
180
                            node        integer not null    default 0,
181
                            is_latest   boolean not null    default 1,
182
                            primary key (serial, domain, key)
183
                            foreign key (serial)
184
                            references versions(serial)
185
                            on update cascade
186
                            on delete cascade ) """)
187
        execute(""" create index if not exists idx_attributes_domain
188
                    on attributes(domain) """)
189
        execute(""" create index if not exists idx_attributes_serial_node
190
                    on attributes(serial, node) """)
191

    
192
        wrapper = self.wrapper
193
        wrapper.execute()
194
        try:
195
            q = "insert or ignore into nodes(node, parent) values (?, ?)"
196
            execute(q, (ROOTNODE, ROOTNODE))
197
        finally:
198
            wrapper.commit()
199

    
200
    def node_create(self, parent, path):
201
        """Create a new node from the given properties.
202
           Return the node identifier of the new node.
203
        """
204

    
205
        q = ("insert into nodes (parent, path) "
206
             "values (?, ?)")
207
        props = (parent, path)
208
        return self.execute(q, props).lastrowid
209

    
210
    def node_lookup(self, path, for_update=False):
211
        """Lookup the current node of the given path.
212
           Return None if the path is not found.
213

214
           kwargs is not used: it is passed for conformance
215
        """
216

    
217
        q = "select node from nodes where path = ?"
218
        self.execute(q, (path,))
219
        r = self.fetchone()
220
        if r is not None:
221
            return r[0]
222
        return None
223

    
224
    def node_lookup_bulk(self, paths):
225
        """Lookup the current nodes for the given paths.
226
           Return () if the path is not found.
227
        """
228

    
229
        placeholders = ','.join('?' for path in paths)
230
        q = "select node from nodes where path in (%s)" % placeholders
231
        self.execute(q, paths)
232
        r = self.fetchall()
233
        if r is not None:
234
            return [row[0] for row in r]
235
        return None
236

    
237
    def node_get_properties(self, node):
238
        """Return the node's (parent, path).
239
           Return None if the node is not found.
240
        """
241

    
242
        q = "select parent, path from nodes where node = ?"
243
        self.execute(q, (node,))
244
        return self.fetchone()
245

    
246
    def node_get_versions(self, node, keys=(), propnames=_propnames):
247
        """Return the properties of all versions at node.
248
           If keys is empty, return all properties in the order
249
           (serial, node, hash, size, type, source, mtime, muser, uuid,
250
            checksum, cluster).
251
        """
252

    
253
        q = ("select serial, node, hash, size, type, source, mtime, muser, "
254
             "uuid, checksum, cluster "
255
             "from versions "
256
             "where node = ?")
257
        self.execute(q, (node,))
258
        r = self.fetchall()
259
        if r is None:
260
            return r
261

    
262
        if not keys:
263
            return r
264
        return [[p[propnames[k]] for k in keys if k in propnames] for p in r]
265

    
266
    def node_count_children(self, node):
267
        """Return node's child count."""
268

    
269
        q = "select count(node) from nodes where parent = ? and node != 0"
270
        self.execute(q, (node,))
271
        r = self.fetchone()
272
        if r is None:
273
            return 0
274
        return r[0]
275

    
276
    def node_purge_children(self, parent, before=inf, cluster=0,
277
                            update_statistics_ancestors_depth=None):
278
        """Delete all versions with the specified
279
           parent and cluster, and return
280
           the hashes, the size and the serials of versions deleted.
281
           Clears out nodes with no remaining versions.
282
        """
283

    
284
        execute = self.execute
285
        q = ("select count(serial), sum(size) from versions "
286
             "where node in (select node "
287
             "from nodes "
288
             "where parent = ?) "
289
             "and cluster = ? "
290
             "and mtime <= ?")
291
        args = (parent, cluster, before)
292
        execute(q, args)
293
        nr, size = self.fetchone()
294
        if not nr:
295
            return (), 0, ()
296
        mtime = time()
297
        self.statistics_update(parent, -nr, -size, mtime, cluster)
298
        self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster,
299
                                         update_statistics_ancestors_depth)
300

    
301
        q = ("select hash, serial from versions "
302
             "where node in (select node "
303
             "from nodes "
304
             "where parent = ?) "
305
             "and cluster = ? "
306
             "and mtime <= ?")
307
        execute(q, args)
308
        hashes = []
309
        serials = []
310
        for r in self.fetchall():
311
            hashes += [r[0]]
312
            serials += [r[1]]
313

    
314
        q = ("delete from versions "
315
             "where node in (select node "
316
             "from nodes "
317
             "where parent = ?) "
318
             "and cluster = ? "
319
             "and mtime <= ?")
320
        execute(q, args)
321
        q = ("delete from nodes "
322
             "where node in (select node from nodes n "
323
             "where (select count(serial) "
324
             "from versions "
325
             "where node = n.node) = 0 "
326
             "and parent = ?)")
327
        execute(q, (parent,))
328
        return hashes, size, serials
329

    
330
    def node_purge(self, node, before=inf, cluster=0,
331
                   update_statistics_ancestors_depth=None):
332
        """Delete all versions with the specified
333
           node and cluster, and return
334
           the hashes, the size and the serials of versions deleted.
335
           Clears out the node if it has no remaining versions.
336
        """
337

    
338
        execute = self.execute
339
        q = ("select count(serial), sum(size) from versions "
340
             "where node = ? "
341
             "and cluster = ? "
342
             "and mtime <= ?")
343
        args = (node, cluster, before)
344
        execute(q, args)
345
        nr, size = self.fetchone()
346
        if not nr:
347
            return (), 0, ()
348
        mtime = time()
349
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster,
350
                                         update_statistics_ancestors_depth)
351

    
352
        q = ("select hash, serial from versions "
353
             "where node = ? "
354
             "and cluster = ? "
355
             "and mtime <= ?")
356
        execute(q, args)
357
        hashes = []
358
        serials = []
359
        for r in self.fetchall():
360
            hashes += [r[0]]
361
            serials += [r[1]]
362

    
363
        q = ("delete from versions "
364
             "where node = ? "
365
             "and cluster = ? "
366
             "and mtime <= ?")
367
        execute(q, args)
368
        q = ("delete from nodes "
369
             "where node in (select node from nodes n "
370
             "where (select count(serial) "
371
             "from versions "
372
             "where node = n.node) = 0 "
373
             "and node = ?)")
374
        execute(q, (node,))
375
        return hashes, size, serials
376

    
377
    def node_remove(self, node, update_statistics_ancestors_depth=None):
378
        """Remove the node specified.
379
           Return false if the node has children or is not found.
380
        """
381

    
382
        if self.node_count_children(node):
383
            return False
384

    
385
        mtime = time()
386
        q = ("select count(serial), sum(size), cluster "
387
             "from versions "
388
             "where node = ? "
389
             "group by cluster")
390
        self.execute(q, (node,))
391
        for population, size, cluster in self.fetchall():
392
            self.statistics_update_ancestors(
393
                node, -population, -size, mtime, cluster,
394
                update_statistics_ancestors_depth)
395

    
396
        q = "delete from nodes where node = ?"
397
        self.execute(q, (node,))
398
        return True
399

    
400
    def node_accounts(self, accounts=()):
401
        q = ("select path, node from nodes where node != 0 and parent = 0 ")
402
        args = []
403
        if accounts:
404
            placeholders = ','.join('?' for a in accounts)
405
            q += ("and path in (%s)" % placeholders)
406
            args += accounts
407
        return self.execute(q, args).fetchall()
408

    
409
    def node_account_quotas(self):
410
        q = ("select n.path, p.value from nodes n, policy p "
411
             "where n.node != 0 and n.parent = 0 "
412
             "and n.node = p.node and p.key = 'quota'")
413
        return dict(self.execute(q).fetchall())
414

    
415
    def node_account_usage(self, account=None, cluster=0):
416
        """Return usage for a specific account.
417

418
        Keyword arguments:
419
        account -- (default None: list usage for all the accounts)
420
        cluster -- list current, history or deleted usage (default 0: normal)
421
        """
422

    
423
        q = ("select n3.path, sum(v.size) from "
424
             "versions v, nodes n1, nodes n2, nodes n3 "
425
             "where v.node = n1.node "
426
             "and v.cluster = ? "
427
             "and n1.parent = n2.node "
428
             "and n2.parent = n3.node "
429
             "and n3.parent = 0 "
430
             "and n3.node != 0 ")
431
        args = [cluster]
432
        if account:
433
            q += ("and n3.path = ? ")
434
            args += [account]
435
        q += ("group by n3.path")
436

    
437
        print '###', q, args
438
        self.execute(q, args)
439
        return dict(self.fetchall())
440

    
441
    def policy_get(self, node):
442
        q = "select key, value from policy where node = ?"
443
        self.execute(q, (node,))
444
        return dict(self.fetchall())
445

    
446
    def policy_set(self, node, policy):
447
        q = "insert or replace into policy (node, key, value) values (?, ?, ?)"
448
        self.executemany(q, ((node, k, v) for k, v in policy.iteritems()))
449

    
450
    def statistics_get(self, node, cluster=0):
451
        """Return population, total size and last mtime
452
           for all versions under node that belong to the cluster.
453
        """
454

    
455
        q = ("select population, size, mtime from statistics "
456
             "where node = ? and cluster = ?")
457
        self.execute(q, (node, cluster))
458
        return self.fetchone()
459

    
460
    def statistics_update(self, node, population, size, mtime, cluster=0):
461
        """Update the statistics of the given node.
462
           Statistics keep track the population, total
463
           size of objects and mtime in the node's namespace.
464
           May be zero or positive or negative numbers.
465
        """
466

    
467
        qs = ("select population, size from statistics "
468
              "where node = ? and cluster = ?")
469
        qu = ("insert or replace into statistics "
470
              "(node, population, size, mtime, cluster) "
471
              "values (?, ?, ?, ?, ?)")
472
        self.execute(qs, (node, cluster))
473
        r = self.fetchone()
474
        if r is None:
475
            prepopulation, presize = (0, 0)
476
        else:
477
            prepopulation, presize = r
478
        population += prepopulation
479
        population = max(population, 0)
480
        size += presize
481
        self.execute(qu, (node, population, size, mtime, cluster))
482

    
483
    def statistics_update_ancestors(self, node, population, size, mtime,
484
                                    cluster=0, recursion_depth=None):
485
        """Update the statistics of the given node's parent.
486
           Then recursively update all parents up to the root.
487
           Population is not recursive.
488
        """
489

    
490
        i = 0
491
        while True:
492
            if node == ROOTNODE:
493
                break
494
            if recursion_depth and recursion_depth <= i:
495
                break
496
            props = self.node_get_properties(node)
497
            if props is None:
498
                break
499
            parent, path = props
500
            self.statistics_update(parent, population, size, mtime, cluster)
501
            node = parent
502
            population = 0  # Population isn't recursive
503
            i += 1
504

    
505
    def statistics_latest(self, node, before=inf, except_cluster=0):
506
        """Return population, total size and last mtime
507
           for all latest versions under node that
508
           do not belong to the cluster.
509
        """
510

    
511
        execute = self.execute
512
        fetchone = self.fetchone
513

    
514
        # The node.
515
        props = self.node_get_properties(node)
516
        if props is None:
517
            return None
518
        parent, path = props
519

    
520
        # The latest version.
521
        q = ("select serial, node, hash, size, type, source, mtime, muser, "
522
             "uuid, checksum, cluster "
523
             "from versions v "
524
             "where serial = %s "
525
             "and cluster != ?")
526
        subq, args = self._construct_latest_version_subquery(
527
            node=node, before=before)
528
        execute(q % subq, args + [except_cluster])
529
        props = fetchone()
530
        if props is None:
531
            return None
532
        mtime = props[MTIME]
533

    
534
        # First level, just under node (get population).
535
        q = ("select count(serial), sum(size), max(mtime) "
536
             "from versions v "
537
             "where serial = %s "
538
             "and cluster != ? "
539
             "and node in (select node "
540
             "from nodes "
541
             "where parent = ?)")
542
        subq, args = self._construct_latest_version_subquery(
543
            node=None, before=before)
544
        execute(q % subq, args + [except_cluster, node])
545
        r = fetchone()
546
        if r is None:
547
            return None
548
        count = r[0]
549
        mtime = max(mtime, r[2])
550
        if count == 0:
551
            return (0, 0, mtime)
552

    
553
        # All children (get size and mtime).
554
        # This is why the full path is stored.
555
        q = ("select count(serial), sum(size), max(mtime) "
556
             "from versions v "
557
             "where serial = %s "
558
             "and cluster != ? "
559
             "and node in (select node "
560
             "from nodes "
561
             "where path like ? escape '\\')")
562
        subq, args = self._construct_latest_version_subquery(
563
            node=None, before=before)
564
        execute(
565
            q % subq, args + [except_cluster, self.escape_like(path) + '%'])
566
        r = fetchone()
567
        if r is None:
568
            return None
569
        size = r[1] - props[SIZE]
570
        mtime = max(mtime, r[2])
571
        return (count, size, mtime)
572

    
573
    def nodes_set_latest_version(self, node, serial):
574
        q = ("update nodes set latest_version = ? where node = ?")
575
        props = (serial, node)
576
        self.execute(q, props)
577

    
578
    def version_create(self, node, hash, size, type, source, muser, uuid,
579
                       checksum, cluster=0,
580
                       update_statistics_ancestors_depth=None):
581
        """Create a new version from the given properties.
582
           Return the (serial, mtime) of the new version.
583
        """
584

    
585
        q = ("insert into versions (node, hash, size, type, source, mtime, "
586
             "muser, uuid, checksum, cluster) "
587
             "values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
588
        mtime = time()
589
        props = (node, hash, size, type, source, mtime, muser,
590
                 uuid, checksum, cluster)
591
        serial = self.execute(q, props).lastrowid
592
        self.statistics_update_ancestors(node, 1, size, mtime, cluster,
593
                                         update_statistics_ancestors_depth)
594

    
595
        self.nodes_set_latest_version(node, serial)
596

    
597
        return serial, mtime
598

    
599
    def version_lookup(self, node, before=inf, cluster=0, all_props=True):
600
        """Lookup the current version of the given node.
601
           Return a list with its properties:
602
           (serial, node, hash, size, type, source, mtime,
603
            muser, uuid, checksum, cluster)
604
           or None if the current version is not found in the given cluster.
605
        """
606

    
607
        q = ("select %s "
608
             "from versions v "
609
             "where serial = %s "
610
             "and cluster = ?")
611
        subq, args = self._construct_latest_version_subquery(
612
            node=node, before=before)
613
        if not all_props:
614
            q = q % ("serial", subq)
615
        else:
616
            q = q % (("serial, node, hash, size, type, source, mtime, muser, "
617
                      "uuid, checksum, cluster"),
618
                     subq)
619

    
620
        self.execute(q, args + [cluster])
621
        props = self.fetchone()
622
        if props is not None:
623
            return props
624
        return None
625

    
626
    def version_lookup_bulk(self, nodes, before=inf, cluster=0,
627
                            all_props=True, order_by_path=False):
628
        """Lookup the current versions of the given nodes.
629
           Return a list with their properties:
630
           (serial, node, hash, size, type, source, mtime, muser, uuid,
631
            checksum, cluster).
632
        """
633

    
634
        if not nodes:
635
            return ()
636
        q = ("select %s "
637
             "from versions v, nodes n "
638
             "where serial in %s "
639
             "and v.node = n.node "
640
             "and cluster = ? %s ")
641
        subq, args = self._construct_latest_versions_subquery(
642
            nodes=nodes, before=before)
643
        if not all_props:
644
            q = q % ("serial", subq, '')
645
        else:
646
            q = q % (("serial, v.node, hash, size, type, source, mtime, "
647
                      "muser, uuid, checksum, cluster"),
648
                     subq,
649
                     "order by path" if order_by_path else "")
650

    
651
        args += [cluster]
652
        self.execute(q, args)
653
        return self.fetchall()
654

    
655
    def version_get_properties(self, serial, keys=(), propnames=_propnames,
656
                               node=None):
657
        """Return a sequence of values for the properties of
658
           the version specified by serial and the keys, in the order given.
659
           If keys is empty, return all properties in the order
660
           (serial, node, hash, size, type, source, mtime, muser, uuid,
661
            checksum, cluster).
662
        """
663

    
664
        q = ("select serial, node, hash, size, type, source, mtime, muser, "
665
             "uuid, checksum, cluster "
666
             "from versions "
667
             "where serial = ? ")
668
        args = [serial]
669
        if node is not None:
670
            q += ("and node = ?")
671
            args += [node]
672
        self.execute(q, args)
673
        r = self.fetchone()
674
        if r is None:
675
            return r
676

    
677
        if not keys:
678
            return r
679
        return [r[propnames[k]] for k in keys if k in propnames]
680

    
681
    def version_put_property(self, serial, key, value):
682
        """Set value for the property of version specified by key."""
683

    
684
        if key not in _propnames:
685
            return
686
        q = "update versions set %s = ? where serial = ?" % key
687
        self.execute(q, (value, serial))
688

    
689
    def version_recluster(self, serial, cluster,
690
                          update_statistics_ancestors_depth=None):
691
        """Move the version into another cluster."""
692

    
693
        props = self.version_get_properties(serial)
694
        if not props:
695
            return
696
        node = props[NODE]
697
        size = props[SIZE]
698
        oldcluster = props[CLUSTER]
699
        if cluster == oldcluster:
700
            return
701

    
702
        mtime = time()
703
        self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster,
704
                                         update_statistics_ancestors_depth)
705
        self.statistics_update_ancestors(node, 1, size, mtime, cluster,
706
                                         update_statistics_ancestors_depth)
707

    
708
        q = "update versions set cluster = ? where serial = ?"
709
        self.execute(q, (cluster, serial))
710

    
711
    def version_remove(self, serial, update_statistics_ancestors_depth=None):
712
        """Remove the serial specified."""
713

    
714
        props = self.version_get_properties(serial)
715
        if not props:
716
            return
717
        node = props[NODE]
718
        hash = props[HASH]
719
        size = props[SIZE]
720
        cluster = props[CLUSTER]
721

    
722
        mtime = time()
723
        self.statistics_update_ancestors(node, -1, -size, mtime, cluster,
724
                                         update_statistics_ancestors_depth)
725

    
726
        q = "delete from versions where serial = ?"
727
        self.execute(q, (serial,))
728

    
729
        props = self.version_lookup(node, cluster=cluster, all_props=False)
730
        if props:
731
            self.nodes_set_latest_version(node, props[0])
732
        return hash, size
733

    
734
    def attribute_get(self, serial, domain, keys=()):
735
        """Return a list of (key, value) pairs of the specific version.
736

737
           If keys is empty, return all attributes.
738
           Othwerise, return only those specified.
739
        """
740

    
741
        execute = self.execute
742
        if keys:
743
            marks = ','.join('?' for k in keys)
744
            q = ("select key, value from attributes "
745
                 "where key in (%s) and serial = ? and domain = ?" % (marks,))
746
            execute(q, keys + (serial, domain))
747
        else:
748
            q = ("select key, value from attributes where "
749
                 "serial = ? and domain = ?")
750
            execute(q, (serial, domain))
751
        return self.fetchall()
752

    
753
    def attribute_set(self, serial, domain, node, items, is_latest=True):
754
        """Set the attributes of the version specified by serial.
755
           Receive attributes as an iterable of (key, value) pairs.
756
        """
757

    
758
        q = ("insert or replace into attributes "
759
             "(serial, domain, node, is_latest, key, value) "
760
             "values (?, ?, ?, ?, ?, ?)")
761
        self.executemany(q, ((serial, domain, node, is_latest, k, v) for
762
                         k, v in items))
763

    
764
    def attribute_del(self, serial, domain, keys=()):
765
        """Delete attributes of the version specified by serial.
766
           If keys is empty, delete all attributes.
767
           Otherwise delete those specified.
768
        """
769

    
770
        if keys:
771
            q = ("delete from attributes "
772
                 "where serial = ? and domain = ? and key = ?")
773
            self.executemany(q, ((serial, domain, key) for key in keys))
774
        else:
775
            q = "delete from attributes where serial = ? and domain = ?"
776
            self.execute(q, (serial, domain))
777

    
778
    def attribute_copy(self, source, dest):
779
        q = ("insert or replace into attributes "
780
             "(serial, domain, node, is_latest, key, value) "
781
             "select ?, domain, node, is_latest, key, value from attributes "
782
             "where serial = ?")
783
        self.execute(q, (dest, source))
784

    
785
    def attribute_unset_is_latest(self, node, exclude):
786
        q = ("update attributes set is_latest = 0 "
787
             "where node = ? and serial != ?")
788
        self.execute(q, (node, exclude))
789

    
790
    def _construct_filters(self, domain, filterq):
791
        if not domain or not filterq:
792
            return None, None
793

    
794
        subqlist = []
795
        append = subqlist.append
796
        included, excluded, opers = parse_filters(filterq)
797
        args = []
798

    
799
        if included:
800
            subq = ("exists (select 1 from attributes where serial = v.serial "
801
                    "and domain = ? and ")
802
            subq += "(" + ' or '.join(('key = ?' for x in included)) + ")"
803
            subq += ")"
804
            args += [domain]
805
            args += included
806
            append(subq)
807

    
808
        if excluded:
809
            subq = ("not exists (select 1 from attributes where "
810
                    "serial = v.serial and domain = ? and ")
811
            subq += "(" + ' or '.join(('key = ?' for x in excluded)) + ")"
812
            subq += ")"
813
            args += [domain]
814
            args += excluded
815
            append(subq)
816

    
817
        if opers:
818
            for k, o, v in opers:
819
                subq = ("exists (select 1 from attributes where "
820
                        "serial = v.serial and domain = ? and ")
821
                subq += "key = ? and value %s ?" % (o,)
822
                subq += ")"
823
                args += [domain, k, v]
824
                append(subq)
825

    
826
        if not subqlist:
827
            return None, None
828

    
829
        subq = ' and ' + ' and '.join(subqlist)
830

    
831
        return subq, args
832

    
833
    def _construct_paths(self, pathq):
834
        if not pathq:
835
            return None, None
836

    
837
        subqlist = []
838
        args = []
839
        for path, match in pathq:
840
            if match == MATCH_PREFIX:
841
                subqlist.append("n.path like ? escape '\\'")
842
                args.append(self.escape_like(path) + '%')
843
            elif match == MATCH_EXACT:
844
                subqlist.append("n.path = ?")
845
                args.append(path)
846

    
847
        subq = ' and (' + ' or '.join(subqlist) + ')'
848
        args = tuple(args)
849

    
850
        return subq, args
851

    
852
    def _construct_size(self, sizeq):
853
        if not sizeq or len(sizeq) != 2:
854
            return None, None
855

    
856
        subq = ''
857
        args = []
858
        if sizeq[0]:
859
            subq += " and v.size >= ?"
860
            args += [sizeq[0]]
861
        if sizeq[1]:
862
            subq += " and v.size < ?"
863
            args += [sizeq[1]]
864

    
865
        return subq, args
866

    
867
    def _construct_versions_nodes_latest_version_subquery(self, before=inf):
868
        if before == inf:
869
            q = ("n.latest_version ")
870
            args = []
871
        else:
872
            q = ("(select max(serial) "
873
                 "from versions "
874
                 "where node = v.node and mtime < ?) ")
875
            args = [before]
876
        return q, args
877

    
878
    def _construct_latest_version_subquery(self, node=None, before=inf):
879
        where_cond = "node = v.node"
880
        args = []
881
        if node:
882
            where_cond = "node = ? "
883
            args = [node]
884

    
885
        if before == inf:
886
            q = ("(select latest_version "
887
                 "from nodes "
888
                 "where %s) ")
889
        else:
890
            q = ("(select max(serial) "
891
                 "from versions "
892
                 "where %s and mtime < ?) ")
893
            args += [before]
894
        return q % where_cond, args
895

    
896
    def _construct_latest_versions_subquery(self, nodes=(), before=inf):
897
        where_cond = ""
898
        args = []
899
        if nodes:
900
            where_cond = "node in (%s) " % ','.join('?' for node in nodes)
901
            args = nodes
902

    
903
        if before == inf:
904
            q = ("(select latest_version "
905
                 "from nodes "
906
                 "where %s ) ")
907
        else:
908
            q = ("(select max(serial) "
909
                 "from versions "
910
                 "where %s and mtime < ? group by node) ")
911
            args += [before]
912
        return q % where_cond, args
913

    
914
    def latest_attribute_keys(self, parent, domain, before=inf,
915
                              except_cluster=0, pathq=None):
916
        """Return a list with all keys pairs defined
917
           for all latest versions under parent that
918
           do not belong to the cluster.
919
        """
920

    
921
        pathq = pathq or []
922

    
923
        # TODO: Use another table to store before=inf results.
924
        q = ("select distinct a.key "
925
             "from attributes a, versions v, nodes n "
926
             "where v.serial = %s "
927
             "and v.cluster != ? "
928
             "and v.node in (select node "
929
             "from nodes "
930
             "where parent = ?) "
931
             "and a.serial = v.serial "
932
             "and a.domain = ? "
933
             "and n.node = v.node")
934
        subq, subargs = self._construct_latest_version_subquery(
935
            node=None, before=before)
936
        args = subargs + [except_cluster, parent, domain]
937
        q = q % subq
938
        subq, subargs = self._construct_paths(pathq)
939
        if subq is not None:
940
            q += subq
941
            args += subargs
942
        self.execute(q, args)
943
        return [r[0] for r in self.fetchall()]
944

    
945
    def latest_version_list(self, parent, prefix='', delimiter=None,
946
                            start='', limit=10000, before=inf,
947
                            except_cluster=0, pathq=[], domain=None,
948
                            filterq=[], sizeq=None, all_props=False):
949
        """Return a (list of (path, serial) tuples, list of common prefixes)
950
           for the current versions of the paths with the given parent,
951
           matching the following criteria.
952

953
           The property tuple for a version is returned if all
954
           of these conditions are true:
955

956
                a. parent matches
957

958
                b. path > start
959

960
                c. path starts with prefix (and paths in pathq)
961

962
                d. version is the max up to before
963

964
                e. version is not in cluster
965

966
                f. the path does not have the delimiter occuring
967
                   after the prefix, or ends with the delimiter
968

969
                g. serial matches the attribute filter query.
970

971
                   A filter query is a comma-separated list of
972
                   terms in one of these three forms:
973

974
                   key
975
                       an attribute with this key must exist
976

977
                   !key
978
                       an attribute with this key must not exist
979

980
                   key ?op value
981
                       the attribute with this key satisfies the value
982
                       where ?op is one of =, != <=, >=, <, >.
983

984
                h. the size is in the range set by sizeq
985

986
           The list of common prefixes includes the prefixes
987
           matching up to the first delimiter after prefix,
988
           and are reported only once, as "virtual directories".
989
           The delimiter is included in the prefixes.
990

991
           If arguments are None, then the corresponding matching rule
992
           will always match.
993

994
           Limit applies to the first list of tuples returned.
995

996
           If all_props is True, return all properties after path,
997
           not just serial.
998
        """
999

    
1000
        execute = self.execute
1001

    
1002
        if not start or start < prefix:
1003
            start = strprevling(prefix)
1004
        nextling = strnextling(prefix)
1005

    
1006
        q = ("select distinct n.path, %s "
1007
             "from versions v, nodes n "
1008
             "where v.serial = %s "
1009
             "and v.cluster != ? "
1010
             "and v.node in (select node "
1011
             "from nodes "
1012
             "where parent = ?) "
1013
             "and n.node = v.node "
1014
             "and n.path > ? and n.path < ?")
1015
        subq, args = self._construct_versions_nodes_latest_version_subquery(
1016
            before)
1017
        if not all_props:
1018
            q = q % ("v.serial", subq)
1019
        else:
1020
            q = q % (("v.serial, v.node, v.hash, v.size, v.type, v.source, "
1021
                      "v.mtime, v.muser, v.uuid, v.checksum, v.cluster"),
1022
                     subq)
1023
        args += [except_cluster, parent, start, nextling]
1024
        start_index = len(args) - 2
1025

    
1026
        subq, subargs = self._construct_paths(pathq)
1027
        if subq is not None:
1028
            q += subq
1029
            args += subargs
1030
        subq, subargs = self._construct_size(sizeq)
1031
        if subq is not None:
1032
            q += subq
1033
            args += subargs
1034
        subq, subargs = self._construct_filters(domain, filterq)
1035
        if subq is not None:
1036
            q += subq
1037
            args += subargs
1038
        else:
1039
            q = q.replace("attributes a, ", "")
1040
            q = q.replace("and a.serial = v.serial ", "")
1041
        q += " order by n.path"
1042

    
1043
        if not delimiter:
1044
            q += " limit ?"
1045
            args.append(limit)
1046
            execute(q, args)
1047
            return self.fetchall(), ()
1048

    
1049
        pfz = len(prefix)
1050
        dz = len(delimiter)
1051
        count = 0
1052
        fetchone = self.fetchone
1053
        prefixes = []
1054
        pappend = prefixes.append
1055
        matches = []
1056
        mappend = matches.append
1057

    
1058
        execute(q, args)
1059
        while True:
1060
            props = fetchone()
1061
            if props is None:
1062
                break
1063
            path = props[0]
1064
            idx = path.find(delimiter, pfz)
1065

    
1066
            if idx < 0:
1067
                mappend(props)
1068
                count += 1
1069
                if count >= limit:
1070
                    break
1071
                continue
1072

    
1073
            if idx + dz == len(path):
1074
                mappend(props)
1075
                count += 1
1076
                continue  # Get one more, in case there is a path.
1077
            pf = path[:idx + dz]
1078
            pappend(pf)
1079
            if count >= limit:
1080
                break
1081

    
1082
            args[start_index] = strnextling(pf)  # New start.
1083
            execute(q, args)
1084

    
1085
        return matches, prefixes
1086

    
1087
    def latest_uuid(self, uuid, cluster):
1088
        """Return the latest version of the given uuid and cluster.
1089

1090
        Return a (path, serial) tuple.
1091
        If cluster is None, all clusters are considered.
1092

1093
        """
1094
        if cluster is not None:
1095
            cluster_where = "and cluster = ?"
1096
            args = (uuid, int(cluster))
1097
        else:
1098
            cluster_where = ""
1099
            args = (uuid,)
1100

    
1101
        q = ("select n.path, v.serial "
1102
             "from versions v, nodes n "
1103
             "where v.serial = (select max(serial) "
1104
             "from versions "
1105
             "where uuid = ? %s) "
1106
             "and n.node = v.node") % cluster_where
1107
        self.execute(q, args)
1108
        return self.fetchone()
1109

    
1110
    def domain_object_list(self, domain, paths, cluster=None):
1111
        """Return a list of (path, property list, attribute dictionary)
1112
           for the objects in the specific domain and cluster.
1113
        """
1114

    
1115
        q = ("select n.path, v.serial, v.node, v.hash, "
1116
             "v.size, v.type, v.source, v.mtime, v.muser, "
1117
             "v.uuid, v.checksum, v.cluster, a.key, a.value "
1118
             "from nodes n, versions v, attributes a "
1119
             "where v.serial = a.serial and "
1120
             "a.domain = ? and "
1121
             "a.node = n.node and "
1122
             "a.is_latest = 1 and "
1123
             "n.path in (%s)") % ','.join('?' for _ in paths)
1124
        args = [domain]
1125
        map(args.append, paths)
1126
        if cluster is not None:
1127
            q += "and v.cluster = ?"
1128
            args += [cluster]
1129

    
1130
        self.execute(q, args)
1131
        rows = self.fetchall()
1132

    
1133
        group_by = itemgetter(slice(12))
1134
        rows.sort(key=group_by)
1135
        groups = groupby(rows, group_by)
1136
        return [(k[0], k[1:], dict([i[12:] for i in data])) for
1137
                (k, data) in groups]
1138

    
1139
    def get_props(self, paths):
1140
        q = ("select distinct n.path, v.type "
1141
             "from nodes n inner join versions v "
1142
             "on v.serial = n.latest_version "
1143
             "where n.path in (%s)") % ','.join('?' for _ in paths)
1144
        self.execute(q, paths)
1145
        return self.fetchall()