Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / lib / sqlite / node.py @ 78e1f8da

History | View | Annotate | Download (40 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from time import time
35
from operator import itemgetter
36
from itertools import groupby
37

    
38
from dbworker import DBWorker
39

    
40
from pithos.backends.filter import parse_filters
41

    
42

    
43
ROOTNODE = 0
44

    
45
(SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM,
46
 CLUSTER) = range(11)
47

    
48
(MATCH_PREFIX, MATCH_EXACT) = range(2)
49

    
50
inf = float('inf')
51

    
52

    
53
def strnextling(prefix):
54
    """Return the first unicode string
55
       greater than but not starting with given prefix.
56
       strnextling('hello') -> 'hellp'
57
    """
58
    if not prefix:
59
        ## all strings start with the null string,
60
        ## therefore we have to approximate strnextling('')
61
        ## with the last unicode character supported by python
62
        ## 0x10ffff for wide (32-bit unicode) python builds
63
        ## 0x00ffff for narrow (16-bit unicode) python builds
64
        ## We will not autodetect. 0xffff is safe enough.
65
        return unichr(0xffff)
66
    s = prefix[:-1]
67
    c = ord(prefix[-1])
68
    if c >= 0xffff:
69
        raise RuntimeError
70
    s += unichr(c + 1)
71
    return s
72

    
73

    
74
def strprevling(prefix):
75
    """Return an approximation of the last unicode string
76
       less than but not starting with given prefix.
77
       strprevling(u'hello') -> u'helln\\xffff'
78
    """
79
    if not prefix:
80
        ## There is no prevling for the null string
81
        return prefix
82
    s = prefix[:-1]
83
    c = ord(prefix[-1])
84
    if c > 0:
85
        s += unichr(c - 1) + unichr(0xffff)
86
    return s
87

    
88

    
89
_propnames = {
90
    'serial': 0,
91
    'node': 1,
92
    'hash': 2,
93
    'size': 3,
94
    'type': 4,
95
    'source': 5,
96
    'mtime': 6,
97
    'muser': 7,
98
    'uuid': 8,
99
    'checksum': 9,
100
    'cluster': 10
101
}
102

    
103

    
104
class Node(DBWorker):
105
    """Nodes store path organization and have multiple versions.
106
       Versions store object history and have multiple attributes.
107
       Attributes store metadata.
108
    """
109

    
110
    # TODO: Provide an interface for included and excluded clusters.
111

    
112
    def __init__(self, **params):
113
        DBWorker.__init__(self, **params)
114
        execute = self.execute
115

    
116
        execute(""" pragma foreign_keys = on """)
117

    
118
        execute(""" create table if not exists nodes
119
                          ( node       integer primary key,
120
                            parent     integer default 0,
121
                            path       text    not null default '',
122
                            latest_version     integer,
123
                            foreign key (parent)
124
                            references nodes(node)
125
                            on update cascade
126
                            on delete cascade ) """)
127
        execute(""" create unique index if not exists idx_nodes_path
128
                    on nodes(path) """)
129
        execute(""" create index if not exists idx_nodes_parent
130
                    on nodes(parent) """)
131

    
132
        execute(""" create table if not exists policy
133
                          ( node   integer,
134
                            key    text,
135
                            value  text,
136
                            primary key (node, key)
137
                            foreign key (node)
138
                            references nodes(node)
139
                            on update cascade
140
                            on delete cascade ) """)
141

    
142
        execute(""" create table if not exists statistics
143
                          ( node       integer,
144
                            population integer not null default 0,
145
                            size       integer not null default 0,
146
                            mtime      integer,
147
                            cluster    integer not null default 0,
148
                            primary key (node, cluster)
149
                            foreign key (node)
150
                            references nodes(node)
151
                            on update cascade
152
                            on delete cascade ) """)
153

    
154
        execute(""" create table if not exists versions
155
                          ( serial     integer primary key,
156
                            node       integer,
157
                            hash       text,
158
                            size       integer not null default 0,
159
                            type       text    not null default '',
160
                            source     integer,
161
                            mtime      integer,
162
                            muser      text    not null default '',
163
                            uuid       text    not null default '',
164
                            checksum   text    not null default '',
165
                            cluster    integer not null default 0,
166
                            foreign key (node)
167
                            references nodes(node)
168
                            on update cascade
169
                            on delete cascade ) """)
170
        execute(""" create index if not exists idx_versions_node_mtime
171
                    on versions(node, mtime) """)
172
        execute(""" create index if not exists idx_versions_node_uuid
173
                    on versions(uuid) """)
174

    
175
        execute(""" create table if not exists attributes
176
                          ( serial      integer,
177
                            domain      text,
178
                            key         text,
179
                            value       text,
180
                            node        integer not null    default 0,
181
                            is_latest   boolean not null    default 1,
182
                            primary key (serial, domain, key)
183
                            foreign key (serial)
184
                            references versions(serial)
185
                            on update cascade
186
                            on delete cascade ) """)
187
        execute(""" create index if not exists idx_attributes_domain
188
                    on attributes(domain) """)
189
        execute(""" create index if not exists idx_attributes_serial_node
190
                    on attributes(serial, node) """)
191

    
192
        wrapper = self.wrapper
193
        wrapper.execute()
194
        try:
195
            q = "insert or ignore into nodes(node, parent) values (?, ?)"
196
            execute(q, (ROOTNODE, ROOTNODE))
197
        finally:
198
            wrapper.commit()
199

    
200
    def node_create(self, parent, path):
201
        """Create a new node from the given properties.
202
           Return the node identifier of the new node.
203
        """
204

    
205
        q = ("insert into nodes (parent, path) "
206
             "values (?, ?)")
207
        props = (parent, path)
208
        return self.execute(q, props).lastrowid
209

    
210
    def node_lookup(self, path, for_update=False):
211
        """Lookup the current node of the given path.
212
           Return None if the path is not found.
213

214
           kwargs is not used: it is passed for conformance
215
        """
216

    
217
        q = "select node from nodes where path = ?"
218
        self.execute(q, (path,))
219
        r = self.fetchone()
220
        if r is not None:
221
            return r[0]
222
        return None
223

    
224
    def node_lookup_bulk(self, paths):
225
        """Lookup the current nodes for the given paths.
226
           Return () if the path is not found.
227
        """
228

    
229
        placeholders = ','.join('?' for path in paths)
230
        q = "select node from nodes where path in (%s)" % placeholders
231
        self.execute(q, paths)
232
        r = self.fetchall()
233
        if r is not None:
234
            return [row[0] for row in r]
235
        return None
236

    
237
    def node_get_properties(self, node):
238
        """Return the node's (parent, path).
239
           Return None if the node is not found.
240
        """
241

    
242
        q = "select parent, path from nodes where node = ?"
243
        self.execute(q, (node,))
244
        return self.fetchone()
245

    
246
    def node_get_parent_path(self, node):
247
        """Return the node's parent path.
248
           Return None if the node is not found.
249
        """
250

    
251
        q = ("select path from nodes as n1, nodes as n2 "
252
             "where n2.node = n1.parent and n1.node = ?")
253
        self.execute(q, (node,))
254
        l = self.fetchone()
255
        return l[0] if l is not None else None
256

    
257
    def node_get_versions(self, node, keys=(), propnames=_propnames):
258
        """Return the properties of all versions at node.
259
           If keys is empty, return all properties in the order
260
           (serial, node, hash, size, type, source, mtime, muser, uuid,
261
            checksum, cluster).
262
        """
263

    
264
        q = ("select serial, node, hash, size, type, source, mtime, muser, "
265
             "uuid, checksum, cluster "
266
             "from versions "
267
             "where node = ?")
268
        self.execute(q, (node,))
269
        r = self.fetchall()
270
        if r is None:
271
            return r
272

    
273
        if not keys:
274
            return r
275
        return [[p[propnames[k]] for k in keys if k in propnames] for p in r]
276

    
277
    def node_count_children(self, node):
278
        """Return node's child count."""
279

    
280
        q = "select count(node) from nodes where parent = ? and node != 0"
281
        self.execute(q, (node,))
282
        r = self.fetchone()
283
        if r is None:
284
            return 0
285
        return r[0]
286

    
287
    def node_purge_children(self, parent, before=inf, cluster=0,
288
                            update_statistics_ancestors_depth=None):
289
        """Delete all versions with the specified
290
           parent and cluster, and return
291
           the hashes, the size and the serials of versions deleted.
292
           Clears out nodes with no remaining versions.
293
        """
294

    
295
        execute = self.execute
296
        q = ("select count(serial), sum(size) from versions "
297
             "where node in (select node "
298
             "from nodes "
299
             "where parent = ?) "
300
             "and cluster = ? "
301
             "and mtime <= ?")
302
        args = (parent, cluster, before)
303
        execute(q, args)
304
        nr, size = self.fetchone()
305
        if not nr:
306
            return (), 0, ()
307
        mtime = time()
308
        self.statistics_update(parent, -nr, -size, mtime, cluster)
309
        self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster,
310
                                         update_statistics_ancestors_depth)
