Statistics
| Branch: | Tag: | Revision:

root / pithos / backends / lib / sqlalchemy / node.py @ 956e3c9f

History | View | Annotate | Download (32.9 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from time import time
35
from sqlalchemy import Table, Integer, BigInteger, DECIMAL, Column, String, MetaData, ForeignKey
36
from sqlalchemy.types import Text
37
from sqlalchemy.schema import Index, Sequence
38
from sqlalchemy.sql import func, and_, or_, null, select, bindparam, text
39
from sqlalchemy.ext.compiler import compiles
40
from sqlalchemy.engine.reflection import Inspector
41

    
42
from dbworker import DBWorker
43

    
44
# Identifier of the synthetic root node that parents all top-level nodes.
ROOTNODE = 0

# Indices into a version property tuple, in the order produced by
# version/node property queries.
(SERIAL, NODE, HASH, SIZE, SOURCE, MTIME, MUSER, CLUSTER) = range(8)

# Sentinel meaning "no upper bound" for before/mtime comparisons.
inf = float('inf')
49

    
50

    
51
def strnextling(prefix):
    """Return the first unicode string
       greater than but not starting with given prefix.
       strnextling('hello') -> 'hellp'
    """
    if not prefix:
        # Every string starts with the null string, so approximate the
        # successor of '' with the largest code point that is safe on
        # both narrow (16-bit) and wide (32-bit) unicode python builds.
        return unichr(0xffff)
    last = ord(prefix[-1])
    if last >= 0xffff:
        # Cannot increment past the largest safe code point.
        raise RuntimeError
    return prefix[:-1] + unichr(last + 1)
70

    
71
def strprevling(prefix):
    """Return an approximation of the last unicode string
       less than but not starting with given prefix.
       strprevling(u'hello') -> u'helln\\xffff'
    """
    if not prefix:
        # The null string has no predecessor.
        return prefix
    stem = prefix[:-1]
    last = ord(prefix[-1])
    if last > 0:
        # Decrement the final character and pad with the largest
        # code point to approximate the predecessor.
        stem += unichr(last - 1) + unichr(0xffff)
    return stem
84

    
85

    
86
_propnames = {
87
    'serial'    : 0,
88
    'node'      : 1,
89
    'hash'      : 2,
90
    'size'      : 3,
91
    'source'    : 4,
92
    'mtime'     : 5,
93
    'muser'     : 6,
94
    'cluster'   : 7,
95
}
96

    
97

    
98
class Node(DBWorker):
99
    """Nodes store path organization and have multiple versions.
100
       Versions store object history and have multiple attributes.
101
       Attributes store metadata.
102
    """
103
    
104
    # TODO: Provide an interface for included and excluded clusters.
105
    
106
    def __init__(self, **params):
        """Set up the node/policy/statistics/versions/attributes tables
           and ensure the root node row exists.

           Keyword params are forwarded verbatim to DBWorker, which is
           expected to provide self.engine and self.conn.
        """
        DBWorker.__init__(self, **params)
        metadata = MetaData()
        
        #create nodes table
        # Each node is a path entry; parent forms a tree rooted at ROOTNODE.
        columns=[]
        columns.append(Column('node', Integer, primary_key=True))
        columns.append(Column('parent', Integer,
                              ForeignKey('nodes.node',
                                         ondelete='CASCADE',
                                         onupdate='CASCADE'),
                              autoincrement=False))
        path_length = 2048
        columns.append(Column('path', String(path_length), default='', nullable=False))
        self.nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
        
        #create policy table
        # Key/value policy pairs attached to a node.
        columns=[]
        columns.append(Column('node', Integer,
                              ForeignKey('nodes.node',
                                         ondelete='CASCADE',
                                         onupdate='CASCADE'),
                              primary_key=True))
        columns.append(Column('key', String(255), primary_key=True))
        columns.append(Column('value', String(255)))
        self.policies = Table('policy', metadata, *columns, mysql_engine='InnoDB')
        
        #create statistics table
        # Aggregated (population, size, mtime) per (node, cluster);
        # maintained incrementally by statistics_update*().
        columns=[]
        columns.append(Column('node', Integer,
                              ForeignKey('nodes.node',
                                         ondelete='CASCADE',
                                         onupdate='CASCADE'),
                              primary_key=True))
        columns.append(Column('population', Integer, nullable=False, default=0))
        columns.append(Column('size', BigInteger, nullable=False, default=0))
        columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
        columns.append(Column('cluster', Integer, nullable=False, default=0,
                              primary_key=True, autoincrement=False))
        self.statistics = Table('statistics', metadata, *columns, mysql_engine='InnoDB')
        
        #create versions table
        # Object history: one row per version of a node's content.
        columns=[]
        columns.append(Column('serial', Integer, primary_key=True))
        columns.append(Column('node', Integer,
                              ForeignKey('nodes.node',
                                         ondelete='CASCADE',
                                         onupdate='CASCADE')))
        columns.append(Column('hash', String(255)))
        columns.append(Column('size', BigInteger, nullable=False, default=0))
        columns.append(Column('source', Integer))
        columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
        columns.append(Column('muser', String(255), nullable=False, default=''))
        columns.append(Column('cluster', Integer, nullable=False, default=0))
        self.versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
        Index('idx_versions_node_mtime', self.versions.c.node,
              self.versions.c.mtime)
        
        #create attributes table
        # Arbitrary metadata key/value pairs per version.
        columns = []
        columns.append(Column('serial', Integer,
                              ForeignKey('versions.serial',
                                         ondelete='CASCADE',
                                         onupdate='CASCADE'),
                              primary_key=True))
        columns.append(Column('key', String(255), primary_key=True))
        columns.append(Column('value', String(255)))
        self.attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')
        
        metadata.create_all(self.engine)
        
        # the following code creates an index of specific length
        # this can be accomplished in sqlalchemy >= 0.7.3
        # providing mysql_length option during index creation
        insp = Inspector.from_engine(self.engine)
        indexes = [elem['name'] for elem in insp.get_indexes('nodes')]
        if 'idx_nodes_path' not in indexes:
            # MySQL needs an explicit prefix length for long VARCHAR indexes.
            explicit_length = '(%s)' %path_length if self.engine.name == 'mysql' else ''
            s = text('CREATE INDEX idx_nodes_path ON nodes (path%s)' %explicit_length)
            self.conn.execute(s).close()
        
        # Ensure the root node (its own parent) exists exactly once.
        s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
                                           self.nodes.c.parent == ROOTNODE))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            s = self.nodes.insert().values(node=ROOTNODE, parent=ROOTNODE)
            self.conn.execute(s)
