Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / lib / sqlite / node.py @ d3c34119

History | View | Annotate | Download (39.5 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from time import time
35
from operator import itemgetter
36
from itertools import groupby
37

    
38
from dbworker import DBWorker
39

    
40
from pithos.backends.filter import parse_filters
41

    
42

    
43
ROOTNODE = 0
44

    
45
(SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM,
46
 CLUSTER) = range(11)
47

    
48
(MATCH_PREFIX, MATCH_EXACT) = range(2)
49

    
50
inf = float('inf')
51

    
52

    
53
def strnextling(prefix):
54
    """Return the first unicode string
55
       greater than but not starting with given prefix.
56
       strnextling('hello') -> 'hellp'
57
    """
58
    if not prefix:
59
        ## all strings start with the null string,
60
        ## therefore we have to approximate strnextling('')
61
        ## with the last unicode character supported by python
62
        ## 0x10ffff for wide (32-bit unicode) python builds
63
        ## 0x00ffff for narrow (16-bit unicode) python builds
64
        ## We will not autodetect. 0xffff is safe enough.
65
        return unichr(0xffff)
66
    s = prefix[:-1]
67
    c = ord(prefix[-1])
68
    if c >= 0xffff:
69
        raise RuntimeError
70
    s += unichr(c + 1)
71
    return s
72

    
73

    
74
def strprevling(prefix):
75
    """Return an approximation of the last unicode string
76
       less than but not starting with given prefix.
77
       strprevling(u'hello') -> u'helln\\xffff'
78
    """
79
    if not prefix:
80
        ## There is no prevling for the null string
81
        return prefix
82
    s = prefix[:-1]
83
    c = ord(prefix[-1])
84
    if c > 0:
85
        s += unichr(c - 1) + unichr(0xffff)
86
    return s
87

    
88

    
89
_propnames = {
90
    'serial': 0,
91
    'node': 1,
92
    'hash': 2,
93
    'size': 3,
94
    'type': 4,
95
    'source': 5,
96
    'mtime': 6,
97
    'muser': 7,
98
    'uuid': 8,
99
    'checksum': 9,
100
    'cluster': 10
101
}
102

    
103

    
104
class Node(DBWorker):
105
    """Nodes store path organization and have multiple versions.
106
       Versions store object history and have multiple attributes.
107
       Attributes store metadata.
108
    """
109

    
110
    # TODO: Provide an interface for included and excluded clusters.
111

    
112
    def __init__(self, **params):
113
        DBWorker.__init__(self, **params)
114
        execute = self.execute
115

    
116
        execute(""" pragma foreign_keys = on """)
117

    
118
        execute(""" create table if not exists nodes
119
                          ( node       integer primary key,
120
                            parent     integer default 0,
121
                            path       text    not null default '',
122
                            latest_version     integer,
123
                            foreign key (parent)
124
                            references nodes(node)
125
                            on update cascade
126
                            on delete cascade ) """)
127
        execute(""" create unique index if not exists idx_nodes_path
128
                    on nodes(path) """)
129
        execute(""" create index if not exists idx_nodes_parent
130
                    on nodes(parent) """)
131

    
132
        execute(""" create table if not exists policy
133
                          ( node   integer,
134
                            key    text,
135
                            value  text,
136
                            primary key (node, key)
137
                            foreign key (node)
138
                            references nodes(node)
139
                            on update cascade
140
                            on delete cascade ) """)
141

    
142
        execute(""" create table if not exists statistics
143
                          ( node       integer,
144
                            population integer not null default 0,
145
                            size       integer not null default 0,
146
                            mtime      integer,
147
                            cluster    integer not null default 0,
148
                            primary key (node, cluster)
149
                            foreign key (node)
150
                            references nodes(node)
151
                            on update cascade
152
                            on delete cascade ) """)
153

    
154
        execute(""" create table if not exists versions
155
                          ( serial     integer primary key,
156
                            node       integer,
157
                            hash       text,
158
                            size       integer not null default 0,
159
                            type       text    not null default '',
160
                            source     integer,
161
                            mtime      integer,
162
                            muser      text    not null default '',
163
                            uuid       text    not null default '',
164
                            checksum   text    not null default '',
165
                            cluster    integer not null default 0,
166
                            foreign key (node)
167
                            references nodes(node)
168
                            on update cascade
169
                            on delete cascade ) """)
170
        execute(""" create index if not exists idx_versions_node_mtime
171
                    on versions(node, mtime) """)
172
        execute(""" create index if not exists idx_versions_node_uuid
173
                    on versions(uuid) """)
174

    
175
        execute(""" create table if not exists attributes
176
                          ( serial      integer,
177
                            domain      text,
178
                            key         text,
179
                            value       text,
180
                            node        integer not null    default 0,
181
                            is_latest   boolean not null    default 1,
182
                            primary key (serial, domain, key)
183
                            foreign key (serial)
184
                            references versions(serial)
185
                            on update cascade
186
                            on delete cascade ) """)
187
        execute(""" create index if not exists idx_attributes_domain
188
                    on attributes(domain) """)
189
        execute(""" create index if not exists idx_attributes_serial_node
190
                    on attributes(serial, node) """)
191

    
192
        wrapper = self.wrapper
193
        wrapper.execute()
194
        try:
195
            q = "insert or ignore into nodes(node, parent) values (?, ?)"
196
            execute(q, (ROOTNODE, ROOTNODE))
197
        finally:
198
            wrapper.commit()
199

    
200
    def node_create(self, parent, path):
201
        """Create a new node from the given properties.
202
           Return the node identifier of the new node.
203
        """
204

    
205
        q = ("insert into nodes (parent, path) "
206
             "values (?, ?)")
207
        props = (parent, path)
208
        return self.execute(q, props).lastrowid
209

    
210
    def node_lookup(self, path, for_update=False):
211
        """Lookup the current node of the given path.
212
           Return None if the path is not found.
213

214
           kwargs is not used: it is passed for conformance
215
        """
216

    
217
        q = "select node from nodes where path = ?"
218
        self.execute(q, (path,))
219
        r = self.fetchone()
220
        if r is not None:
221
            return r[0]
222
        return None
223

    
224
    def node_lookup_bulk(self, paths):
225
        """Lookup the current nodes for the given paths.
226
           Return () if the path is not found.
227
        """
228

    
229
        placeholders = ','.join('?' for path in paths)
230
        q = "select node from nodes where path in (%s)" % placeholders
231
        self.execute(q, paths)
232
        r = self.fetchall()
233
        if r is not None:
234
            return [row[0] for row in r]
235
        return None
236

    
237
    def node_get_properties(self, node):
238
        """Return the node's (parent, path).
239
           Return None if the node is not found.
240
        """
241

    
242
        q = "select parent, path from nodes where node = ?"
243
        self.execute(q, (node,))
244
        return self.fetchone()
245

    
246
    def node_get_versions(self, node, keys=(), propnames=_propnames):
247
        """Return the properties of all versions at node.
248
           If keys is empty, return all properties in the order
249
           (serial, node, hash, size, type, source, mtime, muser, uuid,
250
            checksum, cluster).
251
        """
252

    
253
        q = ("select serial, node, hash, size, type, source, mtime, muser, "
254
             "uuid, checksum, cluster "
255
             "from versions "
256
             "where node = ?")
257
        self.execute(q, (node,))
258
        r = self.fetchall()
259
        if r is None:
260
            return r
261

    
262
        if not keys:
263
            return r
264
        return [[p[propnames[k]] for k in keys if k in propnames] for p in r]
265

    
266
    def node_count_children(self, node):
267
        """Return node's child count."""
268

    
269
        q = "select count(node) from nodes where parent = ? and node != 0"
270
        self.execute(q, (node,))
271
        r = self.fetchone()
272
        if r is None:
273
            return 0
274
        return r[0]
275

    
276
    def node_purge_children(self, parent, before=inf, cluster=0,
277
                            update_statistics_ancestors_depth=None):
278
        """Delete all versions with the specified
279
           parent and cluster, and return
280
           the hashes, the size and the serials of versions deleted.
281
           Clears out nodes with no remaining versions.
282
        """
283

    
284
        execute = self.execute
285
        q = ("select count(serial), sum(size) from versions "
286
             "where node in (select node "
287
             "from nodes "
288
             "where parent = ?) "
289
             "and cluster = ? "
290
             "and mtime <= ?")
291
        args = (parent, cluster, before)
292
        execute(q, args)
293
        nr, size = self.fetchone()
294
        if not nr:
295
            return (), 0, ()
296
        mtime = time()
297
        self.statistics_update(parent, -nr, -size, mtime, cluster)
298
        self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster,
299
                                         update_statistics_ancestors_depth)