311

    
312
        q = ("select hash, serial from versions "
313
             "where node in (select node "
314
             "from nodes "
315
             "where parent = ?) "
316
             "and cluster = ? "
317
             "and mtime <= ?")
318
        execute(q, args)
319
        hashes = []
320
        serials = []
321
        for r in self.fetchall():
322
            hashes += [r[0]]
323
            serials += [r[1]]
324

    
325
        q = ("delete from versions "
326
             "where node in (select node "
327
             "from nodes "
328
             "where parent = ?) "
329
             "and cluster = ? "
330
             "and mtime <= ?")
331
        execute(q, args)
332
        q = ("delete from nodes "
333
             "where node in (select node from nodes n "
334
             "where (select count(serial) "
335
             "from versions "
336
             "where node = n.node) = 0 "
337
             "and parent = ?)")
338
        execute(q, (parent,))
339
        return hashes, size, serials
340

    
341
    def node_purge(self, node, before=inf, cluster=0,
342
                   update_statistics_ancestors_depth=None):
343
        """Delete all versions with the specified
344
           node and cluster, and return
345
           the hashes, the size and the serials of versions deleted.
346
           Clears out the node if it has no remaining versions.
347
        """
348

    
349
        execute = self.execute
350
        q = ("select count(serial), sum(size) from versions "
351
             "where node = ? "
352
             "and cluster = ? "
353
             "and mtime <= ?")