195
    
196
    def node_create(self, parent, path):
        """Create a new node from the given properties.
           Return the node identifier of the new node.
        """
        #TODO catch IntegrityError?
        statement = self.nodes.insert().values(parent=parent, path=path)
        result = self.conn.execute(statement)
        new_node = result.inserted_primary_key[0]
        result.close()
        return new_node
206
    
207
    def node_lookup(self, path):
        """Lookup the current node of the given path.
           Return None if the path is not found.
        """
        
        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
        # Escape LIKE wildcards so the path matches literally.
        escaped = path.replace('%', '\%').replace('_', '\_')
        q = select([self.nodes.c.node],
                   self.nodes.c.path.like(escaped, escape='\\'))
        rp = self.conn.execute(q)
        row = rp.fetchone()
        rp.close()
        return row[0] if row else None
222
    
223
    def node_get_properties(self, node):
        """Return the node's (parent, path).
           Return None if the node is not found.
        """
        
        q = select([self.nodes.c.parent, self.nodes.c.path]).where(
            self.nodes.c.node == node)
        rp = self.conn.execute(q)
        row = rp.fetchone()
        rp.close()
        return row
234
    
235
    def node_get_versions(self, node, keys=(), propnames=_propnames):
        """Return the properties of all versions at node, ordered by serial.
           If keys is empty, return all properties in the order
           (serial, node, hash, size, source, mtime, muser, cluster).
           Otherwise return, per version, only the values of the
           requested property names, in the order given.
        """
        
        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.hash,
                    self.versions.c.size,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.cluster], self.versions.c.node == node)
        s = s.order_by(self.versions.c.serial)
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        if not rows:
            return rows
        
        if not keys:
            return rows
        
        # Project each row down to the requested properties; unknown
        # key names are silently skipped.
        return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]
260
    