300

    
301
        q = ("select hash, serial from versions "
302
             "where node in (select node "
303
             "from nodes "
304
             "where parent = ?) "
305
             "and cluster = ? "
306
             "and mtime <= ?")
307
        execute(q, args)
308
        hashes = []
309
        serials = []
310
        for r in self.fetchall():
311
            hashes += [r[0]]
312
            serials += [r[1]]
313

    
314
        q = ("delete from versions "
315
             "where node in (select node "
316
             "from nodes "
317
             "where parent = ?) "
318
             "and cluster = ? "
319
             "and mtime <= ?")
320
        execute(q, args)
321
        q = ("delete from nodes "
322
             "where node in (select node from nodes n "
323
             "where (select count(serial) "
324
             "from versions "
325
             "where node = n.node) = 0 "
326
             "and parent = ?)")
327
        execute(q, (parent,))
328
        return hashes, size, serials
329

    
330
    def node_purge(self, node, before=inf, cluster=0,
331
                   update_statistics_ancestors_depth=None):
332
        """Delete all versions with the specified
333
           node and cluster, and return
334
           the hashes, the size and the serials of versions deleted.
335
           Clears out the node if it has no remaining versions.
336
        """
337

    
338
        execute = self.execute
339
        q = ("select count(serial), sum(size) from versions "
340
             "where node = ? "
341
             "and cluster = ? "
342
             "and mtime <= ?")