354
        args = (node, cluster, before)
355
        execute(q, args)
356
        nr, size = self.fetchone()
357
        if not nr:
358
            return (), 0, ()
359
        mtime = time()
360
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster,
361
                                         update_statistics_ancestors_depth)
362

    
363
        q = ("select hash, serial from versions "
364
             "where node = ? "
365
             "and cluster = ? "
366
             "and mtime <= ?")
367
        execute(q, args)
368
        hashes = []
369
        serials = []
370
        for r in self.fetchall():
371
            hashes += [r[0]]
372
            serials += [r[1]]
373

    
374
        q = ("delete from versions "
375
             "where node = ? "
376
             "and cluster = ? "
377
             "and mtime <= ?")
378
        execute(q, args)
379
        q = ("delete from nodes "
380
             "where node in (select node from nodes n "
381
             "where (select count(serial) "
382
             "from versions "
383
             "where node = n.node) = 0 "
384
             "and node = ?)")
385
        execute(q, (node,))
386
        return hashes, size, serials
387

    
388
    def node_remove(self, node, update_statistics_ancestors_depth=None):
389
        """Remove the node specified.
390
           Return false if the node has children or is not found.
391
        """
392

    
393
        if self.node_count_children(node):
394
            return False
395

    
396
        mtime = time()
397
        q = ("select count(serial), sum(size), cluster "
398
             "from versions "
399
             "where node = ? "
400
             "group by cluster")
401
        self.execute(q, (node,))
402
        for population, size, cluster in self.fetchall():
403
            self.statistics_update_ancestors(
404
                node, -population, -size, mtime, cluster,
405
                update_statistics_ancestors_depth)
406

    
407
        q = "delete from nodes where node = ?"
408
        self.execute(q, (node,))
409
        return True
410

    
411
    def node_accounts(self, accounts=()):
412
        q = ("select path, node from nodes where node != 0 and parent = 0 ")
413
        args = []
414
        if accounts:
415
            placeholders = ','.join('?' for a in accounts)
416
            q += ("and path in (%s)" % placeholders)
417
            args += accounts
418
        return self.execute(q, args).fetchall()
419

    
420
    def node_account_quotas(self):
421
        q = ("select n.path, p.value from nodes n, policy p "
422
             "where n.node != 0 and n.parent = 0 "
423
             "and n.node = p.node and p.key = 'quota'")
424
        return dict(self.execute(q).fetchall())
425

    
426
    def node_account_usage(self, account=None, cluster=0):
427
        """Return usage for a specific account.
428

429
        Keyword arguments:
430
        account -- (default None: list usage for all the accounts)
431
        cluster -- list current, history or deleted usage (default 0: normal)
432
        """
433

    
434
        q = ("select n3.path, sum(v.size) from "
435
             "versions v, nodes n1, nodes n2, nodes n3 "
436
             "where v.node = n1.node "
437
             "and v.cluster = ? "
438
             "and n1.parent = n2.node "
439
             "and n2.parent = n3.node "
440
             "and n3.parent = 0 "
441
             "and n3.node != 0 ")
442
        args = [cluster]
443
        if account:
444
            q += ("and n3.path = ? ")
445
            args += [account]
446
        q += ("group by n3.path")
447

    
448
        print '###', q, args
449
        self.execute(q, args)
450
        return dict(self.fetchall())
451

    
452
    def policy_get(self, node):
453
        q = "select key, value from policy where node = ?"
454
        self.execute(q, (node,))
455
        return dict(self.fetchall())
456

    
457
    def policy_set(self, node, policy):
458
        q = "insert or replace into policy (node, key, value) values (?, ?, ?)"
459
        self.executemany(q, ((node, k, v) for k, v in policy.iteritems()))
460

    
461
    def statistics_get(self, node, cluster=0):
462
        """Return population, total size and last mtime
463
           for all versions under node that belong to the cluster.
464
        """