261
    def node_count_children(self, node):
        """Return node's child count."""
        
        # ROOTNODE is its own parent, so exclude it from its own count.
        q = select([func.count(self.nodes.c.node)]).where(
            and_(self.nodes.c.parent == node,
                 self.nodes.c.node != ROOTNODE))
        rp = self.conn.execute(q)
        count = rp.fetchone()[0]
        rp.close()
        return count
271
    
272
    def node_purge_children(self, parent, before=inf, cluster=0):
        """Delete all versions with the specified
           parent and cluster, and return
           the hashes of versions deleted.
           Clears out nodes with no remaining versions.
        """
        #update statistics
        # All versions of direct children of parent, in the given cluster.
        c1 = select([self.nodes.c.node],
            self.nodes.c.parent == parent)
        where_clause = and_(self.versions.c.node.in_(c1),
                            self.versions.c.cluster == cluster)
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        s = s.where(where_clause)
        if before != inf:
            s = s.where(self.versions.c.mtime <= before)
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        if not row:
            return ()
        # SUM() is NULL when no rows matched; note size is negated here,
        # so it is passed below without a further sign flip.
        # NOTE(review): unlike node_purge, this does not short-circuit when
        # the count is zero — confirm whether the zero-delta statistics
        # update is intended.
        nr, size = row[0], -row[1] if row[1] else 0
        mtime = time()
        # Subtract the purged population/size from parent and all ancestors.
        self.statistics_update(parent, -nr, size, mtime, cluster)
        self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)
        
        # Collect the content hashes of the versions about to be deleted.
        s = select([self.versions.c.hash])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        hashes = [row[0] for row in r.fetchall()]
        r.close()
        
        #delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)
        r.close()
        
        #delete nodes
        # Remove child nodes that are left with no versions at all.
        s = select([self.nodes.c.node],
            and_(self.nodes.c.parent == parent,
                 select([func.count(self.versions.c.serial)],
                    self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
        rp = self.conn.execute(s)
        nodes = [r[0] for r in rp.fetchall()]
        rp.close()
        s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
        self.conn.execute(s).close()
        
        return hashes
321
    
322
    def node_purge(self, node, before=inf, cluster=0):
        """Delete all versions with the specified
           node and cluster, and return
           the hashes of versions deleted.
           Clears out the node if it has no remaining versions.
        """
        
        #update statistics
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        where_clause = and_(self.versions.c.node == node,
                            self.versions.c.cluster == cluster)
        s = s.where(where_clause)
        if before != inf:
            s = s.where(self.versions.c.mtime <= before)
        r = self.conn.execute(s)
        row = r.fetchone()
        nr, size = row[0], row[1]
        r.close()
        if not nr:
            return ()
        mtime = time()
        # Subtract the purged population/size from all ancestors.
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
        
        # Collect the content hashes of the versions about to be deleted.
        s = select([self.versions.c.hash])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        # Use a distinct loop variable: reusing 'r' leaks out of the list
        # comprehension in python 2 and rebinds the result proxy to the
        # last row, making the following close() call fail.
        hashes = [row[0] for row in r.fetchall()]
        r.close()
        
        #delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)
        r.close()
        
        #delete nodes
        s = select([self.nodes.c.node],
            and_(self.nodes.c.node == node,
                 select([func.count(self.versions.c.serial)],
                    self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
        r = self.conn.execute(s)
        # Extract the node ids from the rows; feeding whole row tuples
        # into in_() produces an invalid/empty match.
        nodes = [row[0] for row in r.fetchall()]
        r.close()
        s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
        self.conn.execute(s).close()
        
        return hashes
369
    
370
    def node_remove(self, node):
        """Remove the node specified.
           Return false if the node has children or is not found.
        """
        
        if self.node_count_children(node):
            return False
        
        mtime = time()
        # Roll the node's per-cluster version statistics out of the
        # ancestor chain before deleting it.
        q = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size),
                    self.versions.c.cluster])
        q = q.where(self.versions.c.node == node)
        q = q.group_by(self.versions.c.cluster)
        rp = self.conn.execute(q)
        for population, size, cluster in rp.fetchall():
            self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
        rp.close()
        
        # Versions/policies/statistics rows go away via ON DELETE CASCADE.
        delete_stmt = self.nodes.delete().where(self.nodes.c.node == node)
        self.conn.execute(delete_stmt).close()
        return True
392
    
393
    def policy_get(self, node):
        """Return the node's policy as a {key: value} dict."""
        q = select([self.policies.c.key, self.policies.c.value],
            self.policies.c.node == node)
        rp = self.conn.execute(q)
        policy = dict(rp.fetchall())
        rp.close()
        return policy
400
    
401
    def policy_set(self, node, policy):
        """Insert or replace the (key, value) policy pairs for node."""
        #insert or replace
        for k, v in policy.iteritems():
            s = self.policies.update().where(and_(self.policies.c.node == node,
                                                  self.policies.c.key == k))
            s = s.values(value = v)
            rp = self.conn.execute(s)
            # Capture rowcount before closing: some DBAPI drivers
            # invalidate the attribute once the result proxy is released.
            updated = rp.rowcount
            rp.close()
            if updated == 0:
                s = self.policies.insert()
                values = {'node':node, 'key':k, 'value':v}
                r = self.conn.execute(s, values)
                r.close()
414
    
415
    def statistics_get(self, node, cluster=0):
        """Return population, total size and last mtime
           for all versions under node that belong to the cluster.
        """
        
        q = select([self.statistics.c.population,
                    self.statistics.c.size,
                    self.statistics.c.mtime]).where(
            and_(self.statistics.c.node == node,
                 self.statistics.c.cluster == cluster))
        rp = self.conn.execute(q)
        stats = rp.fetchone()
        rp.close()
        return stats
429
    
430
    def statistics_update(self, node, population, size, mtime, cluster=0):
        """Update the statistics of the given node.
           Statistics keep track the population, total
           size of objects and mtime in the node's namespace.
           May be zero or positive or negative numbers.
        """
        # Read the current counters so the deltas can be accumulated.
        s = select([self.statistics.c.population, self.statistics.c.size],
            and_(self.statistics.c.node == node,
                 self.statistics.c.cluster == cluster))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            prepopulation, presize = (0, 0)
        else:
            prepopulation, presize = r
        population += prepopulation
        size += presize
        
        #insert or replace
        #TODO better upsert
        u = self.statistics.update().where(and_(self.statistics.c.node==node,
                                           self.statistics.c.cluster==cluster))
        u = u.values(population=population, size=size, mtime=mtime)
        rp = self.conn.execute(u)
        # Capture rowcount before closing: some DBAPI drivers invalidate
        # the attribute once the result proxy is released.
        updated = rp.rowcount
        rp.close()
        if updated == 0:
            ins = self.statistics.insert()
            ins = ins.values(node=node, population=population, size=size,
                             mtime=mtime, cluster=cluster)
            self.conn.execute(ins).close()
461
    
462
    def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
        """Update the statistics of the given node's parent.
           Then recursively update all parents up to the root.
           Population is not recursive.
        """
        
        while node != ROOTNODE:
            props = self.node_get_properties(node)
            if props is None:
                # Node vanished; nothing more to propagate.
                break
            parent, path = props
            self.statistics_update(parent, population, size, mtime, cluster)
            node = parent
            # Only the immediate parent's population changes; higher
            # ancestors receive size/mtime updates alone.
            population = 0
478
    
479
    def statistics_latest(self, node, before=inf, except_cluster=0):
        """Return population, total size and last mtime
           for all latest versions under node that
           do not belong to the cluster.
        """
        
        # The node.
        props = self.node_get_properties(node)
        if props is None:
            return None
        parent, path = props
        
        # The latest version.
        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.hash,
                    self.versions.c.size,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.cluster])
        # The max serial (optionally bounded by mtime) is the node's
        # latest version.
        filtered = select([func.max(self.versions.c.serial)],
                            self.versions.c.node == node)
        if before != inf:
            filtered = filtered.where(self.versions.c.mtime < before)
        s = s.where(and_(self.versions.c.cluster != except_cluster,
                         self.versions.c.serial == filtered))
        r = self.conn.execute(s)
        props = r.fetchone()
        r.close()
        if not props:
            return None
        mtime = props[MTIME]
        
        # First level, just under node (get population).
        v = self.versions.alias('v')
        s = select([func.count(v.c.serial),
                    func.sum(v.c.size),
                    func.max(v.c.mtime)])
        c1 = select([func.max(self.versions.c.serial)])
        if before != inf:
            c1 = c1.where(self.versions.c.mtime < before)
        # c2: direct children of node.
        c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
        # Correlate c1 per child so only each child's latest version counts.
        s = s.where(and_(v.c.serial == c1.where(self.versions.c.node == v.c.node),
                         v.c.cluster != except_cluster,
                         v.c.node.in_(c2)))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        count = r[0]
        mtime = max(mtime, r[2])
        if count == 0:
            return (0, 0, mtime)
        
        # All children (get size and mtime).
        # XXX: This is why the full path is stored.
        s = select([func.count(v.c.serial),
                    func.sum(v.c.size),
                    func.max(v.c.mtime)])
        c1 = select([func.max(self.versions.c.serial)],
            self.versions.c.node == v.c.node)
        if before != inf:
            c1 = c1.where(self.versions.c.mtime < before)
        # c2: every node whose path lies under this node's path prefix.
        c2 = select([self.nodes.c.node], self.nodes.c.path.like(path + '%'))
        s = s.where(and_(v.c.serial == c1,
                         v.c.cluster != except_cluster,
                         v.c.node.in_(c2)))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        # The path LIKE also matches the node itself, so remove its own
        # latest-version size from the subtree total.
        size = r[1] - props[SIZE]
        mtime = max(mtime, r[2])
        return (count, size, mtime)
556
    
557
    def version_create(self, node, hash, size, source, muser, cluster=0):
        """Create a new version from the given properties.
           Return the (serial, mtime) of the new version.
        """
        
        mtime = time()
        ins = self.versions.insert().values(
            node=node, hash=hash, size=size, source=source,
            mtime=mtime, muser=muser, cluster=cluster)
        serial = self.conn.execute(ins).inserted_primary_key[0]
        # A new version adds one object of the given size to every ancestor.
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)
        return serial, mtime