343
        args = (node, cluster, before)
344
        execute(q, args)
345
        nr, size = self.fetchone()
346
        if not nr:
347
            return (), 0, ()
348
        mtime = time()
349
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster,
350
                                         update_statistics_ancestors_depth)
351

    
352
        q = ("select hash, serial from versions "
353
             "where node = ? "
354
             "and cluster = ? "
355
             "and mtime <= ?")
356
        execute(q, args)
357
        hashes = []
358
        serials = []
359
        for r in self.fetchall():
360
            hashes += [r[0]]
361
            serials += [r[1]]
362

    
363
        q = ("delete from versions "
364
             "where node = ? "
365
             "and cluster = ? "
366
             "and mtime <= ?")
367
        execute(q, args)
368
        q = ("delete from nodes "
369
             "where node in (select node from nodes n "
370
             "where (select count(serial) "
371
             "from versions "
372
             "where node = n.node) = 0 "
373
             "and node = ?)")
374
        execute(q, (node,))
375
        return hashes, size, serials
376

    
377
    def node_remove(self, node, update_statistics_ancestors_depth=None):
378
        """Remove the node specified.
379
           Return false if the node has children or is not found.
380
        """
381

    
382
        if self.node_count_children(node):
383
            return False
384

    
385
        mtime = time()
386
        q = ("select count(serial), sum(size), cluster "
387
             "from versions "
388
             "where node = ? "
389
             "group by cluster")
390
        self.execute(q, (node,))
391
        for population, size, cluster in self.fetchall():
392
            self.statistics_update_ancestors(
393
                node, -population, -size, mtime, cluster,
394
                update_statistics_ancestors_depth)
395

    
396
        q = "delete from nodes where node = ?"
397
        self.execute(q, (node,))
398
        return True
399

    
400
    def node_accounts(self, accounts=()):
401
        q = ("select path, node from nodes where node != 0 and parent = 0 ")
402
        args = []
403
        if accounts:
404
            placeholders = ','.join('?' for a in accounts)
405
            q += ("and path in (%s)" % placeholders)
406
            args += accounts
407
        return self.execute(q, args).fetchall()
408

    
409
    def node_account_quotas(self):
410
        q = ("select n.path, p.value from nodes n, policy p "
411
             "where n.node != 0 and n.parent = 0 "
412
             "and n.node = p.node and p.key = 'quota'")
413
        return dict(self.execute(q).fetchall())
414

    
415
    def node_account_usage(self, account=None, cluster=0):
416
        """Return usage for a specific account.
417

418
        Keyword arguments:
419
        account -- (default None: list usage for all the accounts)
420
        cluster -- list current, history or deleted usage (default 0: normal)
421
        """
422

    
423
        q = ("select n3.path, sum(v.size) from "
424
             "versions v, nodes n1, nodes n2, nodes n3 "
425
             "where v.node = n1.node "
426
             "and v.cluster = ? "
427
             "and n1.parent = n2.node "
428
             "and n2.parent = n3.node "
429
             "and n3.parent = 0 "
430
             "and n3.node != 0 ")
431
        args = [cluster]
432
        if account:
433
            q += ("and n3.path = ? ")
434
            args += [account]
435
        q += ("group by n3.path")
436

    
437
        print '###', q, args
438
        self.execute(q, args)
439
        return dict(self.fetchall())
440

    
441
    def policy_get(self, node):
442
        q = "select key, value from policy where node = ?"
443
        self.execute(q, (node,))
444
        return dict(self.fetchall())
445

    
446
    def policy_set(self, node, policy):
447
        q = "insert or replace into policy (node, key, value) values (?, ?, ?)"
448
        self.executemany(q, ((node, k, v) for k, v in policy.iteritems()))
449

    
450
    def statistics_get(self, node, cluster=0):
451
        """Return population, total size and last mtime
452
           for all versions under node that belong to the cluster.
453
        """
454

    
455
        q = ("select population, size, mtime from statistics "
456
             "where node = ? and cluster = ?")
457
        self.execute(q, (node, cluster))