465

    
466
        q = ("select population, size, mtime from statistics "
467
             "where node = ? and cluster = ?")
468
        self.execute(q, (node, cluster))
469
        return self.fetchone()
470

    
471
    def statistics_update(self, node, population, size, mtime, cluster=0):
472
        """Update the statistics of the given node.
473
           Statistics keep track the population, total
474
           size of objects and mtime in the node's namespace.
475
           May be zero or positive or negative numbers.
476
        """
477

    
478
        qs = ("select population, size from statistics "
479
              "where node = ? and cluster = ?")
480
        qu = ("insert or replace into statistics "
481
              "(node, population, size, mtime, cluster) "
482
              "values (?, ?, ?, ?, ?)")
483
        self.execute(qs, (node, cluster))
484
        r = self.fetchone()
485
        if r is None:
486
            prepopulation, presize = (0, 0)
487
        else:
488
            prepopulation, presize = r
489
        population += prepopulation
490
        population = max(population, 0)
491
        size += presize
492
        self.execute(qu, (node, population, size, mtime, cluster))
493

    
494
    def statistics_update_ancestors(self, node, population, size, mtime,
495
                                    cluster=0, recursion_depth=None):
496
        """Update the statistics of the given node's parent.
497
           Then recursively update all parents up to the root.
498
           Population is not recursive.
499
        """
500

    
501
        i = 0
502
        while True:
503
            if node == ROOTNODE:
504
                break
505
            if recursion_depth and recursion_depth <= i:
506
                break
507
            props = self.node_get_properties(node)
508
            if props is None:
509
                break
510
            parent, path = props
511
            self.statistics_update(parent, population, size, mtime, cluster)
512
            node = parent
513
            population = 0  # Population isn't recursive
514
            i += 1
515

    
516
    def statistics_latest(self, node, before=inf, except_cluster=0):
517
        """Return population, total size and last mtime
518
           for all latest versions under node that
519
           do not belong to the cluster.
520
        """
521

    
522
        execute = self.execute
523
        fetchone = self.fetchone
524

    
525
        # The node.
526
        props = self.node_get_properties(node)
527
        if props is None:
528
            return None
529
        parent, path = props
530

    
531
        # The latest version.
532
        q = ("select serial, node, hash, size, type, source, mtime, muser, "
533
             "uuid, checksum, cluster "
534
             "from versions v "
535
             "where serial = %s "
536
             "and cluster != ?")
537
        subq, args = self._construct_latest_version_subquery(
538
            node=node, before=before)
539
        execute(q % subq, args + [except_cluster])
540
        props = fetchone()
541
        if props is None:
542
            return None
543
        mtime = props[MTIME]
544

    
545
        # First level, just under node (get population).
546
        q = ("select count(serial), sum(size), max(mtime) "
547
             "from versions v "
548
             "where serial = %s "
549
             "and cluster != ? "
550
             "and node in (select node "
551
             "from nodes "
552
             "where parent = ?)")
553
        subq, args = self._construct_latest_version_subquery(
554
            node=None, before=before)
555
        execute(q % subq, args + [except_cluster, node])
556
        r = fetchone()
557
        if r is None:
558
            return None
559
        count = r[0]
560
        mtime = max(mtime, r[2])
561
        if count == 0:
562
            return (0, 0, mtime)
563

    
564
        # All children (get size and mtime).
565
        # This is why the full path is stored.
566
        q = ("select count(serial), sum(size), max(mtime) "
567
             "from versions v "
568
             "where serial = %s "
569
             "and cluster != ? "
570
             "and node in (select node "
571
             "from nodes "
572
             "where path like ? escape '\\')")
573
        subq, args = self._construct_latest_version_subquery(
574
            node=None, before=before)
575
        execute(
576
            q % subq, args + [except_cluster, self.escape_like(path) + '%'])
577
        r = fetchone()
578
        if r is None:
579
            return None
580
        size = r[1] - props[SIZE]
581
        mtime = max(mtime, r[2])
582
        return (count, size, mtime)
583

    
584
    def nodes_set_latest_version(self, node, serial):
585
        q = ("update nodes set latest_version = ? where node = ?")
586
        props = (serial, node)
587
        self.execute(q, props)
588

    
589
    def version_create(self, node, hash, size, type, source, muser, uuid,
590
                       checksum, cluster=0,
591
                       update_statistics_ancestors_depth=None):
592
        """Create a new version from the given properties.
593
           Return the (serial, mtime) of the new version.
594
        """