568
    
569
    def version_lookup(self, node, before=inf, cluster=0):
        """Lookup the current version of the given node.
           Return a list with its properties:
           (serial, node, hash, size, source, mtime, muser, cluster)
           or None if the current version is not found in the given cluster.
        """
        
        v = self.versions.alias('v')
        q = select([v.c.serial, v.c.node, v.c.hash, v.c.size,
                    v.c.source, v.c.mtime, v.c.muser, v.c.cluster])
        # The node's latest version is the max serial, optionally
        # restricted to versions modified before the cutoff.
        latest = select([func.max(self.versions.c.serial)],
            self.versions.c.node == node)
        if before != inf:
            latest = latest.where(self.versions.c.mtime < before)
        q = q.where(and_(v.c.serial == latest,
                         v.c.cluster == cluster))
        rp = self.conn.execute(q)
        props = rp.fetchone()
        rp.close()
        return props if props else None
591
    
592
    def version_get_properties(self, serial, keys=(), propnames=_propnames):
        """Return a sequence of values for the properties of
           the version specified by serial and the keys, in the order given.
           If keys is empty, return all properties in the order
           (serial, node, hash, size, source, mtime, muser, cluster).
        """
        
        v = self.versions.alias()
        q = select([v.c.serial, v.c.node, v.c.hash, v.c.size,
                    v.c.source, v.c.mtime, v.c.muser, v.c.cluster],
                   v.c.serial == serial)
        rp = self.conn.execute(q)
        row = rp.fetchone()
        rp.close()
        if row is None or not keys:
            # Missing version -> None; no keys -> the full property row.
            return row
        # Unknown key names are silently skipped.
        return [row[propnames[k]] for k in keys if k in propnames]