458
        return self.fetchone()
459

    
460
    def statistics_update(self, node, population, size, mtime, cluster=0):
461
        """Update the statistics of the given node.
462
           Statistics keep track the population, total
463
           size of objects and mtime in the node's namespace.
464
           May be zero or positive or negative numbers.
465
        """
466

    
467
        qs = ("select population, size from statistics "
468
              "where node = ? and cluster = ?")
469
        qu = ("insert or replace into statistics "
470
              "(node, population, size, mtime, cluster) "
471
              "values (?, ?, ?, ?, ?)")
472
        self.execute(qs, (node, cluster))
473
        r = self.fetchone()
474
        if r is None:
475
            prepopulation, presize = (0, 0)
476
        else:
477
            prepopulation, presize = r
478
        population += prepopulation
479
        population = max(population, 0)
480
        size += presize
481
        self.execute(qu, (node, population, size, mtime, cluster))
482

    
483
    def statistics_update_ancestors(self, node, population, size, mtime,
484
                                    cluster=0, recursion_depth=None):
485
        """Update the statistics of the given node's parent.
486
           Then recursively update all parents up to the root.
487
           Population is not recursive.
488
        """
489

    
490
        i = 0
491
        while True:
492
            if node == ROOTNODE:
493
                break
494
            if recursion_depth and recursion_depth <= i:
495
                break
496
            props = self.node_get_properties(node)
497
            if props is None:
498
                break
499
            parent, path = props
500
            self.statistics_update(parent, population, size, mtime, cluster)
501
            node = parent
502
            population = 0  # Population isn't recursive
503
            i += 1
504

    
505
    def statistics_latest(self, node, before=inf, except_cluster=0):
506
        """Return population, total size and last mtime
507
           for all latest versions under node that
508
           do not belong to the cluster.
509
        """
510

    
511
        execute = self.execute
512
        fetchone = self.fetchone
513

    
514
        # The node.
515
        props = self.node_get_properties(node)
516
        if props is None:
517
            return None
518
        parent, path = props
519

    
520
        # The latest version.
521
        q = ("select serial, node, hash, size, type, source, mtime, muser, "
522
             "uuid, checksum, cluster "
523
             "from versions v "
524
             "where serial = %s "
525
             "and cluster != ?")
526
        subq, args = self._construct_latest_version_subquery(
527
            node=node, before=before)
528
        execute(q % subq, args + [except_cluster])
529
        props = fetchone()
530
        if props is None:
531
            return None
532
        mtime = props[MTIME]
533

    
534
        # First level, just under node (get population).
535
        q = ("select count(serial), sum(size), max(mtime) "
536
             "from versions v "
537
             "where serial = %s "
538
             "and cluster != ? "
539
             "and node in (select node "
540
             "from nodes "
541
             "where parent = ?)")
542
        subq, args = self._construct_latest_version_subquery(
543
            node=None, before=before)
544
        execute(q % subq, args + [except_cluster, node])
545
        r = fetchone()
546
        if r is None:
547
            return None
548
        count = r[0]
549
        mtime = max(mtime, r[2])
550
        if count == 0:
551
            return (0, 0, mtime)
552

    
553
        # All children (get size and mtime).
554
        # This is why the full path is stored.
555
        q = ("select count(serial), sum(size), max(mtime) "
556
             "from versions v "
557
             "where serial = %s "
558
             "and cluster != ? "
559
             "and node in (select node "
560
             "from nodes "
561
             "where path like ? escape '\\')")
562
        subq, args = self._construct_latest_version_subquery(
563
            node=None, before=before)
564
        execute(
565
            q % subq, args + [except_cluster, self.escape_like(path) + '%'])
566
        r = fetchone()
567
        if r is None:
568
            return None
569
        size = r[1] - props[SIZE]
570
        mtime = max(mtime, r[2])
571
        return (count, size, mtime)
572

    
573
    def nodes_set_latest_version(self, node, serial):
574
        q = ("update nodes set latest_version = ? where node = ?")
575
        props = (serial, node)
576
        self.execute(q, props)
577

    
578
    def version_create(self, node, hash, size, type, source, muser, uuid,
579
                       checksum, cluster=0,
580
                       update_statistics_ancestors_depth=None):
581
        """Create a new version from the given properties.
582
           Return the (serial, mtime) of the new version.
583
        """
