Statistics
| Branch: | Tag: | Revision:

root / pithos / backends / lib / node.py @ c915d3bf

History | View | Annotate | Download (25.6 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from time import time
35

    
36
from dbworker import DBWorker
37

    
38

    
39
# Identifier of the root node of the tree.
ROOTNODE = 0

# Positional indexes of the fields of a version property tuple.
SERIAL, NODE, SIZE, SOURCE, MTIME, MUSER, CLUSTER = range(7)

# Positive infinity; the default "before" timestamp bound.
inf = float('inf')
44

    
45

    
46
def strnextling(prefix):
    """Return the first unicode string
       greater than but not starting with given prefix.
       strnextling('hello') -> 'hellp'

       The empty prefix yields the single character 0xffff
       as an approximation.
       Raise RuntimeError if the last character of the prefix
       is 0xffff or above and therefore cannot be incremented.
    """
    try:
        to_char = unichr    # Python 2
    except NameError:
        to_char = chr       # Python 3 has no unichr; chr is equivalent
    if not prefix:
        ## all strings start with the null string,
        ## therefore we have to approximate strnextling('')
        ## with the last unicode character supported by python
        ## 0x10ffff for wide (32-bit unicode) python builds
        ## 0x00ffff for narrow (16-bit unicode) python builds
        ## We will not autodetect. 0xffff is safe enough.
        return to_char(0xffff)
    last = ord(prefix[-1])
    if last >= 0xffff:
        raise RuntimeError("cannot increment the last character of %r"
                           % (prefix,))
    # Same string with its last character replaced by its successor.
    return prefix[:-1] + to_char(last + 1)
65

    
66
def strprevling(prefix):
    """Return an approximation of the last unicode string
       less than but not starting with given prefix.
       strprevling(u'hello') -> u'helln\\uffff'

       The empty prefix is returned unchanged. If the last character
       is the null character, the prefix without it is returned.
    """
    if not prefix:
        ## There is no prevling for the null string
        return prefix
    try:
        to_char = unichr    # Python 2
    except NameError:
        to_char = chr       # Python 3 has no unichr; chr is equivalent
    head = prefix[:-1]
    last = ord(prefix[-1])
    if last > 0:
        # Step the last character back by one and pad with 0xffff
        # so the result sorts just below the prefix.
        head += to_char(last - 1) + to_char(0xffff)
    return head
79

    
80

    
81
import re
82
_regexfilter = re.compile('(!?)\s*([\w-]+)\s*(=|!=|<=|>=|<|>)?\s*(.*)$', re.UNICODE)
83

    
84
_propnames = {
85
    'serial'    : 0,
86
    'node'      : 1,
87
    'size'      : 2,
88
    'source'    : 3,
89
    'mtime'     : 4,
90
    'muser'     : 5,
91
    'cluster'   : 6,
92
}
93

    
94

    
95
class Node(DBWorker):
96
    """Nodes store path organization.
97
       Versions store object history.
98
       Attributes store metadata.
99
    """
100
    
101
    # TODO: Keep size of object in one place.
102
    
103
    def __init__(self, **params):
        """Create the nodes, statistics, versions and attributes
           tables (and the unique path index) if they do not exist,
           enable foreign keys and make sure the root node is present.
           
           The keyword parameters are accepted but not used here.
        """
        
        # NOTE(review): params is never forwarded and self.execute is
        # used without being set up in this method; presumably DBWorker
        # provides it -- confirm whether DBWorker.__init__(self, **params)
        # has to be called first.
        execute = self.execute
        
        execute(""" pragma foreign_keys = on """)
        
        execute(""" create table if not exists nodes
                          ( node       integer primary key,
                            parent     integer not null default 0,
                            path       text    not null default '',
                            foreign key (parent)
                            references nodes(node)
                            on update cascade
                            on delete cascade )""")
        execute(""" create unique index if not exists idx_nodes_path
                    on nodes(path) """)
        
        execute(""" create table if not exists statistics
                          ( node       integer not null,
                            population integer not null default 0,
                            size       integer not null default 0,
                            mtime      integer,
                            muser      text    not null default '',
                            cluster    integer not null default 0,
                            primary key (node, cluster)
                            foreign key (node)
                            references nodes(node)
                            on update cascade
                            on delete cascade )""")
        
        # The muser column is required by the version queries of this
        # class, which read and write
        # (serial, node, size, source, mtime, muser, cluster).
        # "if not exists" will not add it to an already created table.
        execute(""" create table if not exists versions
                          ( serial     integer primary key,
                            node       integer not null,
                            size       integer not null default 0,
                            source     integer,
                            mtime      integer,
                            muser      text    not null default '',
                            cluster    integer not null default 0,
                            foreign key (node)
                            references nodes(node)
                            on update cascade
                            on delete cascade ) """)
        # execute(""" create index if not exists idx_versions_path
        #             on nodes(cluster, node, path) """)
        # execute(""" create index if not exists idx_versions_mtime
        #             on nodes(mtime) """)
        
        execute(""" create table if not exists attributes
                          ( serial integer,
                            key    text,
                            value  text,
                            primary key (serial, key)
                            foreign key (serial)
                            references versions(serial)
                            on update cascade
                            on delete cascade ) """)
        
        # The root node is its own parent; "or ignore" keeps an
        # existing root untouched.
        q = "insert or ignore into nodes(node, parent) values (?, ?)"
        execute(q, (ROOTNODE, ROOTNODE))
160
    
161
    def node_create(self, parent, path):
162
        """Create a new node from the given properties.
163
           Return the node identifier of the new node.
164
        """
165
        
166
        q = ("insert into nodes (parent, path) "
167
             "values (?, ?)")
168
        props = (parent, path)
169
        return self.execute(q, props).lastrowid
170
    
171
    def node_lookup(self, path):
172
        """Lookup the current node of the given path.
173
           Return None if the path is not found.
174
        """
175
        
176
        q = ("select node from nodes where path = ?")
177
        self.execute(q, (path,))
178
        r = self.fetchone()
179
        if r is not None:
180
            return r[0]
181
        return None
182
    
183
    def node_update_ancestors(self, node, population, size, mtime, cluster=0):
184
        """Update the population properties of the given node.
185
           Population properties keep track the population and total
186
           size of objects in the node's namespace.
187
           May be zero or positive or negative numbers.
188
        """
189
        
190
        qs = ("select population, size from statistics"
191
              "where node = ? and cluster = ?")
192
        qu = ("insert or replace into statistics (node, population, size, mtime, cluster) "
193
              "values (?, ?, ?, ?, ?)")
194
        qp = "select parent from nodes where serial = ?"
195
        execute = self.execute
196
        fetchone = self.fetchone
197
        while 1:
198
            execute(qs, (node, cluster))
199
            r = fetchone()
200
            if r is None:
201
                prepopulation, presize = (0, 0)
202
            else:
203
                prepopulation, presize = r
204
            population += prepopulation
205
            size += presize
206
            
207
            execute(qu, (node, population, size, mtime, cluster))
208
            if node == 0:
209
                break
210
            
211
            population = 0 # Population isn't recursive
212
            execute(qp, (node,))
213
            r = fetchone()
214
            if r is None:
215
                break
216
            node = r[0]
217
    
218
    def node_statistics(self, node, cluster=0):
219
        """Return population, total size and last mtime
220
           for all versions under node that belong to the cluster.
221
        """
222
        
223
        q = ("select population, size, mtime from statistics"
224
             "where node = ? and cluster = ?")
225
        self.execute(q, (node, cluster))
226
        r = fetchone()
227
        if r is None:
228
            return (0, 0, 0)
229
        return r
230
    
231
    def node_count_children(self, node):
232
        """Return node's child count."""
233
        
234
        q = "select count(node) from nodes where parent = ? and node != 0"
235
        self.execute(q, (node,))
236
        r = fetchone()
237
        if r is None:
238
            return 0
239
        return r
240
    
241
    def node_purge_children(self, parent, before=inf, cluster=0):
        """Delete all versions with the specified
           parent and cluster, and return
           the serials of versions deleted.
           Only versions with mtime up to and including before
           are deleted.
           Clears out nodes with no remaining versions.
           Return an empty tuple if no version matches.
        """
        
        execute = self.execute
        # Shared selection of the versions to purge.
        where = ("where node in (select node "
                                "from nodes "
                                "where parent = ?) "
                 "and cluster = ? "
                 "and mtime <= ?")
        args = (parent, cluster, before)
        execute("select count(serial), sum(size) from versions " + where,
                args)
        nr, size = self.fetchone()
        if not nr:
            return ()
        # TODO: Statistics for nodes (children) will be wrong.
        # The purge time is recorded as the new mtime of the ancestors.
        mtime = time()
        self.node_update_ancestors(parent, -nr, -size, mtime, cluster)
        
        execute("select serial from versions " + where, args)
        serials = [r[SERIAL] for r in self.fetchall()]
        execute("delete from versions " + where, args)
        # The table is referenced by its own name in the subquery
        # (no alias on the delete target). The root node is its own
        # parent and has no versions, so it is excluded explicitly.
        q = ("delete from nodes "
             "where (select count(serial) "
                    "from versions "
                    "where node = nodes.node) = 0 "
             "and parent = ? "
             "and node != ?")
        execute(q, (parent, ROOTNODE))
        return serials
285
    
286
    def node_purge(self, node, before=inf, cluster=0):
        """Delete all versions with the specified
           node and cluster, and return
           the serials of versions deleted.
           Only versions with mtime up to and including before
           are deleted.
           Clears out the node if it has no remaining versions.
           Return an empty tuple if no version matches.
        """
        
        execute = self.execute
        # Shared selection of the versions to purge.
        where = ("where node = ? "
                 "and cluster = ? "
                 "and mtime <= ?")
        args = (node, cluster, before)
        execute("select count(serial), sum(size) from versions " + where,
                args)
        nr, size = self.fetchone()
        if not nr:
            return ()
        # The purge time is recorded as the new mtime of the ancestors.
        mtime = time()
        self.node_update_ancestors(node, -nr, -size, mtime, cluster)
        
        execute("select serial from versions " + where, args)
        serials = [r[SERIAL] for r in self.fetchall()]
        execute("delete from versions " + where, args)
        # The table is referenced by its own name in the subquery
        # (no alias on the delete target).
        q = ("delete from nodes "
             "where (select count(serial) "
                    "from versions "
                    "where node = nodes.node) = 0 "
             "and node = ?")
        execute(q, (node,))
        return serials
323
    
324
    def node_remove(self, node):
325
        """Remove the node specified.
326
           Return false if the node has children or is not found.
327
        """
328
        
329
        if self.node_children(node):
330
            return False
331
        
332
        q = "select parent from node where node = ?"
333
        self.execute(q, (node,))
334
        r = self.fetchone()
335
        if r is None:
336
            return False
337
        parent = r[0]
338
        
339
        mtime = time()
340
        q = "select population, size, cluster from statistics where node = ?"
341
        self.execute(q, (node,))
342
        for population, size, cluster in self.fetchall():
343
            self.node_update_ancestors(parent, -population, -size, mtime, cluster)
344
        
345
        q = "delete from nodes where node = ?"
346
        self.execute(q, (node,))
347
        return True
348
    
349
#     def node_remove(self, serial, recursive=0):
350
#         """Remove the node specified by serial.
351
#            Return false if the node is not found,
352
#            or has ancestors and recursive is not set.
353
#         """
354
#         
355
#         props = self.node_get_properties(serial)
356
#         if props is None:
357
#             return False
358
#         size = props[SIZE]
359
#         parent = props[PARENT]
360
#         pop = props[POPULATION]
361
#         popsize = props[POPSIZE]
362
#         if pop and not recursive:
363
#             return False
364
#         
365
#         q = ("delete from nodes where serial = ?")
366
#         self.execute(q, (serial,))
367
#         self.node_update_ancestors(parent, -pop-1, -size-popsize)
368
#         return True
369
    
370
    def version_create(self, node, size, source, muser, cluster=0):
        """Create a new version from the given properties.
           The current time is stored as the version's mtime and
           the statistics of the node and its ancestors are updated.
           Return the (serial, mtime) of the new version.
        """
        
        # Versions live in the versions table; one placeholder per column.
        q = ("insert into versions (node, size, source, mtime, muser, cluster) "
             "values (?, ?, ?, ?, ?, ?)")
        mtime = time()
        props = (node, size, source, mtime, muser, cluster)
        serial = self.execute(q, props).lastrowid
        self.node_update_ancestors(node, 1, size, mtime, cluster)
        return serial, mtime
382
    
383
    def version_lookup(self, node, before=inf, cluster=0):
384
        """Lookup the current version of the given node.
385
           Return a list with its properties:
386
           (serial, node, size, source, mtime, muser, cluster)
387
           or None if the current version is not found in the given cluster.
388
        """
389
        
390
        q = ("select serial, node, size, source, mtime, muser, cluster "
391
             "from versions "
392
             "where serial = (select max(serial) "
393
                             "from versions "
394
                             "where node = ? and mtime < ?) "
395
             "and cluster = ?")
396
        self.execute(q, (node, before, cluster))
397
        props = self.fetchone()
398
        if props is not None:
399
            return props
400
        return None
401
    
402
    def version_get_properties(self, serial, keys=(), propnames=_propnames):
403
        """Return a sequence of values for the properties of
404
           the version specified by serial and the keys, in the order given.
405
           If keys is empty, return all properties in the order
406
           (serial, node, size, source, mtime, muser, cluster).
407
        """
408
        
409
        q = ("select serial, node, path, size, source, mtime, muser, cluster "
410
             "from nodes "
411
             "where serial = ?")
412
        self.execute(q, (serial,))
413
        r = self.fetchone()
414
        if r is None:
415
            return r
416
        
417
        if not keys:
418
            return r
419
        return [r[propnames[k]] for k in keys if k in propnames]
420
    
421
#     def node_set_properties(self, serial, items, propnames=_mutablepropnames):
422
#         """Set the properties of a node specified by the node serial and
423
#            the items iterable of (name, value) pairs.
424
#            Mutable properties are %s.
425
#            Invalid property names and 'serial' are not set.
426
#         """ % (_mutables,)
427
#         
428
#         if not items:
429
#             return
430
#         
431
#         keys, vals = zip(*items)
432
#         keystr = ','.join(("%s = ?" % k) for k in keys if k in propnames)
433
#         if not keystr:
434
#             return
435
#         q = "update nodes set %s where serial = ?" % keystr
436
#         vals += (serial,)
437
#         self.execute(q, vals)
438
    
439
    def version_recluster(self, serial, cluster):
440
        """Move the version into another cluster."""
441
        
442
        props = self.node_get_properties(source)
443
        node = props[NODE]
444
        size = props[SIZE]
445
        mtime = props[MTIME]
446
        oldcluster = props[CLUSTER]
447
        if cluster == oldcluster:
448
            return
449
        
450
        self.node_update_ancestors(node, -1, -size, mtime, oldcluster)
451
        self.node_update_ancestors(node, 1, size, mtime, cluster)
452

    
453
        q = "update nodes set cluster = ? where serial = ?"
454
        self.execute(q, (cluster, serial))
455
    
456
#     def version_copy(self, serial, node, muser, copy_attr=True):
457
#         """Copy the version specified by serial into
458
#            a new version of node. Optionally copy attributes.
459
#            Return the (serial, mtime) of the new version.
460
#         """
461
#         
462
#         props = self.version_get_properties(serial)
463
#         if props is None:
464
#             return None
465
#         size = props[SIZE]
466
#         cluster = props[CLUSTER]
467
#         new_serial, mtime = self.version_create(node, size, serial, muser, cluster)
468
#         if copy_attr:
469
#             self.attribute_copy(serial, new_serial)
470
#         return (new_serial, mtime)
471
    
472
    def path_statistics(self, prefix, before=inf, except_cluster=0):
473
        """Return population, total size and last mtime
474
           for all latest versions under prefix that
475
           do not belong to the cluster.
476
        """
477
        
478
        q = ("select count(serial), sum(size), max(mtime) "
479
             "from versions v "
480
             "where serial = (select max(serial) "
481
                             "from versions "
482
                             "where node = v.node and mtime < ?) "
483
             "and cluster != ? "
484
             "and node in (select node "
485
                          "from nodes "
486
                          "where path like ?)")
487
        self.execute(q, (before, except_cluster, prefix + '%'))
488
        r = fetchone()
489
        if r is None:
490
            return (0, 0, 0)
491
        return r
492
    
493
    def parse_filters(self, filterq):
        """Split a comma-separated filter query into its terms.
           Return three lists (included, excluded, opers):
           keys of negated terms go to excluded,
           keys of terms without a value go to included and
           terms with both an operator and a value
           go to opers as (key, operator, value) tuples.
           Terms that do not match any of these forms are skipped.
        """
        
        included, excluded, opers = [], [], []
        for term in filterq.split(','):
            found = _regexfilter.match(term)
            if found is None:
                continue
            neg, key, op, value = found.groups()
            if neg:
                excluded.append(key)
                continue
            if not value:
                included.append(key)
                continue
            if op:
                opers.append((key, op, value))
        
        return included, excluded, opers
512
    
513
    def construct_filters(self, filterq):
        """Build the SQL condition for the given filter query.
           Return (subquery, arguments), where subquery is a string
           starting with " and serial in (...)" to be appended to a
           query on versions and arguments is the list of values
           for its placeholders, in order.
           Return (None, None) if the filter query yields no condition.
        """
        
        included, excluded, opers = self.parse_filters(filterq)
        subqlist = []
        append = subqlist.append
        args = []
        
        if included:
            subq = "key in ("
            subq += ','.join('?' for x in included) + ")"
            args += included
            append(subq)
        
        if excluded:
            subq = "key not in ("
            subq += ','.join('?' for x in excluded) + ")"
            args += excluded
            append(subq)
        
        if opers:
            # Keys and values are always bound as parameters; only an
            # operator from this fixed set is placed in the SQL text.
            allowed = ('=', '!=', '<=', '>=', '<', '>')
            terms = []
            for k, o, v in opers:
                if o not in allowed:
                    continue
                terms.append("(key = ? and value %s ?)" % (o,))
                args += [k, v]
            if terms:
                append("(" + ' or '.join(terms) + ")")
        
        if not subqlist:
            return None, None
        
        subq = " and serial in (select serial from attributes where "
        subq += ' and '.join(subqlist)
        subq += ")"
        
        return subq, args
544
    
545
#     def node_list(self, parent, prefix='',
546
#                    start='', delimiter=None,
547
#                    after=0.0, before=inf,
548
#                    filterq=None, versions=0,
549
#                    cluster=0, limit=10000):
550
#         """Return (a list of property tuples, a list of common prefixes)
551
#            for the current versions of the paths with the given parent,
552
#            matching the following criteria.
553
#            
554
#            The property tuple for a version is returned if all
555
#            of these conditions are true:
556
#            
557
#                 a. parent (and cluster) matches
558
#                 
559
#                 b. path > start
560
#                 
561
#                 c. path starts with prefix
562
#                 
563
#                 d. i  [versions=true]  version is in (after, before)
564
#                    ii [versions=false] version is the max in (after, before)
565
#                 
566
#                 e. the path does not have the delimiter occurring
567
#                    after the prefix.
568
#                 
569
#                 f. serial matches the attribute filter query.
570
#                    
571
#                    A filter query is a comma-separated list of
572
#                    terms in one of these three forms:
573
#                    
574
#                    key
575
#                        an attribute with this key must exist
576
#                    
577
#                    !key
578
#                        an attribute with this key must not exist
579
#                    
580
#                    key ?op value
581
#                        the attribute with this key satisfies the value
582
#                        where ?op is one of =, !=, <=, >=, <, >.
583
#            
584
#            matching up to the first delimiter after prefix,
585
#            and are reported only once, as "virtual directories".
586
#            The delimiter is included in the prefixes.
587
#            Prefixes do appear from (e) even if no paths would match in (f).
588
#            
589
#            If arguments are None, then the corresponding matching rule
590
#            will always match.
591
#         """
592
#         
593
#         execute = self.execute
594
# 
595
#         if start < prefix:
596
#             start = strprevling(prefix)
597
# 
598
#         nextling = strnextling(prefix)
599
# 
600
#         q = ("select serial, parent, path, size, "
601
#                     "population, popsize, source, mtime, cluster "
602
#              "from nodes "
603
#              "where parent = ? and path > ? and path < ? "
604
#              "and mtime > ? and mtime < ? and cluster = ?")
605
#         args = [parent, start, nextling, after, before, cluster]
606
# 
607
#         if filterq:
608
#             subq, subargs = self.construct_filters(filterq)
609
#             if subq is not None:
610
#                 q += subq
611
#                 args += subargs
612
#         q += " order by path"
613
# 
614
#         if delimiter is None:
615
#             q += " limit ?"
616
#             args.append(limit)
617
#             execute(q, args)
618
#             return self.fetchall(), ()
619
# 
620
#         pfz = len(prefix)
621
#         dz = len(delimiter)
622
#         count = 0
623
#         fetchone = self.fetchone
624
#         prefixes = []
625
#         pappend = prefixes.append
626
#         matches = []
627
#         mappend = matches.append
628
#         
629
#         execute(q, args)
630
#         while 1:
631
#             props = fetchone()
632
#             if props is None:
633
#                 break
634
#             path = props[PATH]
635
#             idx = path.find(delimiter, pfz)
636
#             if idx < 0:
637
#                 mappend(props)
638
#                 count += 1
639
#                 if count >= limit:
640
#                     break
641
#                 continue
642
# 
643
#             pf = path[:idx + dz]
644
#             pappend(pf)
645
#             count += 1
646
#             ## XXX: if we break here due to limit,
647
#             ##      but a path would also be matched below,
648
#             ##      the path match would be lost since the
649
#             ##      next call with start=path would skip both of them.
650
#             ##      In this case, it is impossible to obey the limit,
651
#             ##      therefore we will break later, at limit + 1.
652
#             if idx + dz == len(path):
653
#                 mappend(props)
654
#                 count += 1
655
# 
656
#             if count >= limit: 
657
#                 break
658
# 
659
#             args[1] = strnextling(pf) # new start
660
#             execute(q, args)
661
# 
662
#         return matches, prefixes
663
    
664
    def attribute_get(self, serial, keys=()):
665
        """Return a list of (key, value) pairs of the version specified by serial.
666
           If keys is empty, return all attributes.
667
           Othwerise, return only those specified.
668
        """
669
        
670
        execute = self.execute
671
        if keys:
672
            marks = ','.join('?' for k in keys)
673
            q = ("select key, value from attributes "
674
                 "where key in (%s) and serial = ?" % (marks,))
675
            execute(q, keys + (serial,))
676
        else:
677
            q = "select key, value from attributes where serial = ?"
678
            execute(q, (serial,))
679
        return self.fetchall()
680
    
681
    def attribute_set(self, serial, items):
682
        """Set the attributes of the version specified by serial.
683
           Receive attributes as an iterable of (key, value) pairs.
684
        """
685
        
686
        q = ("insert or replace into attributes (serial, key, value) "
687
             "values (?, ?, ?)")
688
        self.executemany(q, ((serial, k, v) for k, v in items))
689
    
690
    def attribute_del(self, serial, keys=()):
691
        """Delete attributes of the version specified by serial.
692
           If keys is empty, delete all attributes.
693
           Otherwise delete those specified.
694
        """
695
        
696
        if keys:
697
            q = "delete from attributes where serial = ? and key = ?"
698
            self.executemany(q, ((serial, key) for key in keys))
699
        else:
700
            q = "delete from attributes where serial = ?"
701
            self.execute(q, (serial,))
702
    
703
#     def node_get_attribute_keys(self, parent):
704
#         """Return a list with all keys pairs defined
705
#            for the namespace of the node specified.
706
#         """
707
#         
708
#         q = ("select distinct key from attributes a, versions v, nodes n "
709
#              "where a.serial = v.serial and v.node = n.node and n.parent = ?")
710
#         self.execute(q, (parent,))
711
#         return [r[0] for r in self.fetchall()]
712
    
713
    def attribute_copy(self, source, dest):
714
        q = ("insert or replace into attributes "
715
             "select ?, key, value from attributes "
716
             "where serial = ?")
717
        self.execute(q, (dest, source))