611
    
612
    def version_recluster(self, serial, cluster):
        """Move the version into another cluster."""
        
        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        size = props[SIZE]
        oldcluster = props[CLUSTER]
        if cluster == oldcluster:
            # Already there; nothing to do.
            return
        
        # Shift the version's contribution from the old cluster's
        # statistics to the new cluster's, up the ancestor chain.
        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)
        
        u = self.versions.update().where(self.versions.c.serial == serial)
        self.conn.execute(u.values(cluster=cluster)).close()
632
    
633
    def version_remove(self, serial):
        """Remove the serial specified."""
        
        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        hash = props[HASH]
        size = props[SIZE]
        cluster = props[CLUSTER]
        
        # Remove the version's contribution from every ancestor first.
        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
        
        delete_stmt = self.versions.delete().where(
            self.versions.c.serial == serial)
        self.conn.execute(delete_stmt).close()
        # Attribute rows go away via ON DELETE CASCADE.
        return hash
650
    
651
    def attribute_get(self, serial, keys=()):
        """Return a list of (key, value) pairs of the version specified by serial.
           If keys is empty, return all attributes.
           Otherwise, return only those specified.
        """
        
        # Build the base query once; both branches previously duplicated it.
        attrs = self.attributes.alias()
        s = select([attrs.c.key, attrs.c.value])
        s = s.where(attrs.c.serial == serial)
        if keys:
            # Restrict to the requested keys only.
            s = s.where(attrs.c.key.in_(keys))
        r = self.conn.execute(s)
        l = r.fetchall()
        r.close()
        return l