595

    
596
        q = ("insert into versions (node, hash, size, type, source, mtime, "
597
             "muser, uuid, checksum, cluster) "
598
             "values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
599
        mtime = time()
600
        props = (node, hash, size, type, source, mtime, muser,
601
                 uuid, checksum, cluster)
602
        serial = self.execute(q, props).lastrowid
603
        self.statistics_update_ancestors(node, 1, size, mtime, cluster,
604
                                         update_statistics_ancestors_depth)
605

    
606
        self.nodes_set_latest_version(node, serial)
607

    
608
        return serial, mtime
609

    
610
    def version_lookup(self, node, before=inf, cluster=0, all_props=True):
611
        """Lookup the current version of the given node.
612
           Return a list with its properties:
613
           (serial, node, hash, size, type, source, mtime,
614
            muser, uuid, checksum, cluster)
615
           or None if the current version is not found in the given cluster.
616
        """
617

    
618
        q = ("select %s "
619
             "from versions v "
620
             "where serial = %s "
621
             "and cluster = ?")
622
        subq, args = self._construct_latest_version_subquery(
623
            node=node, before=before)
624
        if not all_props:
625
            q = q % ("serial", subq)
626
        else:
627
            q = q % (("serial, node, hash, size, type, source, mtime, muser, "
628
                      "uuid, checksum, cluster"),
629
                     subq)
630

    
631
        self.execute(q, args + [cluster])
632
        props = self.fetchone()
633
        if props is not None:
634
            return props
635
        return None
636

    
637
    def version_lookup_bulk(self, nodes, before=inf, cluster=0,
638
                            all_props=True, order_by_path=False):
639
        """Lookup the current versions of the given nodes.
640
           Return a list with their properties:
641
           (serial, node, hash, size, type, source, mtime, muser, uuid,
642
            checksum, cluster).
643
        """
644

    
645
        if not nodes:
646
            return ()
647
        q = ("select %s "
648
             "from versions v, nodes n "
649
             "where serial in %s "
650
             "and v.node = n.node "
651
             "and cluster = ? %s ")
652
        subq, args = self._construct_latest_versions_subquery(
653
            nodes=nodes, before=before)
654
        if not all_props:
655
            q = q % ("serial", subq, '')
656
        else:
657
            q = q % (("serial, v.node, hash, size, type, source, mtime, "
658
                      "muser, uuid, checksum, cluster"),
659
                     subq,
660
                     "order by path" if order_by_path else "")
661

    
662
        args += [cluster]
663
        self.execute(q, args)
664
        return self.fetchall()
665

    
666
    def version_get_properties(self, serial, keys=(), propnames=_propnames,
667
                               node=None):
668
        """Return a sequence of values for the properties of
669
           the version specified by serial and the keys, in the order given.
670
           If keys is empty, return all properties in the order
671
           (serial, node, hash, size, type, source, mtime, muser, uuid,
672
            checksum, cluster).
673
        """
674

    
675
        q = ("select serial, node, hash, size, type, source, mtime, muser, "
676
             "uuid, checksum, cluster "
677
             "from versions "
678
             "where serial = ? ")
679
        args = [serial]
680
        if node is not None:
681
            q += ("and node = ?")
682
            args += [node]
683
        self.execute(q, args)
684
        r = self.fetchone()
685
        if r is None:
686
            return r
687

    
688
        if not keys:
689
            return r
690
        return [r[propnames[k]] for k in keys if k in propnames]
691

    
692
    def version_put_property(self, serial, key, value):
693
        """Set value for the property of version specified by key."""
694

    
695
        if key not in _propnames:
696
            return
697
        q = "update versions set %s = ? where serial = ?" % key
698
        self.execute(q, (value, serial))
699

    
700
    def version_recluster(self, serial, cluster,
701
                          update_statistics_ancestors_depth=None):
702
        """Move the version into another cluster."""
703

    
704
        props = self.version_get_properties(serial)
705
        if not props:
706
            return
707
        node = props[NODE]
708
        size = props[SIZE]
709
        oldcluster = props[CLUSTER]
710
        if cluster == oldcluster:
711
            return
712

    
713
        mtime = time()
714
        self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster,
715
                                         update_statistics_ancestors_depth)
716
        self.statistics_update_ancestors(node, 1, size, mtime, cluster,
717
                                         update_statistics_ancestors_depth)
718

    
719
        q = "update versions set cluster = ? where serial = ?"
720
        self.execute(q, (cluster, serial))
721

    
722
    def version_remove(self, serial, update_statistics_ancestors_depth=None):
723
        """Remove the serial specified."""
724

    
725
        props = self.version_get_properties(serial)
726
        if not props:
727
            return
728
        node = props[NODE]
729
        hash = props[HASH]
730
        size = props[SIZE]
731
        cluster = props[CLUSTER]
732

    
733
        mtime = time()
734
        self.statistics_update_ancestors(node, -1, -size, mtime, cluster,
735
                                         update_statistics_ancestors_depth)
736

    
737
        q = "delete from versions where serial = ?"
738
        self.execute(q, (serial,))
