# Copyright 2011 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from time import time
from sqlalchemy import Table, Integer, BigInteger, DECIMAL, Column, String, MetaData, ForeignKey
from sqlalchemy.types import Text
from sqlalchemy.schema import Index, Sequence
from sqlalchemy.sql import func, and_, or_, null, select, bindparam, text
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.engine.reflection import Inspector

from dbworker import DBWorker

ROOTNODE  = 0

( SERIAL, NODE, HASH, SIZE, SOURCE, MTIME, MUSER, CLUSTER ) = range(8)

inf = float('inf')


def strnextling(prefix):
    """Return the first unicode string
       greater than but not starting with given prefix.
       strnextling('hello') -> 'hellp'
    """
    if not prefix:
        ## all strings start with the null string,
        ## therefore we have to approximate strnextling('')
        ## with the last unicode character supported by python
        ## 0x10ffff for wide (32-bit unicode) python builds
        ## 0x00ffff for narrow (16-bit unicode) python builds
        ## We will not autodetect. 0xffff is safe enough.
        return unichr(0xffff)
    s = prefix[:-1]
    c = ord(prefix[-1])
    if c >= 0xffff:
        raise RuntimeError
    s += unichr(c+1)
    return s

def strprevling(prefix):
    """Return an approximation of the last unicode string
       less than but not starting with given prefix.
       strprevling(u'hello') -> u'helln\\xffff'
    """
    if not prefix:
        ## There is no prevling for the null string
        return prefix
    s = prefix[:-1]
    c = ord(prefix[-1])
    if c > 0:
        s += unichr(c-1) + unichr(0xffff)
    return s
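
# Illustration (added note, not part of the original module): these two helpers
# turn "path starts with prefix" into a simple range condition on paths, e.g.
#
#     strnextling('photos/')  == u'photos0'                    # '0' is unichr(ord('/') + 1)
#     strprevling(u'photos/') == u'photos.' + unichr(0xffff)
#
# so listing queries below can use
# "path > strprevling(prefix) AND path < strnextling(prefix)"
# to select exactly the paths that start with the prefix.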


_propnames = {
    'serial'    : 0,
    'node'      : 1,
    'hash'      : 2,
    'size'      : 3,
    'source'    : 4,
    'mtime'     : 5,
    'muser'     : 6,
    'cluster'   : 7,
}


class Node(DBWorker):
    """Nodes store path organization and have multiple versions.
       Versions store object history and have multiple attributes.
       Attributes store metadata.
    """
    
    # TODO: Provide an interface for included and excluded clusters.
    
    def __init__(self, **params):
        DBWorker.__init__(self, **params)
        metadata = MetaData()
        
        #create nodes table
        columns=[]
        columns.append(Column('node', Integer, primary_key=True))
        columns.append(Column('parent', Integer,
                              ForeignKey('nodes.node',
                                         ondelete='CASCADE',
                                         onupdate='CASCADE'),
                              autoincrement=False))
        path_length = 2048
        columns.append(Column('path', String(path_length), default='', nullable=False))
        self.nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
        
        #create policy table
        columns=[]
        columns.append(Column('node', Integer,
                              ForeignKey('nodes.node',
                                         ondelete='CASCADE',
                                         onupdate='CASCADE'),
                              primary_key=True))
        columns.append(Column('key', String(255), primary_key=True))
        columns.append(Column('value', String(255)))
        self.policies = Table('policy', metadata, *columns, mysql_engine='InnoDB')
        
        #create statistics table
        columns=[]
        columns.append(Column('node', Integer,
                              ForeignKey('nodes.node',
                                         ondelete='CASCADE',
                                         onupdate='CASCADE'),
                              primary_key=True))
        columns.append(Column('population', Integer, nullable=False, default=0))
        columns.append(Column('size', BigInteger, nullable=False, default=0))
        columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
        columns.append(Column('cluster', Integer, nullable=False, default=0,
                              primary_key=True, autoincrement=False))
        self.statistics = Table('statistics', metadata, *columns, mysql_engine='InnoDB')
        
        #create versions table
        columns=[]
        columns.append(Column('serial', Integer, primary_key=True))
        columns.append(Column('node', Integer,
                              ForeignKey('nodes.node',
                                         ondelete='CASCADE',
                                         onupdate='CASCADE')))
        columns.append(Column('hash', String(255)))
        columns.append(Column('size', BigInteger, nullable=False, default=0))
        columns.append(Column('source', Integer))
        columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
        columns.append(Column('muser', String(255), nullable=False, default=''))
        columns.append(Column('cluster', Integer, nullable=False, default=0))
        self.versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
        Index('idx_versions_node_mtime', self.versions.c.node,
              self.versions.c.mtime)
        
        #create attributes table
        columns = []
        columns.append(Column('serial', Integer,
                              ForeignKey('versions.serial',
                                         ondelete='CASCADE',
                                         onupdate='CASCADE'),
                              primary_key=True))
        columns.append(Column('key', String(255), primary_key=True))
        columns.append(Column('value', String(255)))
        self.attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')
        
        metadata.create_all(self.engine)
        
        # the following code creates an index of specific length
        # this can be accomplished in sqlalchemy >= 0.7.3
        # by providing the mysql_length option during index creation
        insp = Inspector.from_engine(self.engine)
        indexes = [elem['name'] for elem in insp.get_indexes('nodes')]
        if 'idx_nodes_path' not in indexes:
            explicit_length = '(%s)' %path_length if self.engine.name == 'mysql' else ''
            s = text('CREATE INDEX idx_nodes_path ON nodes (path%s)' %explicit_length)
            self.conn.execute(s).close()
        
        s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
                                           self.nodes.c.parent == ROOTNODE))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            s = self.nodes.insert().values(node=ROOTNODE, parent=ROOTNODE)
            self.conn.execute(s)
    
    def node_create(self, parent, path):
        """Create a new node from the given properties.
           Return the node identifier of the new node.
        """
        #TODO catch IntegrityError?
        s = self.nodes.insert().values(parent=parent, path=path)
        r = self.conn.execute(s)
        inserted_primary_key = r.inserted_primary_key[0]
        r.close()
        return inserted_primary_key
    
    def node_lookup(self, path):
        """Lookup the current node of the given path.
           Return None if the path is not found.
        """
        
        s = select([self.nodes.c.node], self.nodes.c.path.like(path))
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        if row:
            return row[0]
        return None
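    
    # Added note (not in the original): the lookup above compares the stored
    # path with LIKE, so '%' and '_' in the argument behave as SQL wildcards;
    # callers are assumed to pass the exact, already-stored path.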
    
    def node_get_properties(self, node):
        """Return the node's (parent, path).
           Return None if the node is not found.
        """
        
        s = select([self.nodes.c.parent, self.nodes.c.path])
        s = s.where(self.nodes.c.node == node)
        r = self.conn.execute(s)
        l = r.fetchone()
        r.close()
        return l
    
    def node_get_versions(self, node, keys=(), propnames=_propnames):
        """Return the properties of all versions at node.
           If keys is empty, return all properties in the order
           (serial, node, hash, size, source, mtime, muser, cluster).
        """
        
        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.hash,
                    self.versions.c.size,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.cluster], self.versions.c.node == node)
        s = s.order_by(self.versions.c.serial)
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        if not rows:
            return rows
        
        if not keys:
            return rows
        
        return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]
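    
    # Added example (illustrative, not part of the original): with keys the
    # result is trimmed and reordered per version, e.g.
    #
    #     node_get_versions(node, keys=('serial', 'mtime'))
    #     # -> [[serial1, mtime1], [serial2, mtime2], ...]
    #
    # Unknown keys are silently skipped, as the comprehension above shows.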
    
    def node_count_children(self, node):
        """Return node's child count."""
        
        s = select([func.count(self.nodes.c.node)])
        s = s.where(and_(self.nodes.c.parent == node,
                         self.nodes.c.node != ROOTNODE))
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        return row[0]
    
    def node_purge_children(self, parent, before=inf, cluster=0):
        """Delete all versions with the specified
           parent and cluster, and return
           the serials of versions deleted.
           Clears out nodes with no remaining versions.
        """
        #update statistics
        c1 = select([self.nodes.c.node],
            self.nodes.c.parent == parent)
        where_clause = and_(self.versions.c.node.in_(c1),
                            self.versions.c.cluster == cluster)
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        s = s.where(where_clause)
        if before != inf:
            s = s.where(self.versions.c.mtime <= before)
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        if not row:
            return ()
        nr, size = row[0], -row[1] if row[1] else 0
        mtime = time()
        self.statistics_update(parent, -nr, size, mtime, cluster)
        self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)
        
        s = select([self.versions.c.serial])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        serials = [row[SERIAL] for row in r.fetchall()]
        r.close()
        
        #delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)
        r.close()
        
        #delete nodes
        s = select([self.nodes.c.node],
            and_(self.nodes.c.parent == parent,
                 select([func.count(self.versions.c.serial)],
                    self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
        rp = self.conn.execute(s)
        nodes = [r[0] for r in rp.fetchall()]
        rp.close()
        s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
        self.conn.execute(s).close()
        
        return serials
    
    def node_purge(self, node, before=inf, cluster=0):
        """Delete all versions with the specified
           node and cluster, and return
           the serials of versions deleted.
           Clears out the node if it has no remaining versions.
        """
        
        #update statistics
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        where_clause = and_(self.versions.c.node == node,
                         self.versions.c.cluster == cluster)
        s = s.where(where_clause)
        if before != inf:
            s = s.where(self.versions.c.mtime <= before)
        r = self.conn.execute(s)
        row = r.fetchone()
        nr, size = row[0], row[1]
        r.close()
        if not nr:
            return ()
        mtime = time()
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
        
        s = select([self.versions.c.serial])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        serials = [row[SERIAL] for row in r.fetchall()]
        r.close()
        
        #delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)
        r.close()
        
        #delete nodes
        s = select([self.nodes.c.node],
            and_(self.nodes.c.node == node,
                 select([func.count(self.versions.c.serial)],
                    self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
        r = self.conn.execute(s)
        nodes = [row[0] for row in r.fetchall()]
        r.close()
        s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
        self.conn.execute(s).close()
        
        return serials
    
    def node_remove(self, node):
        """Remove the node specified.
           Return False if the node has children or is not found.
        """
        
        if self.node_count_children(node):
            return False
        
        mtime = time()
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size),
                    self.versions.c.cluster])
        s = s.where(self.versions.c.node == node)
        s = s.group_by(self.versions.c.cluster)
        r = self.conn.execute(s)
        for population, size, cluster in r.fetchall():
            self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
        r.close()
        
        s = self.nodes.delete().where(self.nodes.c.node == node)
        self.conn.execute(s).close()
        return True
    
    def policy_get(self, node):
        s = select([self.policies.c.key, self.policies.c.value],
            self.policies.c.node==node)
        r = self.conn.execute(s)
        d = dict(r.fetchall())
        r.close()
        return d
    
    def policy_set(self, node, policy):
        #insert or replace
        for k, v in policy.iteritems():
            s = self.policies.update().where(and_(self.policies.c.node == node,
                                                  self.policies.c.key == k))
            s = s.values(value = v)
            rp = self.conn.execute(s)
            rp.close()
            if rp.rowcount == 0:
                s = self.policies.insert()
                values = {'node':node, 'key':k, 'value':v}
                r = self.conn.execute(s, values)
                r.close()
    
    def statistics_get(self, node, cluster=0):
        """Return population, total size and last mtime
           for all versions under node that belong to the cluster.
        """
        
        s = select([self.statistics.c.population,
                    self.statistics.c.size,
                    self.statistics.c.mtime])
        s = s.where(and_(self.statistics.c.node == node,
                         self.statistics.c.cluster == cluster))
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        return row
    
    def statistics_update(self, node, population, size, mtime, cluster=0):
        """Update the statistics of the given node.
           Statistics keep track of the population, total
           size of objects and mtime in the node's namespace.
           Population and size may be zero, positive, or negative.
        """
        s = select([self.statistics.c.population, self.statistics.c.size],
            and_(self.statistics.c.node == node,
                 self.statistics.c.cluster == cluster))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            prepopulation, presize = (0, 0)
        else:
            prepopulation, presize = r
        population += prepopulation
        size += presize
        
        #insert or replace
        #TODO better upsert
        u = self.statistics.update().where(and_(self.statistics.c.node==node,
                                           self.statistics.c.cluster==cluster))
        u = u.values(population=population, size=size, mtime=mtime)
        rp = self.conn.execute(u)
        rp.close()
        if rp.rowcount == 0:
            ins = self.statistics.insert()
            ins = ins.values(node=node, population=population, size=size,
                             mtime=mtime, cluster=cluster)
            self.conn.execute(ins).close()
    
    def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
        """Update the statistics of the given node's parent.
           Then recursively update all parents up to the root.
           Population is not recursive.
        """
        
        while True:
            if node == ROOTNODE:
                break
            props = self.node_get_properties(node)
            if props is None:
                break
            parent, path = props
            self.statistics_update(parent, population, size, mtime, cluster)
            node = parent
            population = 0 # Population isn't recursive
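    
    # Illustrative sketch (added, not part of the original): creating a version
    # of size 100 under a node whose ancestor chain is, say, object -> container
    # -> account (hypothetical paths) results in
    #
    #     statistics_update_ancestors(node, 1, 100, mtime, cluster)
    #
    # which adds population 1 and size 100 to the statistics row of the
    # immediate parent, then only size 100 (population 0) to each further
    # ancestor up to ROOTNODE, exactly as the loop above does.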
    
    def statistics_latest(self, node, before=inf, except_cluster=0):
        """Return population, total size and last mtime
           for all latest versions under node that
           do not belong to the cluster.
        """
        
        # The node.
        props = self.node_get_properties(node)
        if props is None:
            return None
        parent, path = props
        
        # The latest version.
        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.hash,
                    self.versions.c.size,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.cluster])
        filtered = select([func.max(self.versions.c.serial)],
                            self.versions.c.node == node)
        if before != inf:
            filtered = filtered.where(self.versions.c.mtime < before)
        s = s.where(and_(self.versions.c.cluster != except_cluster,
                         self.versions.c.serial == filtered))
        r = self.conn.execute(s)
        props = r.fetchone()
        r.close()
        if not props:
            return None
        mtime = props[MTIME]
        
        # First level, just under node (get population).
        v = self.versions.alias('v')
        s = select([func.count(v.c.serial),
                    func.sum(v.c.size),
                    func.max(v.c.mtime)])
        c1 = select([func.max(self.versions.c.serial)])
        if before != inf:
            c1 = c1.where(self.versions.c.mtime < before)
        c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
        s = s.where(and_(v.c.serial == c1.where(self.versions.c.node == v.c.node),
                         v.c.cluster != except_cluster,
                         v.c.node.in_(c2)))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        count = r[0]
        mtime = max(mtime, r[2])
        if count == 0:
            return (0, 0, mtime)
        
        # All children (get size and mtime).
        # XXX: This is why the full path is stored.
        s = select([func.count(v.c.serial),
                    func.sum(v.c.size),
                    func.max(v.c.mtime)])
        c1 = select([func.max(self.versions.c.serial)],
            self.versions.c.node == v.c.node)
        if before != inf:
            c1 = c1.where(self.versions.c.mtime < before)
        c2 = select([self.nodes.c.node], self.nodes.c.path.like(path + '%'))
        s = s.where(and_(v.c.serial == c1,
                         v.c.cluster != except_cluster,
                         v.c.node.in_(c2)))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        size = r[1] - props[SIZE]
        mtime = max(mtime, r[2])
        return (count, size, mtime)
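    
    # Added note (an interpretation of the code above, not original
    # documentation): the returned tuple is (number of latest versions
    # directly under node, total size of all latest descendant versions
    # minus the node's own latest version size, latest mtime observed),
    # computed while skipping versions in except_cluster.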
    
    def version_create(self, node, hash, size, source, muser, cluster=0):
        """Create a new version from the given properties.
           Return the (serial, mtime) of the new version.
        """
        
        mtime = time()
        s = self.versions.insert().values(node=node, hash=hash, size=size, source=source,
                                          mtime=mtime, muser=muser, cluster=cluster)
        serial = self.conn.execute(s).inserted_primary_key[0]
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)
        return serial, mtime
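    
    # Usage sketch (illustrative only; names are hypothetical):
    #
    #     node = backend.node_lookup(path) or backend.node_create(parent, path)
    #     serial, mtime = backend.version_create(node, hash, size, None, user)
    #     backend.attribute_set(serial, [('Content-Type', 'text/plain')])
    #
    # 'backend' stands for any Node instance; passing source=None here is an
    # assumption for a version that was not copied from another one.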
    
    def version_lookup(self, node, before=inf, cluster=0):
        """Lookup the current version of the given node.
           Return a list with its properties:
           (serial, node, hash, size, source, mtime, muser, cluster)
           or None if the current version is not found in the given cluster.
        """
        
        v = self.versions.alias('v')
        s = select([v.c.serial, v.c.node, v.c.hash, v.c.size,
                    v.c.source, v.c.mtime, v.c.muser, v.c.cluster])
        c = select([func.max(self.versions.c.serial)],
            self.versions.c.node == node)
        if before != inf:
            c = c.where(self.versions.c.mtime < before)
        s = s.where(and_(v.c.serial == c,
                         v.c.cluster == cluster))
        r = self.conn.execute(s)
        props = r.fetchone()
        r.close()
        if props:
            return props
        return None
    
    def version_get_properties(self, serial, keys=(), propnames=_propnames):
        """Return a sequence of values for the properties of
           the version specified by serial and the keys, in the order given.
           If keys is empty, return all properties in the order
           (serial, node, hash, size, source, mtime, muser, cluster).
        """
        
        v = self.versions.alias()
        s = select([v.c.serial, v.c.node, v.c.hash, v.c.size,
                    v.c.source, v.c.mtime, v.c.muser, v.c.cluster], v.c.serial == serial)
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if r is None:
            return r
        
        if not keys:
            return r
        return [r[propnames[k]] for k in keys if k in propnames]
    
    def version_recluster(self, serial, cluster):
        """Move the version into another cluster."""
        
        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        size = props[SIZE]
        oldcluster = props[CLUSTER]
        if cluster == oldcluster:
            return
        
        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)
        
        s = self.versions.update()
        s = s.where(self.versions.c.serial == serial)
        s = s.values(cluster = cluster)
        self.conn.execute(s).close()
    
    def version_remove(self, serial):
        """Remove the serial specified."""
        
        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        size = props[SIZE]
        cluster = props[CLUSTER]
        
        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
        
        s = self.versions.delete().where(self.versions.c.serial == serial)
        self.conn.execute(s).close()
        return True
    
    def attribute_get(self, serial, keys=()):
        """Return a list of (key, value) pairs of the version specified by serial.
           If keys is empty, return all attributes.
           Otherwise, return only those specified.
        """
        
        if keys:
            attrs = self.attributes.alias()
            s = select([attrs.c.key, attrs.c.value])
            s = s.where(and_(attrs.c.key.in_(keys),
                             attrs.c.serial == serial))
        else:
            attrs = self.attributes.alias()
            s = select([attrs.c.key, attrs.c.value])
            s = s.where(attrs.c.serial == serial)
        r = self.conn.execute(s)
        l = r.fetchall()
        r.close()
        return l
    
    def attribute_set(self, serial, items):
        """Set the attributes of the version specified by serial.
           Receive attributes as an iterable of (key, value) pairs.
        """
        #insert or replace
        #TODO better upsert
        for k, v in items:
            s = self.attributes.update()
            s = s.where(and_(self.attributes.c.serial == serial,
                             self.attributes.c.key == k))
            s = s.values(value = v)
            rp = self.conn.execute(s)
            rp.close()
            if rp.rowcount == 0:
                s = self.attributes.insert()
                s = s.values(serial=serial, key=k, value=v)
                self.conn.execute(s).close()
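    
    # Usage sketch (illustrative only): attributes are plain key/value strings,
    # e.g.
    #
    #     backend.attribute_set(serial, {'Content-Type': 'image/png'}.items())
    #     backend.attribute_get(serial, keys=('Content-Type',))
    #
    # Each pair is upserted with an UPDATE followed by an INSERT when no row
    # was changed, which is the pattern used above.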
    
    def attribute_del(self, serial, keys=()):
        """Delete attributes of the version specified by serial.
           If keys is empty, delete all attributes.
           Otherwise delete those specified.
        """
        
        if keys:
            #TODO more efficient way to do this?
            for key in keys:
                s = self.attributes.delete()
                s = s.where(and_(self.attributes.c.serial == serial,
                                 self.attributes.c.key == key))
                self.conn.execute(s).close()
        else:
            s = self.attributes.delete()
            s = s.where(self.attributes.c.serial == serial)
            self.conn.execute(s).close()
    
    def attribute_copy(self, source, dest):
        s = select([dest, self.attributes.c.key, self.attributes.c.value],
            self.attributes.c.serial == source)
        rp = self.conn.execute(s)
        attributes = rp.fetchall()
        rp.close()
        for dest, k, v in attributes:
            #insert or replace
            s = self.attributes.update().where(and_(
                self.attributes.c.serial == dest,
                self.attributes.c.key == k))
            rp = self.conn.execute(s, value=v)
            rp.close()
            if rp.rowcount == 0:
                s = self.attributes.insert()
                values = {'serial':dest, 'key':k, 'value':v}
                self.conn.execute(s, values).close()
    
    def latest_attribute_keys(self, parent, before=inf, except_cluster=0, pathq=[]):
        """Return a list with all keys defined
           for all latest versions under parent that
           do not belong to the cluster.
        """
        
        # TODO: Use another table to store before=inf results.
        a = self.attributes.alias('a')
        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        s = select([a.c.key]).distinct()
        filtered = select([func.max(self.versions.c.serial)])
        if before != inf:
            filtered = filtered.where(self.versions.c.mtime < before)
        s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
        s = s.where(v.c.cluster != except_cluster)
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
            self.nodes.c.parent == parent)))
        s = s.where(a.c.serial == v.c.serial)
        s = s.where(n.c.node == v.c.node)
        conj = []
        for x in pathq:
            conj.append(n.c.path.like(x + '%'))
        if conj:
            s = s.where(or_(*conj))
        rp = self.conn.execute(s)
        rows = rp.fetchall()
        rp.close()
        return [r[0] for r in rows]
    
    def latest_version_list(self, parent, prefix='', delimiter=None,
                            start='', limit=10000, before=inf,
                            except_cluster=0, pathq=[], filterq=None):
        """Return a (list of (path, serial) tuples, list of common prefixes)
           for the current versions of the paths with the given parent,
           matching the following criteria.
           
           The property tuple for a version is returned if all
           of these conditions are true:
                
                a. parent matches
                
                b. path > start
                
                c. path starts with prefix (and paths in pathq)
                
                d. version is the max up to before
                
                e. version is not in cluster
                
                f. the path does not have the delimiter occurring
                   after the prefix, or ends with the delimiter
                
                g. serial matches the attribute filter query.
                   
                   A filter query is a comma-separated list of
                   terms in one of these three forms:
                   
                   key
                       an attribute with this key must exist
                   
                   !key
                       an attribute with this key must not exist
                   
                   key ?op value
                       the attribute with this key satisfies the value
                       where ?op is one of ==, !=, <=, >=, <, >.
           
           The list of common prefixes includes the prefixes
           matching up to the first delimiter after prefix,
           and are reported only once, as "virtual directories".
           The delimiter is included in the prefixes.
           
           If arguments are None, then the corresponding matching rule
           will always match.
           
           Limit applies to the first list of tuples returned.
        """
        
        if not start or start < prefix:
            start = strprevling(prefix)
        nextling = strnextling(prefix)
        
        a = self.attributes.alias('a')
        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        s = select([n.c.path, v.c.serial]).distinct()
        filtered = select([func.max(self.versions.c.serial)])
        if before != inf:
            filtered = filtered.where(self.versions.c.mtime < before)
        s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
        s = s.where(v.c.cluster != except_cluster)
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
            self.nodes.c.parent == parent)))
        if filterq:
            s = s.where(a.c.serial == v.c.serial)
        
        s = s.where(n.c.node == v.c.node)
        s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
        conj = []
        for x in pathq:
            conj.append(n.c.path.like(x + '%'))
        
        if conj:
            s = s.where(or_(*conj))
        
        if filterq:
            s = s.where(a.c.key.in_(filterq.split(',')))
        
        s = s.order_by(n.c.path)
        
        if not delimiter:
            s = s.limit(limit)
            rp = self.conn.execute(s, start=start)
            r = rp.fetchall()
            rp.close()
            return r, ()
        
        pfz = len(prefix)
        dz = len(delimiter)
        count = 0
        prefixes = []
        pappend = prefixes.append
        matches = []
        mappend = matches.append
        
        rp = self.conn.execute(s, start=start)
        while True:
            props = rp.fetchone()
            if props is None:
                break
            path, serial = props
            idx = path.find(delimiter, pfz)
            
            if idx < 0:
                mappend(props)
                count += 1
                if count >= limit:
                    break
                continue
            
            if idx + dz == len(path):
                mappend(props)
                count += 1
                continue # Get one more, in case there is a path.
            pf = path[:idx + dz]
            pappend(pf)
            if count >= limit:
                break
            
            rp = self.conn.execute(s, start=strnextling(pf)) # New start.
        rp.close()
        
        return matches, prefixes
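    
    # Illustrative example (added, not part of the original): with
    # prefix='photos/', delimiter='/' and stored paths
    # 'photos/a.jpg', 'photos/summer/b.jpg', 'photos/summer/c.jpg',
    # the loop above reports ('photos/a.jpg', serial) in the matches list and
    # the single common prefix 'photos/summer/', then restarts the scan at
    # strnextling('photos/summer/') so the virtual directory is reported
    # only once.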