670
    
671
    def attribute_set(self, serial, items):
        """Set the attributes of the version specified by serial.
           Receive attributes as an iterable of (key, value) pairs.
        """
        #insert or replace
        #TODO better upsert
        for k, v in items:
            s = self.attributes.update()
            s = s.where(and_(self.attributes.c.serial == serial,
                             self.attributes.c.key == k))
            s = s.values(value = v)
            rp = self.conn.execute(s)
            # Capture rowcount before closing: some DBAPI drivers
            # invalidate the attribute once the result proxy is released.
            updated = rp.rowcount
            rp.close()
            if updated == 0:
                s = self.attributes.insert()
                s = s.values(serial=serial, key=k, value=v)
                self.conn.execute(s).close()
688
    
689
    def attribute_del(self, serial, keys=()):
        """Delete attributes of the version specified by serial.
           If keys is empty, delete all attributes.
           Otherwise delete those specified.
        """
        
        s = self.attributes.delete()
        s = s.where(self.attributes.c.serial == serial)
        if keys:
            # Delete all requested keys with a single IN (...) statement
            # instead of one round-trip per key (resolves the old TODO).
            s = s.where(self.attributes.c.key.in_(keys))
        self.conn.execute(s).close()
706
    
707
    def attribute_copy(self, source, dest):
        """Copy all attributes of version source onto version dest,
           replacing values for keys dest already has.
        """
        # Select dest as a literal column so each fetched row carries the
        # target serial alongside the source's (key, value).
        s = select([dest, self.attributes.c.key, self.attributes.c.value],
            self.attributes.c.serial == source)
        rp = self.conn.execute(s)
        attributes = rp.fetchall()
        rp.close()
        # Use a loop variable distinct from the dest parameter to avoid
        # shadowing it.
        for target, k, v in attributes:
            #insert or replace
            s = self.attributes.update().where(and_(
                self.attributes.c.serial == target,
                self.attributes.c.key == k))
            rp = self.conn.execute(s, value=v)
            # Capture rowcount before closing the result proxy.
            updated = rp.rowcount
            rp.close()
            if updated == 0:
                s = self.attributes.insert()
                values = {'serial':target, 'key':k, 'value':v}
                self.conn.execute(s, values).close()
724
    
725
    def latest_attribute_keys(self, parent, before=inf, except_cluster=0, pathq=[]):
        """Return a list with all distinct attribute keys defined
           for all latest versions under parent that
           do not belong to the cluster, optionally restricted to
           nodes whose path starts with one of the pathq prefixes.

           NOTE(review): pathq defaults to a mutable list; it is only
           iterated here, but confirm no caller relies on mutating it.
        """
        
        # TODO: Use another table to store before=inf results.
        a = self.attributes.alias('a')
        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        s = select([a.c.key]).distinct()
        # Latest version per node: max serial, optionally bounded by mtime.
        filtered = select([func.max(self.versions.c.serial)])
        if before != inf:
            filtered = filtered.where(self.versions.c.mtime < before)
        s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
        s = s.where(v.c.cluster != except_cluster)
        # Only direct children of parent.
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
            self.nodes.c.parent == parent)))
        # Join attributes to versions, and versions to nodes (for path).
        s = s.where(a.c.serial == v.c.serial)
        s = s.where(n.c.node == v.c.node)
        # Any of the pathq prefixes may match (OR semantics).
        conj = []
        for x in pathq:
            conj.append(n.c.path.like(x + '%'))
        if conj:
            s = s.where(or_(*conj))
        rp = self.conn.execute(s)
        rows = rp.fetchall()
        rp.close()
        return [r[0] for r in rows]