584

    
585
        q = ("insert into versions (node, hash, size, type, source, mtime, "
586
             "muser, uuid, checksum, cluster) "
587
             "values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
588
        mtime = time()
589
        props = (node, hash, size, type, source, mtime, muser,
590
                 uuid, checksum, cluster)
591
        serial = self.execute(q, props).lastrowid
592
        self.statistics_update_ancestors(node, 1, size, mtime, cluster,
593
                                         update_statistics_ancestors_depth)
594

    
595
        self.nodes_set_latest_version(node, serial)
596

    
597
        return serial, mtime
598

    
599
    def version_lookup(self, node, before=inf, cluster=0, all_props=True):
600
        """Lookup the current version of the given node.
601
           Return a list with its properties:
602
           (serial, node, hash, size, type, source, mtime,
603
            muser, uuid, checksum, cluster)
604
           or None if the current version is not found in the given cluster.
605
        """
606

    
607
        q = ("select %s "
608
             "from versions v "
609
             "where serial = %s "
610
             "and cluster = ?")
611
        subq, args = self._construct_latest_version_subquery(
612
            node=node, before=before)
613
        if not all_props:
614
            q = q % ("serial", subq)
615
        else:
616
            q = q % (("serial, node, hash, size, type, source, mtime, muser, "
617
                      "uuid, checksum, cluster"),
618
                     subq)
619

    
620
        self.execute(q, args + [cluster])
621
        props = self.fetchone()
622
        if props is not None:
623
            return props
624
        return None
625

    
626
    def version_lookup_bulk(self, nodes, before=inf, cluster=0,
627
                            all_props=True):
628
        """Lookup the current versions of the given nodes.
629
           Return a list with their properties:
630
           (serial, node, hash, size, type, source, mtime, muser, uuid,
631
            checksum, cluster).
632
        """
633

    
634
        if not nodes:
635
            return ()
636
        q = ("select %s "
637
             "from versions "
638
             "where serial in %s "
639
             "and cluster = ? %s")
640
        subq, args = self._construct_latest_versions_subquery(
641
            nodes=nodes, before=before)
642
        if not all_props:
643
            q = q % ("serial", subq, '')
644
        else:
645
            q = q % (("serial, node, hash, size, type, source, mtime, muser, "
646
                     "uuid, checksum, cluster"),
647
                     subq,
648
                     'order by node')
649

    
650
        args += [cluster]
651
        self.execute(q, args)
652
        return self.fetchall()
653

    
654
    def version_get_properties(self, serial, keys=(), propnames=_propnames,
655
                               node=None):
656
        """Return a sequence of values for the properties of
657
           the version specified by serial and the keys, in the order given.
658
           If keys is empty, return all properties in the order
659
           (serial, node, hash, size, type, source, mtime, muser, uuid,
660
            checksum, cluster).
661
        """
662

    
663
        q = ("select serial, node, hash, size, type, source, mtime, muser, "
664
             "uuid, checksum, cluster "
665
             "from versions "
666
             "where serial = ? ")
667
        args = [serial]
668
        if node is not None:
669
            q += ("and node = ?")
670
            args += [node]
671
        self.execute(q, args)
672
        r = self.fetchone()
673
        if r is None:
674
            return r
675

    
676
        if not keys:
677
            return r
678
        return [r[propnames[k]] for k in keys if k in propnames]
679

    
680
    def version_put_property(self, serial, key, value):
681
        """Set value for the property of version specified by key."""
682

    
683
        if key not in _propnames:
684
            return
685
        q = "update versions set %s = ? where serial = ?" % key
686
        self.execute(q, (value, serial))
687

    
688
    def version_recluster(self, serial, cluster,
689
                          update_statistics_ancestors_depth=None):
690
        """Move the version into another cluster."""
691

    
692
        props = self.version_get_properties(serial)
693
        if not props:
694
            return
695
        node = props[NODE]
696
        size = props[SIZE]
697
        oldcluster = props[CLUSTER]
698
        if cluster == oldcluster:
699
            return
700

    
701
        mtime = time()
702
        self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster,
703
                                         update_statistics_ancestors_depth)
704
        self.statistics_update_ancestors(node, 1, size, mtime, cluster,
705
                                         update_statistics_ancestors_depth)
706

    
707
        q = "update versions set cluster = ? where serial = ?"
708
        self.execute(q, (cluster, serial))
709

    
710
    def version_remove(self, serial, update_statistics_ancestors_depth=None):
711
        """Remove the serial specified."""
712

    
713
        props = self.version_get_properties(serial)
714
        if not props:
715
            return
716
        node = props[NODE]
717
        hash = props[HASH]
718
        size = props[SIZE]
719
        cluster = props[CLUSTER]
720

    
721
        mtime = time()
722
        self.statistics_update_ancestors(node, -1, -size, mtime, cluster,
723
                                         update_statistics_ancestors_depth)
724

    
725
        q = "delete from versions where serial = ?"
726
        self.execute(q, (serial,))
727

    
728
        props = self.version_lookup(node, cluster=cluster, all_props=False)
729
        if props:
730
            self.nodes_set_latest_version(node, props[0])
731
        return hash, size