739

    
740
        props = self.version_lookup(node, cluster=cluster, all_props=False)
741
        if props:
742
            self.nodes_set_latest_version(node, props[0])
743
        return hash, size
744

    
745
    def attribute_get(self, serial, domain, keys=()):
746
        """Return a list of (key, value) pairs of the specific version.
747

748
           If keys is empty, return all attributes.
749
           Othwerise, return only those specified.
750
        """
751

    
752
        execute = self.execute
753
        if keys:
754
            marks = ','.join('?' for k in keys)
755
            q = ("select key, value from attributes "
756
                 "where key in (%s) and serial = ? and domain = ?" % (marks,))
757
            execute(q, keys + (serial, domain))
758
        else:
759
            q = ("select key, value from attributes where "
760
                 "serial = ? and domain = ?")
761
            execute(q, (serial, domain))
762
        return self.fetchall()
763

    
764
    def attribute_set(self, serial, domain, node, items, is_latest=True):
765
        """Set the attributes of the version specified by serial.
766
           Receive attributes as an iterable of (key, value) pairs.
767
        """
768

    
769
        q = ("insert or replace into attributes "
770
             "(serial, domain, node, is_latest, key, value) "
771
             "values (?, ?, ?, ?, ?, ?)")
772
        self.executemany(q, ((serial, domain, node, is_latest, k, v) for
773
                         k, v in items))
774

    
775
    def attribute_del(self, serial, domain, keys=()):
776
        """Delete attributes of the version specified by serial.
777
           If keys is empty, delete all attributes.
778
           Otherwise delete those specified.
779
        """
780

    
781
        if keys:
782
            q = ("delete from attributes "
783
                 "where serial = ? and domain = ? and key = ?")
784
            self.executemany(q, ((serial, domain, key) for key in keys))
785
        else:
786
            q = "delete from attributes where serial = ? and domain = ?"
787
            self.execute(q, (serial, domain))
788

    
789
    def attribute_copy(self, source, dest):
790
        q = ("insert or replace into attributes "
791
             "(serial, domain, node, is_latest, key, value) "
792
             "select ?, domain, node, is_latest, key, value from attributes "
793
             "where serial = ?")
794
        self.execute(q, (dest, source))
795

    
796
    def attribute_unset_is_latest(self, node, exclude):
797
        q = ("update attributes set is_latest = 0 "
798
             "where node = ? and serial != ?")
799
        self.execute(q, (node, exclude))
800

    
801
    def _construct_filters(self, domain, filterq):
802
        if not domain or not filterq:
803
            return None, None
804

    
805
        subqlist = []
806
        append = subqlist.append
807
        included, excluded, opers = parse_filters(filterq)
808
        args = []
809

    
810
        if included:
811
            subq = ("exists (select 1 from attributes where serial = v.serial "
812
                    "and domain = ? and ")
813
            subq += "(" + ' or '.join(('key = ?' for x in included)) + ")"
814
            subq += ")"
815
            args += [domain]
816
            args += included
817
            append(subq)
818

    
819
        if excluded:
820
            subq = ("not exists (select 1 from attributes where "
821
                    "serial = v.serial and domain = ? and ")
822
            subq += "(" + ' or '.join(('key = ?' for x in excluded)) + ")"
823
            subq += ")"
824
            args += [domain]
825
            args += excluded
826
            append(subq)
827

    
828
        if opers:
829
            for k, o, v in opers:
830
                subq = ("exists (select 1 from attributes where "
831
                        "serial = v.serial and domain = ? and ")
832
                subq += "key = ? and value %s ?" % (o,)
833
                subq += ")"
834
                args += [domain, k, v]
835
                append(subq)
836

    
837
        if not subqlist:
838
            return None, None
839

    
840
        subq = ' and ' + ' and '.join(subqlist)
841

    
842
        return subq, args
843

    
844
    def _construct_paths(self, pathq):
845
        if not pathq:
846
            return None, None
847

    
848
        subqlist = []
849
        args = []
850
        for path, match in pathq:
851
            if match == MATCH_PREFIX:
852
                subqlist.append("n.path like ? escape '\\'")
853
                args.append(self.escape_like(path) + '%')
854
            elif match == MATCH_EXACT:
855
                subqlist.append("n.path = ?")
856
                args.append(path)
857

    
858
        subq = ' and (' + ' or '.join(subqlist) + ')'
859
        args = tuple(args)
860

    
861
        return subq, args
862

    
863
    def _construct_size(self, sizeq):
864
        if not sizeq or len(sizeq) != 2:
865
            return None, None
866

    
867
        subq = ''
868
        args = []
869
        if sizeq[0]:
870
            subq += " and v.size >= ?"
871
            args += [sizeq[0]]
872
        if sizeq[1]:
873
            subq += " and v.size < ?"
874
            args += [sizeq[1]]