754
    
755
    def latest_version_list(self, parent, prefix='', delimiter=None,
                            start='', limit=10000, before=inf,
                            except_cluster=0, pathq=None, filterq=None):
        """Return a (list of (path, serial) tuples, list of common prefixes)
           for the current versions of the paths with the given parent,
           matching the following criteria.
           
           The property tuple for a version is returned if all
           of these conditions are true:
                
                a. parent matches
                
                b. path > start
                
                c. path starts with prefix (and paths in pathq)
                
                d. version is the max up to before
                
                e. version is not in cluster
                
                f. the path does not have the delimiter occurring
                   after the prefix, or ends with the delimiter
                
                g. serial matches the attribute filter query.
                   
                   A filter query is a comma-separated list of
                   terms in one of these three forms:
                   
                   key
                       an attribute with this key must exist
                   
                   !key
                       an attribute with this key must not exist
                   
                   key ?op value
                       the attribute with this key satisfies the value
                       where ?op is one of ==, != <=, >=, <, >.
           
           The list of common prefixes includes the prefixes
           matching up to the first delimiter after prefix,
           and are reported only once, as "virtual directories".
           The delimiter is included in the prefixes.
           
           If arguments are None, then the corresponding matching rule
           will always match.
           
           Limit applies to the first list of tuples returned.
        """
        # None sentinel instead of a mutable default argument.
        if pathq is None:
            pathq = []

        # Normalize the starting point so that iteration begins just
        # before the first path that can match `prefix`.
        if not start or start < prefix:
            start = strprevling(prefix)
        nextling = strnextling(prefix)
        
        a = self.attributes.alias('a')
        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        s = select([n.c.path, v.c.serial]).distinct()
        # Correlated subquery: the max serial per node, optionally
        # restricted to versions older than `before`.
        filtered = select([func.max(self.versions.c.serial)])
        if before != inf:
            filtered = filtered.where(self.versions.c.mtime < before)
        s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
        s = s.where(v.c.cluster != except_cluster)
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
            self.nodes.c.parent == parent)))
        if filterq:
            # Join against attributes only when a filter is requested.
            s = s.where(a.c.serial == v.c.serial)
        
        s = s.where(n.c.node == v.c.node)
        # `start` is a bindparam so the same compiled statement can be
        # re-executed with a new starting path below.
        s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
        conj = []
        for x in pathq:
            conj.append(n.c.path.like(x + '%'))
        
        if conj:
            s = s.where(or_(*conj))
        
        if filterq:
            # NOTE(review): only the "key exists" form of the filter
            # grammar is implemented here; !key and key?op value terms
            # from the docstring are not handled by this clause.
            s = s.where(a.c.key.in_(filterq.split(',')))
        
        s = s.order_by(n.c.path)
        
        if not delimiter:
            # Flat listing: no virtual directories to compute.
            s = s.limit(limit)
            rp = self.conn.execute(s, start=start)
            r = rp.fetchall()
            rp.close()
            return r, ()
        
        pfz = len(prefix)
        dz = len(delimiter)
        count = 0
        prefixes = []
        pappend = prefixes.append
        matches = []
        mappend = matches.append
        
        rp = self.conn.execute(s, start=start)
        while True:
            props = rp.fetchone()
            if props is None:
                break
            path, serial = props
            idx = path.find(delimiter, pfz)
            
            if idx < 0:
                # No delimiter after the prefix: a plain match.
                mappend(props)
                count += 1
                if count >= limit:
                    break
                continue
            
            if idx + dz == len(path):
                # Path ends with the delimiter: also a match.
                mappend(props)
                count += 1
                continue # Get one more, in case there is a path.
            # Delimiter found mid-path: report a virtual directory and
            # skip ahead past everything sharing that prefix.
            pf = path[:idx + dz]
            pappend(pf)
            if count >= limit: 
                break
            
            # Release the current result before issuing the next query;
            # the previous code leaked an open cursor per prefix found.
            rp.close()
            rp = self.conn.execute(s, start=strnextling(pf)) # New start.
        rp.close()
        
        return matches, prefixes