732

    
733
    def attribute_get(self, serial, domain, keys=()):
734
        """Return a list of (key, value) pairs of the specific version.
735

736
           If keys is empty, return all attributes.
737
           Othwerise, return only those specified.
738
        """
739

    
740
        execute = self.execute
741
        if keys:
742
            marks = ','.join('?' for k in keys)
743
            q = ("select key, value from attributes "
744
                 "where key in (%s) and serial = ? and domain = ?" % (marks,))
745
            execute(q, keys + (serial, domain))
746
        else:
747
            q = ("select key, value from attributes where "
748
                 "serial = ? and domain = ?")
749
            execute(q, (serial, domain))
750
        return self.fetchall()
751

    
752
    def attribute_set(self, serial, domain, node, items, is_latest=True):
753
        """Set the attributes of the version specified by serial.
754
           Receive attributes as an iterable of (key, value) pairs.
755
        """
756

    
757
        q = ("insert or replace into attributes "
758
             "(serial, domain, node, is_latest, key, value) "
759
             "values (?, ?, ?, ?, ?, ?)")
760
        self.executemany(q, ((serial, domain, node, is_latest, k, v) for
761
                         k, v in items))
762

    
763
    def attribute_del(self, serial, domain, keys=()):
764
        """Delete attributes of the version specified by serial.
765
           If keys is empty, delete all attributes.
766
           Otherwise delete those specified.
767
        """
768

    
769
        if keys:
770
            q = ("delete from attributes "
771
                 "where serial = ? and domain = ? and key = ?")
772
            self.executemany(q, ((serial, domain, key) for key in keys))
773
        else:
774
            q = "delete from attributes where serial = ? and domain = ?"
775
            self.execute(q, (serial, domain))
776

    
777
    def attribute_copy(self, source, dest):
778
        q = ("insert or replace into attributes "
779
             "(serial, domain, node, is_latest, key, value) "
780
             "select ?, domain, node, is_latest, key, value from attributes "
781
             "where serial = ?")
782
        self.execute(q, (dest, source))
783

    
784
    def attribute_unset_is_latest(self, node, exclude):
785
        q = ("update attributes set is_latest = 0 "
786
             "where node = ? and serial != ?")
787
        self.execute(q, (node, exclude))
788

    
789
    def _construct_filters(self, domain, filterq):
790
        if not domain or not filterq:
791
            return None, None
792

    
793
        subqlist = []
794
        append = subqlist.append
795
        included, excluded, opers = parse_filters(filterq)
796
        args = []
797

    
798
        if included:
799
            subq = ("exists (select 1 from attributes where serial = v.serial "
800
                    "and domain = ? and ")
801
            subq += "(" + ' or '.join(('key = ?' for x in included)) + ")"
802
            subq += ")"
803
            args += [domain]
804
            args += included
805
            append(subq)
806

    
807
        if excluded:
808
            subq = ("not exists (select 1 from attributes where "
809
                    "serial = v.serial and domain = ? and ")
810
            subq += "(" + ' or '.join(('key = ?' for x in excluded)) + ")"
811
            subq += ")"
812
            args += [domain]
813
            args += excluded
814
            append(subq)
815

    
816
        if opers:
817
            for k, o, v in opers:
818
                subq = ("exists (select 1 from attributes where "
819
                        "serial = v.serial and domain = ? and ")
820
                subq += "key = ? and value %s ?" % (o,)
821
                subq += ")"
822
                args += [domain, k, v]
823
                append(subq)
824

    
825
        if not subqlist:
826
            return None, None
827

    
828
        subq = ' and ' + ' and '.join(subqlist)
829

    
830
        return subq, args
831

    
832
    def _construct_paths(self, pathq):
833
        if not pathq:
834
            return None, None
835

    
836
        subqlist = []
837
        args = []
838
        for path, match in pathq:
839
            if match == MATCH_PREFIX:
840
                subqlist.append("n.path like ? escape '\\'")
841
                args.append(self.escape_like(path) + '%')
842
            elif match == MATCH_EXACT:
843
                subqlist.append("n.path = ?")
844
                args.append(path)
845

    
846
        subq = ' and (' + ' or '.join(subqlist) + ')'
847
        args = tuple(args)
848

    
849
        return subq, args
850

    
851
    def _construct_size(self, sizeq):
852
        if not sizeq or len(sizeq) != 2:
853
            return None, None
854

    
855
        subq = ''
856
        args = []
857
        if sizeq[0]:
858
            subq += " and v.size >= ?"
859
            args += [sizeq[0]]
860
        if sizeq[1]:
861
            subq += " and v.size < ?"
862
            args += [sizeq[1]]
863

    
864
        return subq, args
865

    
866
    def _construct_versions_nodes_latest_version_subquery(self, before=inf):
