root / pithos / backends / lib / sqlalchemy / node.py @ 7759260d

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from time import time
35
from sqlalchemy import Table, Integer, BigInteger, DECIMAL, Column, String, MetaData, ForeignKey
36
from sqlalchemy.types import Text
37
from sqlalchemy.schema import Index, Sequence
38
from sqlalchemy.sql import func, and_, or_, null, select, bindparam, text
39
from sqlalchemy.ext.compiler import compiles
40
from sqlalchemy.engine.reflection import Inspector
41

    
42
from dbworker import DBWorker
43

    
44
ROOTNODE  = 0
45

    
46
( SERIAL, NODE, HASH, SIZE, SOURCE, MTIME, MUSER, CLUSTER ) = range(8)
47

    
48
inf = float('inf')
49

    
50

    
51
def strnextling(prefix):
52
    """Return the first unicode string
53
       greater than, but not starting with, the given prefix.
54
       strnextling('hello') -> 'hellp'
55
    """
56
    if not prefix:
57
        ## all strings start with the null string,
58
        ## therefore we have to approximate strnextling('')
59
        ## with the last unicode character supported by python
60
        ## 0x10ffff for wide (32-bit unicode) python builds
61
        ## 0x00ffff for narrow (16-bit unicode) python builds
62
        ## We will not autodetect. 0xffff is safe enough.
63
        return unichr(0xffff)
64
    s = prefix[:-1]
65
    c = ord(prefix[-1])
66
    if c >= 0xffff:
67
        raise RuntimeError
68
    s += unichr(c+1)
69
    return s
70

    
71
def strprevling(prefix):
72
    """Return an approximation of the last unicode string
73
       less than, but not starting with, the given prefix.
74
       strprevling(u'hello') -> u'helln\\uffff'
75
    """
76
    if not prefix:
77
        ## There is no prevling for the null string
78
        return prefix
79
    s = prefix[:-1]
80
    c = ord(prefix[-1])
81
    if c > 0:
82
        s += unichr(c-1) + unichr(0xffff)
83
    return s
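
# Editor's note: the sketch below is illustrative only and not part of the
# original module. strnextling()/strprevling() exist so that "path starts
# with prefix" can be rewritten as a plain range comparison, which the
# database can answer from an index on the path column. For a non-empty
# prefix whose last character is below 0xffff:
#
#     prefix <= s < strnextling(prefix)    <=>    s.startswith(prefix)
#
def _prefix_range_example(s, prefix):
    """Illustrative only: the prefix test expressed as a range comparison."""
    return prefix <= s < strnextling(prefix)
# latest_version_list() below bounds its path scan the same way
# (path > start, path < strnextling(prefix)).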
84

    
85

    
86
_propnames = {
87
    'serial'    : 0,
88
    'node'      : 1,
89
    'hash'      : 2,
90
    'size'      : 3,
91
    'source'    : 4,
92
    'mtime'     : 5,
93
    'muser'     : 6,
94
    'cluster'   : 7,
95
}
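
# Editor's note: _propnames mirrors the positional constants defined above
# (SERIAL .. CLUSTER), so for a full property row r, r[SIZE] and
# r[_propnames['size']] address the same column; node_get_versions() and
# version_get_properties() rely on this ordering when filtering by keys.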
96

    
97

    
98
class Node(DBWorker):
99
    """Nodes store path organization and have multiple versions.
100
       Versions store object history and have multiple attributes.
101
       Attributes store metadata.
102
    """
103
    
104
    # TODO: Provide an interface for included and excluded clusters.
105
    
106
    def __init__(self, **params):
107
        DBWorker.__init__(self, **params)
108
        metadata = MetaData()
109
        
110
        #create nodes table
111
        columns=[]
112
        columns.append(Column('node', Integer, primary_key=True))
113
        columns.append(Column('parent', Integer,
114
                              ForeignKey('nodes.node',
115
                                         ondelete='CASCADE',
116
                                         onupdate='CASCADE'),
117
                              autoincrement=False))
118
        path_length = 2048
119
        columns.append(Column('path', String(path_length), default='', nullable=False))
120
        self.nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
121
        
122
        #create policy table
123
        columns=[]
124
        columns.append(Column('node', Integer,
125
                              ForeignKey('nodes.node',
126
                                         ondelete='CASCADE',
127
                                         onupdate='CASCADE'),
128
                              primary_key=True))
129
        columns.append(Column('key', String(255), primary_key=True))
130
        columns.append(Column('value', String(255)))
131
        self.policies = Table('policy', metadata, *columns, mysql_engine='InnoDB')
132
        
133
        #create statistics table
134
        columns=[]
135
        columns.append(Column('node', Integer,
136
                              ForeignKey('nodes.node',
137
                                         ondelete='CASCADE',
138
                                         onupdate='CASCADE'),
139
                              primary_key=True))
140
        columns.append(Column('population', Integer, nullable=False, default=0))
141
        columns.append(Column('size', BigInteger, nullable=False, default=0))
142
        columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
143
        columns.append(Column('cluster', Integer, nullable=False, default=0,
144
                              primary_key=True, autoincrement=False))
145
        self.statistics = Table('statistics', metadata, *columns, mysql_engine='InnoDB')
146
        
147
        #create versions table
148
        columns=[]
149
        columns.append(Column('serial', Integer, primary_key=True))
150
        columns.append(Column('node', Integer,
151
                              ForeignKey('nodes.node',
152
                                         ondelete='CASCADE',
153
                                         onupdate='CASCADE')))
154
        columns.append(Column('hash', String(255)))
155
        columns.append(Column('size', BigInteger, nullable=False, default=0))
156
        columns.append(Column('source', Integer))
157
        columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
158
        columns.append(Column('muser', String(255), nullable=False, default=''))
159
        columns.append(Column('cluster', Integer, nullable=False, default=0))
160
        self.versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
161
        Index('idx_versions_node_mtime', self.versions.c.node,
162
              self.versions.c.mtime)
163
        
164
        #create attributes table
165
        columns = []
166
        columns.append(Column('serial', Integer,
167
                              ForeignKey('versions.serial',
168
                                         ondelete='CASCADE',
169
                                         onupdate='CASCADE'),
170
                              primary_key=True))
171
        columns.append(Column('key', String(255), primary_key=True))
172
        columns.append(Column('value', String(255)))
173
        self.attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')
174
        
175
        metadata.create_all(self.engine)
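        # Schema overview (editor's summary of the tables created above):
        #   nodes(node, parent, path)                         -- path hierarchy
        #   policy(node, key, value)                          -- per-node policies
        #   statistics(node, population, size, mtime, cluster) -- cached aggregates
        #   versions(serial, node, hash, size, source, mtime, muser, cluster) -- object history
        #   attributes(serial, key, value)                    -- per-version metadata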
176
        
177
        # the following code creates an index of specific length
178
        # this can be accomplished in sqlalchemy >= 0.7.3
179
        # by providing the mysql_length option during index creation
180
        insp = Inspector.from_engine(self.engine)
181
        indexes = [elem['name'] for elem in insp.get_indexes('nodes')]
182
        if 'idx_nodes_path' not in indexes:
183
            explicit_length = '(%s)' %path_length if self.engine.name == 'mysql' else ''
184
            s = text('CREATE INDEX idx_nodes_path ON nodes (path%s)' %explicit_length)
185
            self.conn.execute(s).close()
186
        
187
        s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
188
                                           self.nodes.c.parent == ROOTNODE))
189
        rp = self.conn.execute(s)
190
        r = rp.fetchone()
191
        rp.close()
192
        if not r:
193
            s = self.nodes.insert().values(node=ROOTNODE, parent=ROOTNODE)
194
            self.conn.execute(s)
195
    
196
    def node_create(self, parent, path):
197
        """Create a new node from the given properties.
198
           Return the node identifier of the new node.
199
        """
200
        #TODO catch IntegrityError?
201
        s = self.nodes.insert().values(parent=parent, path=path)
202
        r = self.conn.execute(s)
203
        inserted_primary_key = r.inserted_primary_key[0]
204
        r.close()
205
        return inserted_primary_key
206
    
207
    def node_lookup(self, path):
208
        """Lookup the current node of the given path.
209
           Return None if the path is not found.
210
        """
211
        
212
        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
213
        s = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path), escape='\\'))
214
        r = self.conn.execute(s)
215
        row = r.fetchone()
216
        r.close()
217
        if row:
218
            return row[0]
219
        return None
220
    
221
    def node_get_properties(self, node):
222
        """Return the node's (parent, path).
223
           Return None if the node is not found.
224
        """
225
        
226
        s = select([self.nodes.c.parent, self.nodes.c.path])
227
        s = s.where(self.nodes.c.node == node)
228
        r = self.conn.execute(s)
229
        l = r.fetchone()
230
        r.close()
231
        return l
232
    
233
    def node_get_versions(self, node, keys=(), propnames=_propnames):
234
        """Return the properties of all versions at node.
235
           If keys is empty, return all properties in the order
236
           (serial, node, hash, size, source, mtime, muser, cluster).
237
        """
238
        
239
        s = select([self.versions.c.serial,
240
                    self.versions.c.node,
241
                    self.versions.c.hash,
242
                    self.versions.c.size,
243
                    self.versions.c.source,
244
                    self.versions.c.mtime,
245
                    self.versions.c.muser,
246
                    self.versions.c.cluster], self.versions.c.node == node)
247
        s = s.order_by(self.versions.c.serial)
248
        r = self.conn.execute(s)
249
        rows = r.fetchall()
250
        r.close()
251
        if not rows:
252
            return rows
253
        
254
        if not keys:
255
            return rows
256
        
257
        return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]
258
    
259
    def node_count_children(self, node):
260
        """Return node's child count."""
261
        
262
        s = select([func.count(self.nodes.c.node)])
263
        s = s.where(and_(self.nodes.c.parent == node,
264
                         self.nodes.c.node != ROOTNODE))
265
        r = self.conn.execute(s)
266
        row = r.fetchone()
267
        r.close()
268
        return row[0]
269
    
270
    def node_purge_children(self, parent, before=inf, cluster=0):
271
        """Delete all versions with the specified
272
           parent and cluster, and return
273
           the hashes of versions deleted.
274
           Clears out nodes with no remaining versions.
275
        """
276
        #update statistics
277
        c1 = select([self.nodes.c.node],
278
            self.nodes.c.parent == parent)
279
        where_clause = and_(self.versions.c.node.in_(c1),
280
                            self.versions.c.cluster == cluster)
281
        s = select([func.count(self.versions.c.serial),
282
                    func.sum(self.versions.c.size)])
283
        s = s.where(where_clause)
284
        if before != inf:
285
            s = s.where(self.versions.c.mtime <= before)
286
        r = self.conn.execute(s)
287
        row = r.fetchone()
288
        r.close()
289
        if not row:
290
            return ()
291
        nr, size = row[0], (-row[1] if row[1] else 0)
292
        mtime = time()
293
        self.statistics_update(parent, -nr, size, mtime, cluster)
294
        self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)
295
        
296
        s = select([self.versions.c.hash])
297
        s = s.where(where_clause)
298
        r = self.conn.execute(s)
299
        hashes = [row[0] for row in r.fetchall()]
300
        r.close()
301
        
302
        #delete versions
303
        s = self.versions.delete().where(where_clause)
304
        r = self.conn.execute(s)
305
        r.close()
306
        
307
        #delete nodes
308
        s = select([self.nodes.c.node],
309
            and_(self.nodes.c.parent == parent,
310
                 select([func.count(self.versions.c.serial)],
311
                    self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
312
        rp = self.conn.execute(s)
313
        nodes = [r[0] for r in rp.fetchall()]
314
        rp.close()
315
        s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
316
        self.conn.execute(s).close()
317
        
318
        return hashes
319
    
320
    def node_purge(self, node, before=inf, cluster=0):
321
        """Delete all versions with the specified
322
           node and cluster, and return
323
           the hashes of versions deleted.
324
           Clears out the node if it has no remaining versions.
325
        """
326
        
327
        #update statistics
328
        s = select([func.count(self.versions.c.serial),
329
                    func.sum(self.versions.c.size)])
330
        where_clause = and_(self.versions.c.node == node,
331
                         self.versions.c.cluster == cluster)
332
        s = s.where(where_clause)
333
        if before != inf:
334
            s = s.where(self.versions.c.mtime <= before)
335
        r = self.conn.execute(s)
336
        row = r.fetchone()
337
        nr, size = row[0], row[1]
338
        r.close()
339
        if not nr:
340
            return ()
341
        mtime = time()
342
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
343
        
344
        s = select([self.versions.c.hash])
345
        s = s.where(where_clause)
346
        r = self.conn.execute(s)
347
        hashes = [row[0] for row in r.fetchall()]
348
        r.close()
349
        
350
        #delete versions
351
        s = self.versions.delete().where(where_clause)
352
        r = self.conn.execute(s)
353
        r.close()
354
        
355
        #delete nodes
356
        s = select([self.nodes.c.node],
357
            and_(self.nodes.c.node == node,
358
                 select([func.count(self.versions.c.serial)],
359
                    self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
360
        r = self.conn.execute(s)
361
        nodes = [row[0] for row in r.fetchall()]
362
        r.close()
363
        s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
364
        self.conn.execute(s).close()
365
        
366
        return hashes
367
    
368
    def node_remove(self, node):
369
        """Remove the node specified.
370
           Return false if the node has children or is not found.
371
        """
372
        
373
        if self.node_count_children(node):
374
            return False
375
        
376
        mtime = time()
377
        s = select([func.count(self.versions.c.serial),
378
                    func.sum(self.versions.c.size),
379
                    self.versions.c.cluster])
380
        s = s.where(self.versions.c.node == node)
381
        s = s.group_by(self.versions.c.cluster)
382
        r = self.conn.execute(s)
383
        for population, size, cluster in r.fetchall():
384
            self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
385
        r.close()
386
        
387
        s = self.nodes.delete().where(self.nodes.c.node == node)
388
        self.conn.execute(s).close()
389
        return True
390
    
391
    def policy_get(self, node):
392
        s = select([self.policies.c.key, self.policies.c.value],
393
            self.policies.c.node==node)
394
        r = self.conn.execute(s)
395
        d = dict(r.fetchall())
396
        r.close()
397
        return d
398
    
399
    def policy_set(self, node, policy):
400
        #insert or replace
401
        for k, v in policy.iteritems():
402
            s = self.policies.update().where(and_(self.policies.c.node == node,
403
                                                  self.policies.c.key == k))
404
            s = s.values(value = v)
405
            rp = self.conn.execute(s)
406
            rp.close()
407
            if rp.rowcount == 0:
408
                s = self.policies.insert()
409
                values = {'node':node, 'key':k, 'value':v}
410
                r = self.conn.execute(s, values)
411
                r.close()
412
    
413
    def statistics_get(self, node, cluster=0):
414
        """Return population, total size and last mtime
415
           for all versions under node that belong to the cluster.
416
        """
417
        
418
        s = select([self.statistics.c.population,
419
                    self.statistics.c.size,
420
                    self.statistics.c.mtime])
421
        s = s.where(and_(self.statistics.c.node == node,
422
                         self.statistics.c.cluster == cluster))
423
        r = self.conn.execute(s)
424
        row = r.fetchone()
425
        r.close()
426
        return row
427
    
428
    def statistics_update(self, node, population, size, mtime, cluster=0):
429
        """Update the statistics of the given node.
430
           Statistics keep track of the population, the total
431
           size of objects, and the mtime in the node's namespace.
432
           Population and size may be zero, positive or negative.
433
        """
434
        s = select([self.statistics.c.population, self.statistics.c.size],
435
            and_(self.statistics.c.node == node,
436
                 self.statistics.c.cluster == cluster))
437
        rp = self.conn.execute(s)
438
        r = rp.fetchone()
439
        rp.close()
440
        if not r:
441
            prepopulation, presize = (0, 0)
442
        else:
443
            prepopulation, presize = r
444
        population += prepopulation
445
        size += presize
446
        
447
        #insert or replace
448
        #TODO better upsert
449
        u = self.statistics.update().where(and_(self.statistics.c.node==node,
450
                                           self.statistics.c.cluster==cluster))
451
        u = u.values(population=population, size=size, mtime=mtime)
452
        rp = self.conn.execute(u)
453
        rp.close()
454
        if rp.rowcount == 0:
455
            ins = self.statistics.insert()
456
            ins = ins.values(node=node, population=population, size=size,
457
                             mtime=mtime, cluster=cluster)
458
            self.conn.execute(ins).close()
459
    
460
    def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
461
        """Update the statistics of the given node's parent.
462
           Then recursively update all parents up to the root.
463
           Population is not recursive.
464
        """
465
        
466
        while True:
467
            if node == ROOTNODE:
468
                break
469
            props = self.node_get_properties(node)
470
            if props is None:
471
                break
472
            parent, path = props
473
            self.statistics_update(parent, population, size, mtime, cluster)
474
            node = parent
475
            population = 0 # Population isn't recursive
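            # Editor's note: only the first row updated above (the node's
            # immediate parent) receives the population delta; higher
            # ancestors receive only the size and mtime. A node's statistics
            # row therefore counts the versions directly beneath it, while
            # its size aggregates the whole subtree.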
476
    
477
    def statistics_latest(self, node, before=inf, except_cluster=0):
478
        """Return population, total size and last mtime
479
           for all latest versions under node that
480
           do not belong to the cluster.
481
        """
482
        
483
        # The node.
484
        props = self.node_get_properties(node)
485
        if props is None:
486
            return None
487
        parent, path = props
488
        
489
        # The latest version.
490
        s = select([self.versions.c.serial,
491
                    self.versions.c.node,
492
                    self.versions.c.hash,
493
                    self.versions.c.size,
494
                    self.versions.c.source,
495
                    self.versions.c.mtime,
496
                    self.versions.c.muser,
497
                    self.versions.c.cluster])
498
        filtered = select([func.max(self.versions.c.serial)],
499
                            self.versions.c.node == node)
500
        if before != inf:
501
            filtered = filtered.where(self.versions.c.mtime < before)
502
        s = s.where(and_(self.versions.c.cluster != except_cluster,
503
                         self.versions.c.serial == filtered))
504
        r = self.conn.execute(s)
505
        props = r.fetchone()
506
        r.close()
507
        if not props:
508
            return None
509
        mtime = props[MTIME]
510
        
511
        # First level, just under node (get population).
512
        v = self.versions.alias('v')
513
        s = select([func.count(v.c.serial),
514
                    func.sum(v.c.size),
515
                    func.max(v.c.mtime)])
516
        c1 = select([func.max(self.versions.c.serial)])
517
        if before != inf:
518
            c1 = c1.where(self.versions.c.mtime < before)
519
        c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
520
        s = s.where(and_(v.c.serial == c1.where(self.versions.c.node == v.c.node),
521
                         v.c.cluster != except_cluster,
522
                         v.c.node.in_(c2)))
523
        rp = self.conn.execute(s)
524
        r = rp.fetchone()
525
        rp.close()
526
        if not r:
527
            return None
528
        count = r[0]
529
        mtime = max(mtime, r[2])
530
        if count == 0:
531
            return (0, 0, mtime)
532
        
533
        # All children (get size and mtime).
534
        # XXX: This is why the full path is stored.
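        # (Editor's note: because every node stores its full path, the whole
        # subtree can be selected with a single LIKE 'path%' match below
        # instead of walking parent links recursively.)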
535
        s = select([func.count(v.c.serial),
536
                    func.sum(v.c.size),
537
                    func.max(v.c.mtime)])
538
        c1 = select([func.max(self.versions.c.serial)],
539
            self.versions.c.node == v.c.node)
540
        if before != inf:
541
            c1 = c1.where(self.versions.c.mtime < before)
542
        c2 = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path) + '%', escape='\\'))
543
        s = s.where(and_(v.c.serial == c1,
544
                         v.c.cluster != except_cluster,
545
                         v.c.node.in_(c2)))
546
        rp = self.conn.execute(s)
547
        r = rp.fetchone()
548
        rp.close()
549
        if not r:
550
            return None
551
        size = r[1] - props[SIZE]
552
        mtime = max(mtime, r[2])
553
        return (count, size, mtime)
554
    
555
    def version_create(self, node, hash, size, source, muser, cluster=0):
556
        """Create a new version from the given properties.
557
           Return the (serial, mtime) of the new version.
558
        """
559
        
560
        mtime = time()
561
        s = self.versions.insert().values(node=node, hash=hash, size=size, source=source,
562
                                          mtime=mtime, muser=muser, cluster=cluster)
563
        serial = self.conn.execute(s).inserted_primary_key[0]
564
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)
565
        return serial, mtime
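
    # Editor's note: the method below is an illustrative sketch only, not part
    # of the original Pithos API; it shows how the node_*, version_* and
    # attribute_* calls in this class compose for a typical object write.
    # The parent, path, hash, size, muser and meta arguments are hypothetical
    # caller-supplied values.
    def _example_write_path(self, parent, path, hash, size, muser, meta=()):
        node = self.node_lookup(path)
        if node is None:
            node = self.node_create(parent, path)
        serial, mtime = self.version_create(node, hash, size, None, muser)
        self.attribute_set(serial, meta)
        return serial, mtime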
566
    
567
    def version_lookup(self, node, before=inf, cluster=0):
568
        """Lookup the current version of the given node.
569
           Return a list with its properties:
570
           (serial, node, hash, size, source, mtime, muser, cluster)
571
           or None if the current version is not found in the given cluster.
572
        """
573
        
574
        v = self.versions.alias('v')
575
        s = select([v.c.serial, v.c.node, v.c.hash, v.c.size,
576
                    v.c.source, v.c.mtime, v.c.muser, v.c.cluster])
577
        c = select([func.max(self.versions.c.serial)],
578
            self.versions.c.node == node)
579
        if before != inf:
580
            c = c.where(self.versions.c.mtime < before)
581
        s = s.where(and_(v.c.serial == c,
582
                         v.c.cluster == cluster))
583
        r = self.conn.execute(s)
584
        props = r.fetchone()
585
        r.close()
586
        if props:
587
            return props
588
        return None
589
    
590
    def version_get_properties(self, serial, keys=(), propnames=_propnames):
591
        """Return a sequence of values for the properties of
592
           the version specified by serial and the keys, in the order given.
593
           If keys is empty, return all properties in the order
594
           (serial, node, hash, size, source, mtime, muser, cluster).
595
        """
596
        
597
        v = self.versions.alias()
598
        s = select([v.c.serial, v.c.node, v.c.hash, v.c.size,
599
                    v.c.source, v.c.mtime, v.c.muser, v.c.cluster], v.c.serial == serial)
600
        rp = self.conn.execute(s)
601
        r = rp.fetchone()
602
        rp.close()
603
        if r is None:
604
            return r
605
        
606
        if not keys:
607
            return r
608
        return [r[propnames[k]] for k in keys if k in propnames]
609
    
610
    def version_recluster(self, serial, cluster):
611
        """Move the version into another cluster."""
612
        
613
        props = self.version_get_properties(serial)
614
        if not props:
615
            return
616
        node = props[NODE]
617
        size = props[SIZE]
618
        oldcluster = props[CLUSTER]
619
        if cluster == oldcluster:
620
            return
621
        
622
        mtime = time()
623
        self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
624
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)
625
        
626
        s = self.versions.update()
627
        s = s.where(self.versions.c.serial == serial)
628
        s = s.values(cluster = cluster)
629
        self.conn.execute(s).close()
630
    
631
    def version_remove(self, serial):
632
        """Remove the serial specified."""
633
        
634
        props = self.version_get_properties(serial)
635
        if not props:
636
            return
637
        node = props[NODE]
638
        hash = props[HASH]
639
        size = props[SIZE]
640
        cluster = props[CLUSTER]
641
        
642
        mtime = time()
643
        self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
644
        
645
        s = self.versions.delete().where(self.versions.c.serial == serial)
646
        self.conn.execute(s).close()
647
        return hash
648
    
649
    def attribute_get(self, serial, keys=()):
650
        """Return a list of (key, value) pairs of the version specified by serial.
651
           If keys is empty, return all attributes.
652
           Otherwise, return only those specified.
653
        """
654
        
655
        if keys:
656
            attrs = self.attributes.alias()
657
            s = select([attrs.c.key, attrs.c.value])
658
            s = s.where(and_(attrs.c.key.in_(keys),
659
                             attrs.c.serial == serial))
660
        else:
661
            attrs = self.attributes.alias()
662
            s = select([attrs.c.key, attrs.c.value])
663
            s = s.where(attrs.c.serial == serial)
664
        r = self.conn.execute(s)
665
        l = r.fetchall()
666
        r.close()
667
        return l
668
    
669
    def attribute_set(self, serial, items):
670
        """Set the attributes of the version specified by serial.
671
           Receive attributes as an iterable of (key, value) pairs.
672
        """
673
        #insert or replace
674
        #TODO better upsert
675
        for k, v in items:
676
            s = self.attributes.update()
677
            s = s.where(and_(self.attributes.c.serial == serial,
678
                             self.attributes.c.key == k))
679
            s = s.values(value = v)
680
            rp = self.conn.execute(s)
681
            rp.close()
682
            if rp.rowcount == 0:
683
                s = self.attributes.insert()
684
                s = s.values(serial=serial, key=k, value=v)
685
                self.conn.execute(s).close()
686
    
687
    def attribute_del(self, serial, keys=()):
688
        """Delete attributes of the version specified by serial.
689
           If keys is empty, delete all attributes.
690
           Otherwise delete those specified.
691
        """
692
        
693
        if keys:
694
            #TODO more efficient way to do this?
695
            for key in keys:
696
                s = self.attributes.delete()
697
                s = s.where(and_(self.attributes.c.serial == serial,
698
                                 self.attributes.c.key == key))
699
                self.conn.execute(s).close()
700
        else:
701
            s = self.attributes.delete()
702
            s = s.where(self.attributes.c.serial == serial)
703
            self.conn.execute(s).close()
704
    
705
    def attribute_copy(self, source, dest):
706
        s = select([dest, self.attributes.c.key, self.attributes.c.value],
707
            self.attributes.c.serial == source)
708
        rp = self.conn.execute(s)
709
        attributes = rp.fetchall()
710
        rp.close()
711
        for dest, k, v in attributes:
712
            #insert or replace
713
            s = self.attributes.update().where(and_(
714
                self.attributes.c.serial == dest,
715
                self.attributes.c.key == k))
716
            rp = self.conn.execute(s, value=v)
717
            rp.close()
718
            if rp.rowcount == 0:
719
                s = self.attributes.insert()
720
                values = {'serial':dest, 'key':k, 'value':v}
721
                self.conn.execute(s, values).close()
722
    
723
    def latest_attribute_keys(self, parent, before=inf, except_cluster=0, pathq=[]):
724
        """Return a list with all attribute keys defined
725
           for all latest versions under parent that
726
           do not belong to the cluster.
727
        """
728
        
729
        # TODO: Use another table to store before=inf results.
730
        a = self.attributes.alias('a')
731
        v = self.versions.alias('v')
732
        n = self.nodes.alias('n')
733
        s = select([a.c.key]).distinct()
734
        filtered = select([func.max(self.versions.c.serial)])
735
        if before != inf:
736
            filtered = filtered.where(self.versions.c.mtime < before)
737
        s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
738
        s = s.where(v.c.cluster != except_cluster)
739
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
740
            self.nodes.c.parent == parent)))
741
        s = s.where(a.c.serial == v.c.serial)
742
        s = s.where(n.c.node == v.c.node)
743
        conj = []
744
        for x in pathq:
745
            conj.append(n.c.path.like(self.escape_like(x) + '%', escape='\\'))
746
        if conj:
747
            s = s.where(or_(*conj))
748
        rp = self.conn.execute(s)
749
        rows = rp.fetchall()
750
        rp.close()
751
        return [r[0] for r in rows]
752
    
753
    def latest_version_list(self, parent, prefix='', delimiter=None,
754
                            start='', limit=10000, before=inf,
755
                            except_cluster=0, pathq=[], filterq=None):
756
        """Return a (list of (path, serial) tuples, list of common prefixes)
757
           for the current versions of the paths with the given parent,
758
           matching the following criteria.
759
           
760
           The property tuple for a version is returned if all
761
           of these conditions are true:
762
                
763
                a. parent matches
764
                
765
                b. path > start
766
                
767
                c. path starts with prefix (and paths in pathq)
768
                
769
                d. version is the max up to before
770
                
771
                e. version is not in cluster
772
                
773
                f. the path does not have the delimiter occurring
774
                   after the prefix, or ends with the delimiter
775
                
776
                g. serial matches the attribute filter query.
777
                   
778
                   A filter query is a comma-separated list of
779
                   terms in one of these three forms:
780
                   
781
                   key
782
                       an attribute with this key must exist
783
                   
784
                   !key
785
                       an attribute with this key must not exist
786
                   
787
                   key ?op value
788
                       the attribute with this key satisfies the value
789
                       where ?op is one of ==, !=, <=, >=, <, >.
790
           
791
           The list of common prefixes includes the prefixes
792
           matching up to the first delimiter after prefix,
793
           which are reported only once, as "virtual directories".
794
           The delimiter is included in the prefixes.
795
           
796
           If arguments are None, then the corresponding matching rule
797
           will always match.
798
           
799
           Limit applies to the first list of tuples returned.
800
        """
801
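        # Editor's example (illustrative, hypothetical data): with
        # prefix='photos/' and delimiter='/', a parent whose current paths are
        #   photos/a.jpg, photos/b.jpg, photos/summer/c.jpg, videos/d.avi
        # yields roughly
        #   matches  = [(u'photos/a.jpg', <serial>), (u'photos/b.jpg', <serial>)]
        #   prefixes = [u'photos/summer/']
        # (videos/d.avi falls outside the prefix range computed below). Note
        # that, as implemented here, filterq is matched against attribute
        # keys only: filterq='Content-Type' keeps versions defining that key.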
        
802
        if not start or start < prefix:
803
            start = strprevling(prefix)
804
        nextling = strnextling(prefix)
805
        
806
        a = self.attributes.alias('a')
807
        v = self.versions.alias('v')
808
        n = self.nodes.alias('n')
809
        s = select([n.c.path, v.c.serial]).distinct()
810
        filtered = select([func.max(self.versions.c.serial)])
811
        if before != inf:
812
            filtered = filtered.where(self.versions.c.mtime < before)
813
        s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
814
        s = s.where(v.c.cluster != except_cluster)
815
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
816
            self.nodes.c.parent == parent)))
817
        if filterq:
818
            s = s.where(a.c.serial == v.c.serial)
819
        
820
        s = s.where(n.c.node == v.c.node)
821
        s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
822
        conj = []
823
        for x in pathq:
824
            conj.append(n.c.path.like(self.escape_like(x) + '%', escape='\\'))
825
        
826
        if conj:
827
            s = s.where(or_(*conj))
828
        
829
        if filterq:
830
            s = s.where(a.c.key.in_(filterq.split(',')))
831
        
832
        s = s.order_by(n.c.path)
833
        
834
        if not delimiter:
835
            s = s.limit(limit)
836
            rp = self.conn.execute(s, start=start)
837
            r = rp.fetchall()
838
            rp.close()
839
            return r, ()
840
        
841
        pfz = len(prefix)
842
        dz = len(delimiter)
843
        count = 0
844
        prefixes = []
845
        pappend = prefixes.append
846
        matches = []
847
        mappend = matches.append
848
        
849
        rp = self.conn.execute(s, start=start)
850
        while True:
851
            props = rp.fetchone()
852
            if props is None:
853
                break
854
            path, serial = props
855
            idx = path.find(delimiter, pfz)
856
            
857
            if idx < 0:
858
                mappend(props)
859
                count += 1
860
                if count >= limit:
861
                    break
862
                continue
863
            
864
            if idx + dz == len(path):
865
                mappend(props)
866
                count += 1
867
                continue # Get one more, in case there is a path.
868
            pf = path[:idx + dz]
869
            pappend(pf)
870
            if count >= limit: 
871
                break
872
            
873
            rp = self.conn.execute(s, start=strnextling(pf)) # New start.
874
        rp.close()
875
        
876
        return matches, prefixes