875

    
876
        return subq, args
877

    
878
    def _construct_versions_nodes_latest_version_subquery(self, before=inf):
879
        if before == inf:
880
            q = ("n.latest_version ")
881
            args = []
882
        else:
883
            q = ("(select max(serial) "
884
                 "from versions "
885
                 "where node = v.node and mtime < ?) ")
886
            args = [before]
887
        return q, args
888

    
889
    def _construct_latest_version_subquery(self, node=None, before=inf):
890
        where_cond = "node = v.node"
891
        args = []
892
        if node:
893
            where_cond = "node = ? "
894
            args = [node]
895

    
896
        if before == inf:
897
            q = ("(select latest_version "
898
                 "from nodes "
899
                 "where %s) ")
900
        else:
901
            q = ("(select max(serial) "
902
                 "from versions "
903
                 "where %s and mtime < ?) ")
904
            args += [before]
905
        return q % where_cond, args
906

    
907
    def _construct_latest_versions_subquery(self, nodes=(), before=inf):
908
        where_cond = ""
909
        args = []
910
        if nodes:
911
            where_cond = "node in (%s) " % ','.join('?' for node in nodes)
912
            args = nodes
913

    
914
        if before == inf:
915
            q = ("(select latest_version "
916
                 "from nodes "
917
                 "where %s ) ")
918
        else:
919
            q = ("(select max(serial) "
920
                 "from versions "
921
                 "where %s and mtime < ? group by node) ")
922
            args += [before]
923
        return q % where_cond, args
924

    
925
    def latest_attribute_keys(self, parent, domain, before=inf,
926
                              except_cluster=0, pathq=None):
927
        """Return a list with all keys pairs defined
928
           for all latest versions under parent that
929
           do not belong to the cluster.
930
        """
931

    
932
        pathq = pathq or []
933

    
934
        # TODO: Use another table to store before=inf results.
935
        q = ("select distinct a.key "
936
             "from attributes a, versions v, nodes n "
937
             "where v.serial = %s "
938
             "and v.cluster != ? "
939
             "and v.node in (select node "
940
             "from nodes "
941
             "where parent = ?) "
942
             "and a.serial = v.serial "
943
             "and a.domain = ? "
944
             "and n.node = v.node")
945
        subq, subargs = self._construct_latest_version_subquery(
946
            node=None, before=before)
947
        args = subargs + [except_cluster, parent, domain]
948
        q = q % subq
949
        subq, subargs = self._construct_paths(pathq)
950
        if subq is not None:
951
            q += subq
952
            args += subargs
953
        self.execute(q, args)
954
        return [r[0] for r in self.fetchall()]
955

    
956
    def latest_version_list(self, parent, prefix='', delimiter=None,
957
                            start='', limit=10000, before=inf,
958
                            except_cluster=0, pathq=[], domain=None,
959
                            filterq=[], sizeq=None, all_props=False):
960
        """Return a (list of (path, serial) tuples, list of common prefixes)
961
           for the current versions of the paths with the given parent,
962
           matching the following criteria.
963

964
           The property tuple for a version is returned if all
965
           of these conditions are true:
966

967
                a. parent matches
968

969
                b. path > start
970

971
                c. path starts with prefix (and paths in pathq)
972

973
                d. version is the max up to before
974

975
                e. version is not in cluster
976

977
                f. the path does not have the delimiter occuring
978
                   after the prefix, or ends with the delimiter
979

980
                g. serial matches the attribute filter query.
981

982
                   A filter query is a comma-separated list of
983
                   terms in one of these three forms:
984

985
                   key
986
                       an attribute with this key must exist
987

988
                   !key
989
                       an attribute with this key must not exist
990

991
                   key ?op value
992
                       the attribute with this key satisfies the value
993
                       where ?op is one of =, != <=, >=, <, >.
994

995
                h. the size is in the range set by sizeq
996

997
           The list of common prefixes includes the prefixes
998
           matching up to the first delimiter after prefix,
999
           and are reported only once, as "virtual directories".
1000
           The delimiter is included in the prefixes.
1001

1002
           If arguments are None, then the corresponding matching rule
1003
           will always match.
1004

1005
           Limit applies to the first list of tuples returned.
1006

1007
           If all_props is True, return all properties after path,
1008
           not just serial.
1009
        """
1010

    
1011
        execute = self.execute
1012

    
1013
        if not start or start < prefix:
1014
            start = strprevling(prefix)
1015
        nextling = strnextling(prefix)
1016

    
1017
        q = ("select distinct n.path, %s "
1018
             "from versions v, nodes n "
1019
             "where v.serial = %s "
1020
             "and v.cluster != ? "
1021
             "and v.node in (select node "
1022
             "from nodes "
1023
             "where parent = ?) "
1024
             "and n.node = v.node "
1025
             "and n.path > ? and n.path < ?")