867
        if before == inf:
868
            q = ("n.latest_version ")
869
            args = []
870
        else:
871
            q = ("(select max(serial) "
872
                 "from versions "
873
                 "where node = v.node and mtime < ?) ")
874
            args = [before]
875
        return q, args
876

    
877
    def _construct_latest_version_subquery(self, node=None, before=inf):
878
        where_cond = "node = v.node"
879
        args = []
880
        if node:
881
            where_cond = "node = ? "
882
            args = [node]
883

    
884
        if before == inf:
885
            q = ("(select latest_version "
886
                 "from nodes "
887
                 "where %s) ")
888
        else:
889
            q = ("(select max(serial) "
890
                 "from versions "
891
                 "where %s and mtime < ?) ")
892
            args += [before]
893
        return q % where_cond, args
894

    
895
    def _construct_latest_versions_subquery(self, nodes=(), before=inf):
896
        where_cond = ""
897
        args = []
898
        if nodes:
899
            where_cond = "node in (%s) " % ','.join('?' for node in nodes)
900
            args = nodes
901

    
902
        if before == inf:
903
            q = ("(select latest_version "
904
                 "from nodes "
905
                 "where %s ) ")
906
        else:
907
            q = ("(select max(serial) "
908
                 "from versions "
909
                 "where %s and mtime < ? group by node) ")
910
            args += [before]
911
        return q % where_cond, args
912

    
913
    def latest_attribute_keys(self, parent, domain, before=inf,
914
                              except_cluster=0, pathq=None):
915
        """Return a list with all keys pairs defined
916
           for all latest versions under parent that
917
           do not belong to the cluster.
918
        """
919

    
920
        pathq = pathq or []
921

    
922
        # TODO: Use another table to store before=inf results.
923
        q = ("select distinct a.key "
924
             "from attributes a, versions v, nodes n "
925
             "where v.serial = %s "
926
             "and v.cluster != ? "
927
             "and v.node in (select node "
928
             "from nodes "
929
             "where parent = ?) "
930
             "and a.serial = v.serial "
931
             "and a.domain = ? "
932
             "and n.node = v.node")
933
        subq, subargs = self._construct_latest_version_subquery(
934
            node=None, before=before)
935
        args = subargs + [except_cluster, parent, domain]
936
        q = q % subq
937
        subq, subargs = self._construct_paths(pathq)
938
        if subq is not None:
939
            q += subq
940
            args += subargs
941
        self.execute(q, args)
942
        return [r[0] for r in self.fetchall()]
943

    
944
    def latest_version_list(self, parent, prefix='', delimiter=None,
945
                            start='', limit=10000, before=inf,
946
                            except_cluster=0, pathq=[], domain=None,
947
                            filterq=[], sizeq=None, all_props=False):
948
        """Return a (list of (path, serial) tuples, list of common prefixes)
949
           for the current versions of the paths with the given parent,
950
           matching the following criteria.
951

952
           The property tuple for a version is returned if all
953
           of these conditions are true:
954

955
                a. parent matches
956

957
                b. path > start
958

959
                c. path starts with prefix (and paths in pathq)
960

961
                d. version is the max up to before
962

963
                e. version is not in cluster
964

965
                f. the path does not have the delimiter occuring
966
                   after the prefix, or ends with the delimiter
967

968
                g. serial matches the attribute filter query.
969

970
                   A filter query is a comma-separated list of
971
                   terms in one of these three forms:
972

973
                   key
974
                       an attribute with this key must exist
975

976
                   !key
977
                       an attribute with this key must not exist
978

979
                   key ?op value
980
                       the attribute with this key satisfies the value
981
                       where ?op is one of =, != <=, >=, <, >.
982

983
                h. the size is in the range set by sizeq
984

985
           The list of common prefixes includes the prefixes
986
           matching up to the first delimiter after prefix,
987
           and are reported only once, as "virtual directories".
988
           The delimiter is included in the prefixes.
989

990
           If arguments are None, then the corresponding matching rule
991
           will always match.
992

993
           Limit applies to the first list of tuples returned.
994

995
           If all_props is True, return all properties after path,
996
           not just serial.
997
        """
998

    
999
        execute = self.execute
1000

    
1001
        if not start or start < prefix:
1002
            start = strprevling(prefix)
1003
        nextling = strnextling(prefix)
1004

    
1005
        q = ("select distinct n.path, %s "
1006
             "from versions v, nodes n "
1007
             "where v.serial = %s "
1008
             "and v.cluster != ? "
1009
             "and v.node in (select node "
1010
             "from nodes "
1011
             "where parent = ?) "
1012
             "and n.node = v.node "
1013
             "and n.path > ? and n.path < ?")
1014
        subq, args = self._construct_versions_nodes_latest_version_subquery(
1015
            before)
1016
        if not all_props:
1017
            q = q % ("v.serial", subq)
1018
        else:
1019
            q = q % (("v.serial, v.node, v.hash, v.size, v.type, v.source, "
1020
                      "v.mtime, v.muser, v.uuid, v.checksum, v.cluster"),
1021
                     subq)
1022
        args += [except_cluster, parent, start, nextling]
1023
        start_index = len(args) - 2
1024

    
1025
        subq, subargs = self._construct_paths(pathq)
1026
        if subq is not None:
1027
            q += subq
1028
            args += subargs
1029
        subq, subargs = self._construct_size(sizeq)
1030
        if subq is not None:
1031
            q += subq
1032
            args += subargs
1033
        subq, subargs = self._construct_filters(domain, filterq)
1034
        if subq is not None:
1035
            q += subq
1036
            args += subargs
1037
        else:
1038
            q = q.replace("attributes a, ", "")
1039
            q = q.replace("and a.serial = v.serial ", "")
1040
        q += " order by n.path"
1041

    
1042
        if not delimiter:
1043
            q += " limit ?"
1044
            args.append(limit)
1045
            execute(q, args)
1046
            return self.fetchall(), ()
1047

    
1048
        pfz = len(prefix)
1049
        dz = len(delimiter)
1050
        count = 0
1051
        fetchone = self.fetchone
1052
        prefixes = []
1053
        pappend = prefixes.append
1054
        matches = []
1055
        mappend = matches.append
1056

    
1057
        execute(q, args)
1058
        while True:
1059
            props = fetchone()
1060
            if props is None:
1061
                break
1062
            path = props[0]
1063
            idx = path.find(delimiter, pfz)
1064

    
1065
            if idx < 0:
1066
                mappend(props)
1067
                count += 1
1068
                if count >= limit:
1069
                    break
1070
                continue
1071

    
1072
            if idx + dz == len(path):
1073
                mappend(props)
1074
                count += 1
1075
                continue  # Get one more, in case there is a path.
1076
            pf = path[:idx + dz]
1077
            pappend(pf)
1078
            if count >= limit:
1079
                break
1080

    
1081
            args[start_index] = strnextling(pf)  # New start.
1082
            execute(q, args)
1083

    
1084
        return matches, prefixes
1085

    
1086
    def latest_uuid(self, uuid, cluster):
1087
        """Return the latest version of the given uuid and cluster.
1088

1089
        Return a (path, serial) tuple.
1090
        If cluster is None, all clusters are considered.
1091

1092
        """
1093
        if cluster is not None:
1094
            cluster_where = "and cluster = ?"
1095
            args = (uuid, int(cluster))
1096
        else:
1097
            cluster_where = ""
1098
            args = (uuid,)
1099

    
1100
        q = ("select n.path, v.serial "
1101
             "from versions v, nodes n "
1102
             "where v.serial = (select max(serial) "
1103
             "from versions "
1104
             "where uuid = ? %s) "
1105
             "and n.node = v.node") % cluster_where
1106
        self.execute(q, args)
1107
        return self.fetchone()
1108

    
1109
    def domain_object_list(self, domain, paths, cluster=None):
1110
        """Return a list of (path, property list, attribute dictionary)
1111
           for the objects in the specific domain and cluster.
1112
        """
1113

    
1114
        q = ("select n.path, v.serial, v.node, v.hash, "
1115
             "v.size, v.type, v.source, v.mtime, v.muser, "
1116
             "v.uuid, v.checksum, v.cluster, a.key, a.value "
1117
             "from nodes n, versions v, attributes a "
1118
             "where v.serial = a.serial and "
1119
             "a.domain = ? and "
1120
             "a.node = n.node and "
1121
             "a.is_latest = 1 and "
1122
             "n.path in (%s)") % ','.join('?' for _ in paths)
1123
        args = [domain]
1124
        map(args.append, paths)
1125
        if cluster is not None:
1126
            q += "and v.cluster = ?"
1127
            args += [cluster]
1128

    
1129
        self.execute(q, args)
1130
        rows = self.fetchall()
1131

    
1132
        group_by = itemgetter(slice(12))
1133
        rows.sort(key=group_by)
1134
        groups = groupby(rows, group_by)
1135
        return [(k[0], k[1:], dict([i[12:] for i in data])) for
1136
                (k, data) in groups]
1137

    
1138
    def get_props(self, paths):
1139
        q = ("select distinct n.path, v.type "
1140
             "from nodes n inner join versions v "
1141
             "on v.serial = n.latest_version "
1142
             "where n.path in (%s)") % ','.join('?' for _ in paths)
1143
        self.execute(q, paths)
1144
        return self.fetchall()