1026
        subq, args = self._construct_versions_nodes_latest_version_subquery(
1027
            before)
1028
        if not all_props:
1029
            q = q % ("v.serial", subq)
1030
        else:
1031
            q = q % (("v.serial, v.node, v.hash, v.size, v.type, v.source, "
1032
                      "v.mtime, v.muser, v.uuid, v.checksum, v.cluster"),
1033
                     subq)
1034
        args += [except_cluster, parent, start, nextling]
1035
        start_index = len(args) - 2
1036

    
1037
        subq, subargs = self._construct_paths(pathq)
1038
        if subq is not None:
1039
            q += subq
1040
            args += subargs
1041
        subq, subargs = self._construct_size(sizeq)
1042
        if subq is not None:
1043
            q += subq
1044
            args += subargs
1045
        subq, subargs = self._construct_filters(domain, filterq)
1046
        if subq is not None:
1047
            q += subq
1048
            args += subargs
1049
        else:
1050
            q = q.replace("attributes a, ", "")
1051
            q = q.replace("and a.serial = v.serial ", "")
1052
        q += " order by n.path"
1053

    
1054
        if not delimiter:
1055
            q += " limit ?"
1056
            args.append(limit)
1057
            execute(q, args)
1058
            return self.fetchall(), ()
1059

    
1060
        pfz = len(prefix)
1061
        dz = len(delimiter)
1062
        count = 0
1063
        fetchone = self.fetchone
1064
        prefixes = []
1065
        pappend = prefixes.append
1066
        matches = []
1067
        mappend = matches.append
1068

    
1069
        execute(q, args)
1070
        while True:
1071
            props = fetchone()
1072
            if props is None:
1073
                break
1074
            path = props[0]
1075
            idx = path.find(delimiter, pfz)
1076

    
1077
            if idx < 0:
1078
                mappend(props)
1079
                count += 1
1080
                if count >= limit:
1081
                    break
1082
                continue
1083

    
1084
            if idx + dz == len(path):
1085
                mappend(props)
1086
                count += 1
1087
                continue  # Get one more, in case there is a path.
1088
            pf = path[:idx + dz]
1089
            pappend(pf)
1090
            if count >= limit:
1091
                break
1092

    
1093
            args[start_index] = strnextling(pf)  # New start.
1094
            execute(q, args)
1095

    
1096
        return matches, prefixes
1097

    
1098
    def latest_uuid(self, uuid, cluster):
1099
        """Return the latest version of the given uuid and cluster.
1100

1101
        Return a (path, serial) tuple.
1102
        If cluster is None, all clusters are considered.
1103

1104
        """
1105
        if cluster is not None:
1106
            cluster_where = "and cluster = ?"
1107
            args = (uuid, int(cluster))
1108
        else:
1109
            cluster_where = ""
1110
            args = (uuid,)
1111

    
1112
        q = ("select n.path, v.serial "
1113
             "from versions v, nodes n "
1114
             "where v.serial = (select max(serial) "
1115
             "from versions "
1116
             "where uuid = ? %s) "
1117
             "and n.node = v.node") % cluster_where
1118
        self.execute(q, args)
1119
        return self.fetchone()
1120

    
1121
    def domain_object_list(self, domain, paths, cluster=None):
1122
        """Return a list of (path, property list, attribute dictionary)
1123
           for the objects in the specific domain and cluster.
1124
        """
1125

    
1126
        q = ("select n.path, v.serial, v.node, v.hash, "
1127
             "v.size, v.type, v.source, v.mtime, v.muser, "
1128
             "v.uuid, v.checksum, v.cluster, a.key, a.value "
1129
             "from nodes n, versions v, attributes a "
1130
             "where v.serial = a.serial and "
1131
             "a.domain = ? and "
1132
             "a.node = n.node and "
1133
             "a.is_latest = 1 and "
1134
             "n.path in (%s)") % ','.join('?' for _ in paths)
1135
        args = [domain]
1136
        map(args.append, paths)
1137
        if cluster is not None:
1138
            q += "and v.cluster = ?"
1139
            args += [cluster]
1140

    
1141
        self.execute(q, args)
1142
        rows = self.fetchall()
1143

    
1144
        group_by = itemgetter(slice(12))
1145
        rows.sort(key=group_by)
1146
        groups = groupby(rows, group_by)
1147
        return [(k[0], k[1:], dict([i[12:] for i in data])) for
1148
                (k, data) in groups]
1149

    
1150
    def get_props(self, paths):
1151
        q = ("select distinct n.path, v.type "
1152
             "from nodes n inner join versions v "
1153
             "on v.serial = n.latest_version "
1154
             "where n.path in (%s)") % ','.join('?' for _ in paths)
1155
        self.execute(q, paths)
1156
        return self.fetchall()