# Copyright 2011 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from time import time
from sqlalchemy import Table, Integer, BigInteger, DECIMAL, Column, String, MetaData, ForeignKey
from sqlalchemy.types import Text
from sqlalchemy.schema import Index, Sequence
from sqlalchemy.sql import func, and_, or_, not_, null, select, bindparam, text
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.engine.reflection import Inspector

from dbworker import DBWorker

from pithos.lib.filter import parse_filters

ROOTNODE  = 0

( SERIAL, NODE, HASH, SIZE, SOURCE, MTIME, MUSER, CLUSTER ) = range(8)

inf = float('inf')


def strnextling(prefix):
    """Return the first unicode string
       greater than but not starting with given prefix.
       strnextling('hello') -> 'hellp'
    """
    if not prefix:
        ## all strings start with the null string,
        ## therefore we have to approximate strnextling('')
        ## with the last unicode character supported by python
        ## 0x10ffff for wide (32-bit unicode) python builds
        ## 0x00ffff for narrow (16-bit unicode) python builds
        ## We will not autodetect. 0xffff is safe enough.
        return unichr(0xffff)
    s = prefix[:-1]
    c = ord(prefix[-1])
    if c >= 0xffff:
        raise RuntimeError
    s += unichr(c+1)
    return s

def strprevling(prefix):
    """Return an approximation of the last unicode string
       less than but not starting with given prefix.
       strprevling(u'hello') -> u'helln\\uffff'
    """
    if not prefix:
        ## There is no prevling for the null string
        return prefix
    s = prefix[:-1]
    c = ord(prefix[-1])
    if c > 0:
        s += unichr(c-1) + unichr(0xffff)
    return s
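
# A quick sketch of how the two helpers above behave (doctest-style; the
# values follow directly from the definitions above):
#
#   >>> strnextling(u'hello')
#   u'hellp'
#   >>> strprevling(u'hello')
#   u'helln\uffff'
#
# They are used further down to turn a path prefix into an exclusive range for
# SQL comparisons, e.g. latest_version_list() constrains
# path > start (default strprevling(prefix)) and path < strnextling(prefix).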

_propnames = {
    'serial'    : 0,
    'node'      : 1,
    'hash'      : 2,
    'size'      : 3,
    'source'    : 4,
    'mtime'     : 5,
    'muser'     : 6,
    'cluster'   : 7,
}


class Node(DBWorker):
    """Nodes store path organization and have multiple versions.
       Versions store object history and have multiple attributes.
       Attributes store metadata.
    """

    # TODO: Provide an interface for included and excluded clusters.

    def __init__(self, **params):
        DBWorker.__init__(self, **params)
        metadata = MetaData()
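        # Schema overview (the tables created below):
        #   nodes      (node, parent, path): the path hierarchy
        #   policy     (node, key, value): per-node policy entries
        #   statistics (node, population, size, mtime, cluster): aggregate
        #              counters per node and cluster
        #   versions   (serial, node, hash, size, source, mtime, muser, cluster):
        #              the version history of each node
        #   attributes (serial, key, value): metadata attached to a version serial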

        #create nodes table
        columns=[]
        columns.append(Column('node', Integer, primary_key=True))
        columns.append(Column('parent', Integer,
                              ForeignKey('nodes.node',
                                         ondelete='CASCADE',
                                         onupdate='CASCADE'),
                              autoincrement=False))
        path_length = 2048
        columns.append(Column('path', String(path_length), default='', nullable=False))
        self.nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')

        #create policy table
        columns=[]
        columns.append(Column('node', Integer,
                              ForeignKey('nodes.node',
                                         ondelete='CASCADE',
                                         onupdate='CASCADE'),
                              primary_key=True))
        columns.append(Column('key', String(255), primary_key=True))
        columns.append(Column('value', String(255)))
        self.policies = Table('policy', metadata, *columns, mysql_engine='InnoDB')

        #create statistics table
        columns=[]
        columns.append(Column('node', Integer,
                              ForeignKey('nodes.node',
                                         ondelete='CASCADE',
                                         onupdate='CASCADE'),
                              primary_key=True))
        columns.append(Column('population', Integer, nullable=False, default=0))
        columns.append(Column('size', BigInteger, nullable=False, default=0))
        columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
        columns.append(Column('cluster', Integer, nullable=False, default=0,
                              primary_key=True, autoincrement=False))
        self.statistics = Table('statistics', metadata, *columns, mysql_engine='InnoDB')

        #create versions table
        columns=[]
        columns.append(Column('serial', Integer, primary_key=True))
        columns.append(Column('node', Integer,
                              ForeignKey('nodes.node',
                                         ondelete='CASCADE',
                                         onupdate='CASCADE')))
        columns.append(Column('hash', String(255)))
        columns.append(Column('size', BigInteger, nullable=False, default=0))
        columns.append(Column('source', Integer))
        columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
        columns.append(Column('muser', String(255), nullable=False, default=''))
        columns.append(Column('cluster', Integer, nullable=False, default=0))
        self.versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
        Index('idx_versions_node_mtime', self.versions.c.node,
              self.versions.c.mtime)

        #create attributes table
        columns = []
        columns.append(Column('serial', Integer,
                              ForeignKey('versions.serial',
                                         ondelete='CASCADE',
                                         onupdate='CASCADE'),
                              primary_key=True))
        columns.append(Column('key', String(255), primary_key=True))
        columns.append(Column('value', String(255)))
        self.attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')

        metadata.create_all(self.engine)

        # The following creates an index of a specific length;
        # this can be accomplished in sqlalchemy >= 0.7.3 by
        # providing the mysql_length option during index creation.
        insp = Inspector.from_engine(self.engine)
        indexes = [elem['name'] for elem in insp.get_indexes('nodes')]
        if 'idx_nodes_path' not in indexes:
            explicit_length = '(%s)' %path_length if self.engine.name == 'mysql' else ''
            s = text('CREATE INDEX idx_nodes_path ON nodes (path%s)' %explicit_length)
            self.conn.execute(s).close()

        s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
                                           self.nodes.c.parent == ROOTNODE))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            s = self.nodes.insert().values(node=ROOTNODE, parent=ROOTNODE)
            self.conn.execute(s)

    def node_create(self, parent, path):
        """Create a new node from the given properties.
           Return the node identifier of the new node.
        """
        #TODO catch IntegrityError?
        s = self.nodes.insert().values(parent=parent, path=path)
        r = self.conn.execute(s)
        inserted_primary_key = r.inserted_primary_key[0]
        r.close()
        return inserted_primary_key

    def node_lookup(self, path):
        """Lookup the current node of the given path.
           Return None if the path is not found.
        """

        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
        s = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path), escape='\\'))
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        if row:
            return row[0]
        return None
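
    # Usage sketch for node_create()/node_lookup() (hypothetical paths and ids,
    # assuming an instantiated Node object `n`):
    #   node = n.node_create(ROOTNODE, u'account/container/object')
    #   n.node_lookup(u'account/container/object')   # -> the same node id
    #   n.node_lookup(u'no/such/path')                # -> None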

    def node_get_properties(self, node):
        """Return the node's (parent, path).
           Return None if the node is not found.
        """

        s = select([self.nodes.c.parent, self.nodes.c.path])
        s = s.where(self.nodes.c.node == node)
        r = self.conn.execute(s)
        l = r.fetchone()
        r.close()
        return l

    def node_get_versions(self, node, keys=(), propnames=_propnames):
        """Return the properties of all versions at node.
           If keys is empty, return all properties in the order
           (serial, node, hash, size, source, mtime, muser, cluster).
        """

        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.hash,
                    self.versions.c.size,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.cluster], self.versions.c.node == node)
        s = s.order_by(self.versions.c.serial)
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        if not rows:
            return rows

        if not keys:
            return rows

        return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]

    def node_count_children(self, node):
        """Return node's child count."""

        s = select([func.count(self.nodes.c.node)])
        s = s.where(and_(self.nodes.c.parent == node,
                         self.nodes.c.node != ROOTNODE))
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        return row[0]

    def node_purge_children(self, parent, before=inf, cluster=0):
        """Delete all versions with the specified
           parent and cluster, and return
           the hashes of versions deleted.
           Clears out nodes with no remaining versions.
        """
        #update statistics
        c1 = select([self.nodes.c.node],
            self.nodes.c.parent == parent)
        where_clause = and_(self.versions.c.node.in_(c1),
                            self.versions.c.cluster == cluster)
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        s = s.where(where_clause)
        if before != inf:
            s = s.where(self.versions.c.mtime <= before)
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        if not row:
            return ()
        nr, size = row[0], (-row[1] if row[1] else 0)
        mtime = time()
        self.statistics_update(parent, -nr, size, mtime, cluster)
        self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)

        s = select([self.versions.c.hash])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        hashes = [row[0] for row in r.fetchall()]
        r.close()

        #delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)
        r.close()

        #delete nodes
        s = select([self.nodes.c.node],
            and_(self.nodes.c.parent == parent,
                 select([func.count(self.versions.c.serial)],
                    self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
        rp = self.conn.execute(s)
        nodes = [r[0] for r in rp.fetchall()]
        rp.close()
        s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
        self.conn.execute(s).close()

        return hashes

    def node_purge(self, node, before=inf, cluster=0):
        """Delete all versions with the specified
           node and cluster, and return
           the hashes of versions deleted.
           Clears out the node if it has no remaining versions.
        """

        #update statistics
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        where_clause = and_(self.versions.c.node == node,
                         self.versions.c.cluster == cluster)
        s = s.where(where_clause)
        if before != inf:
            s = s.where(self.versions.c.mtime <= before)
        r = self.conn.execute(s)
        row = r.fetchone()
        nr, size = row[0], row[1]
        r.close()
        if not nr:
            return ()
        mtime = time()
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)

        s = select([self.versions.c.hash])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        hashes = [row[0] for row in r.fetchall()]
        r.close()

        #delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)
        r.close()

        #delete nodes
        s = select([self.nodes.c.node],
            and_(self.nodes.c.node == node,
                 select([func.count(self.versions.c.serial)],
                    self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
        r = self.conn.execute(s)
        nodes = [row[0] for row in r.fetchall()]
        r.close()
        s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
        self.conn.execute(s).close()

        return hashes

    def node_remove(self, node):
        """Remove the node specified.
           Return false if the node has children or is not found.
        """

        if self.node_count_children(node):
            return False

        mtime = time()
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size),
                    self.versions.c.cluster])
        s = s.where(self.versions.c.node == node)
        s = s.group_by(self.versions.c.cluster)
        r = self.conn.execute(s)
        for population, size, cluster in r.fetchall():
            self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
        r.close()

        s = self.nodes.delete().where(self.nodes.c.node == node)
        self.conn.execute(s).close()
        return True

    def policy_get(self, node):
        s = select([self.policies.c.key, self.policies.c.value],
            self.policies.c.node==node)
        r = self.conn.execute(s)
        d = dict(r.fetchall())
        r.close()
        return d

    def policy_set(self, node, policy):
        #insert or replace
        for k, v in policy.iteritems():
            s = self.policies.update().where(and_(self.policies.c.node == node,
                                                  self.policies.c.key == k))
            s = s.values(value = v)
            rp = self.conn.execute(s)
            rp.close()
            if rp.rowcount == 0:
                s = self.policies.insert()
                values = {'node':node, 'key':k, 'value':v}
                r = self.conn.execute(s, values)
                r.close()

    def statistics_get(self, node, cluster=0):
        """Return population, total size and last mtime
           for all versions under node that belong to the cluster.
        """

        s = select([self.statistics.c.population,
                    self.statistics.c.size,
                    self.statistics.c.mtime])
        s = s.where(and_(self.statistics.c.node == node,
                         self.statistics.c.cluster == cluster))
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        return row
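
    # Note: statistics_get() reads the counters maintained incrementally by
    # statistics_update()/statistics_update_ancestors() below, whereas
    # statistics_latest() further down recomputes population, size and mtime
    # from the latest versions themselves (excluding a given cluster).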

    def statistics_update(self, node, population, size, mtime, cluster=0):
        """Update the statistics of the given node.
           Statistics keep track of the population, total
           size of objects and mtime in the node's namespace.
           The population and size arguments are deltas and
           may be zero, positive or negative.
        """
        s = select([self.statistics.c.population, self.statistics.c.size],
            and_(self.statistics.c.node == node,
                 self.statistics.c.cluster == cluster))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            prepopulation, presize = (0, 0)
        else:
            prepopulation, presize = r
        population += prepopulation
        size += presize

        #insert or replace
        #TODO better upsert
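        # The update-then-insert below emulates an upsert portably: try an
        # UPDATE first and, if no row matched, INSERT a fresh row.  A concurrent
        # writer could still get in between the two statements, hence the TODO.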
        u = self.statistics.update().where(and_(self.statistics.c.node==node,
                                           self.statistics.c.cluster==cluster))
        u = u.values(population=population, size=size, mtime=mtime)
        rp = self.conn.execute(u)
        rp.close()
        if rp.rowcount == 0:
            ins = self.statistics.insert()
            ins = ins.values(node=node, population=population, size=size,
                             mtime=mtime, cluster=cluster)
            self.conn.execute(ins).close()

    def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
        """Update the statistics of the given node's parent.
           Then recursively update all parents up to the root.
           Population is not recursive.
        """

        while True:
            if node == ROOTNODE:
                break
            props = self.node_get_properties(node)
            if props is None:
                break
            parent, path = props
            self.statistics_update(parent, population, size, mtime, cluster)
            node = parent
            population = 0 # Population isn't recursive
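
    # Propagation example for the loop above (hypothetical tree root -> a -> b,
    # with a new 100-byte version created on node b):
    #   statistics_update_ancestors(b, 1, 100, mtime) updates node a with
    #   (population +1, size +100) and then the root with (population +0,
    #   size +100): a node's population counter only counts its direct
    #   children's versions, while size aggregates the whole subtree.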

    def statistics_latest(self, node, before=inf, except_cluster=0):
        """Return population, total size and last mtime
           for all latest versions under node that
           do not belong to the cluster.
        """

        # The node.
        props = self.node_get_properties(node)
        if props is None:
            return None
        parent, path = props

        # The latest version.
        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.hash,
                    self.versions.c.size,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.cluster])
        filtered = select([func.max(self.versions.c.serial)],
                            self.versions.c.node == node)
        if before != inf:
            filtered = filtered.where(self.versions.c.mtime < before)
        s = s.where(and_(self.versions.c.cluster != except_cluster,
                         self.versions.c.serial == filtered))
        r = self.conn.execute(s)
        props = r.fetchone()
        r.close()
        if not props:
            return None
        mtime = props[MTIME]

        # First level, just under node (get population).
        v = self.versions.alias('v')
        s = select([func.count(v.c.serial),
                    func.sum(v.c.size),
                    func.max(v.c.mtime)])
        c1 = select([func.max(self.versions.c.serial)])
        if before != inf:
            c1 = c1.where(self.versions.c.mtime < before)
        c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
        s = s.where(and_(v.c.serial == c1.where(self.versions.c.node == v.c.node),
                         v.c.cluster != except_cluster,
                         v.c.node.in_(c2)))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        count = r[0]
        mtime = max(mtime, r[2])
        if count == 0:
            return (0, 0, mtime)

        # All children (get size and mtime).
        # XXX: This is why the full path is stored.
        s = select([func.count(v.c.serial),
                    func.sum(v.c.size),
                    func.max(v.c.mtime)])
        c1 = select([func.max(self.versions.c.serial)],
            self.versions.c.node == v.c.node)
        if before != inf:
            c1 = c1.where(self.versions.c.mtime < before)
        c2 = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path) + '%', escape='\\'))
        s = s.where(and_(v.c.serial == c1,
                         v.c.cluster != except_cluster,
                         v.c.node.in_(c2)))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        size = r[1] - props[SIZE]
        mtime = max(mtime, r[2])
        return (count, size, mtime)

    def version_create(self, node, hash, size, source, muser, cluster=0):
        """Create a new version from the given properties.
           Return the (serial, mtime) of the new version.
        """

        mtime = time()
        s = self.versions.insert().values(node=node, hash=hash, size=size, source=source,
                                          mtime=mtime, muser=muser, cluster=cluster)
        serial = self.conn.execute(s).inserted_primary_key[0]
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)
        return serial, mtime

    def version_lookup(self, node, before=inf, cluster=0):
        """Lookup the current version of the given node.
           Return a list with its properties:
           (serial, node, hash, size, source, mtime, muser, cluster)
           or None if the current version is not found in the given cluster.
        """

        v = self.versions.alias('v')
        s = select([v.c.serial, v.c.node, v.c.hash, v.c.size,
                    v.c.source, v.c.mtime, v.c.muser, v.c.cluster])
        c = select([func.max(self.versions.c.serial)],
            self.versions.c.node == node)
        if before != inf:
            c = c.where(self.versions.c.mtime < before)
        s = s.where(and_(v.c.serial == c,
                         v.c.cluster == cluster))
        r = self.conn.execute(s)
        props = r.fetchone()
        r.close()
        if props:
            return props
        return None

    def version_get_properties(self, serial, keys=(), propnames=_propnames):
        """Return a sequence of values for the properties of
           the version specified by serial and the keys, in the order given.
           If keys is empty, return all properties in the order
           (serial, node, hash, size, source, mtime, muser, cluster).
        """

        v = self.versions.alias()
        s = select([v.c.serial, v.c.node, v.c.hash, v.c.size,
                    v.c.source, v.c.mtime, v.c.muser, v.c.cluster], v.c.serial == serial)
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if r is None:
            return r

        if not keys:
            return r
        return [r[propnames[k]] for k in keys if k in propnames]
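
    # Usage sketch for version_get_properties() (hypothetical values):
    #   version_get_properties(42)
    #       -> (42, 7, 'somehash', 1024, None, 1330000000.0, 'user', 0)
    #   version_get_properties(42, keys=('size', 'muser'))  -> [1024, 'user']
    #   version_get_properties(42)[SIZE] == version_get_properties(42, keys=('size',))[0]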

    def version_recluster(self, serial, cluster):
        """Move the version into another cluster."""

        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        size = props[SIZE]
        oldcluster = props[CLUSTER]
        if cluster == oldcluster:
            return

        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)

        s = self.versions.update()
        s = s.where(self.versions.c.serial == serial)
        s = s.values(cluster = cluster)
        self.conn.execute(s).close()

    def version_remove(self, serial):
        """Remove the serial specified."""

        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        hash = props[HASH]
        size = props[SIZE]
        cluster = props[CLUSTER]

        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, cluster)

        s = self.versions.delete().where(self.versions.c.serial == serial)
        self.conn.execute(s).close()
        return hash

    def attribute_get(self, serial, keys=()):
        """Return a list of (key, value) pairs of the version specified by serial.
           If keys is empty, return all attributes.
           Otherwise, return only those specified.
        """

        if keys:
            attrs = self.attributes.alias()
            s = select([attrs.c.key, attrs.c.value])
            s = s.where(and_(attrs.c.key.in_(keys),
                             attrs.c.serial == serial))
        else:
            attrs = self.attributes.alias()
            s = select([attrs.c.key, attrs.c.value])
            s = s.where(attrs.c.serial == serial)
        r = self.conn.execute(s)
        l = r.fetchall()
        r.close()
        return l

    def attribute_set(self, serial, items):
        """Set the attributes of the version specified by serial.
           Receive attributes as an iterable of (key, value) pairs.
        """
        #insert or replace
        #TODO better upsert
        for k, v in items:
            s = self.attributes.update()
            s = s.where(and_(self.attributes.c.serial == serial,
                             self.attributes.c.key == k))
            s = s.values(value = v)
            rp = self.conn.execute(s)
            rp.close()
            if rp.rowcount == 0:
                s = self.attributes.insert()
                s = s.values(serial=serial, key=k, value=v)
                self.conn.execute(s).close()
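
    # Usage sketch for the attribute accessors above (hypothetical serial/keys):
    #   attribute_set(42, [('Content-Type', 'text/plain'), ('ETag', 'abc')])
    #   attribute_get(42)                  -> both (key, value) pairs, unordered
    #   attribute_get(42, keys=('ETag',))  -> [('ETag', 'abc')]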

    def attribute_del(self, serial, keys=()):
        """Delete attributes of the version specified by serial.
           If keys is empty, delete all attributes.
           Otherwise delete those specified.
        """

        if keys:
            #TODO more efficient way to do this?
            for key in keys:
                s = self.attributes.delete()
                s = s.where(and_(self.attributes.c.serial == serial,
                                 self.attributes.c.key == key))
                self.conn.execute(s).close()
        else:
            s = self.attributes.delete()
            s = s.where(self.attributes.c.serial == serial)
            self.conn.execute(s).close()

    def attribute_copy(self, source, dest):
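        # Copy all attributes of version `source` onto version `dest`.
        # `dest` appears in the selected columns so that every fetched row
        # already carries the destination serial next to each (key, value)
        # pair; the same update-then-insert upsert emulation as above is then
        # applied per pair.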
        s = select([dest, self.attributes.c.key, self.attributes.c.value],
            self.attributes.c.serial == source)
        rp = self.conn.execute(s)
        attributes = rp.fetchall()
        rp.close()
        for dest, k, v in attributes:
            #insert or replace
            s = self.attributes.update().where(and_(
                self.attributes.c.serial == dest,
                self.attributes.c.key == k))
            rp = self.conn.execute(s, value=v)
            rp.close()
            if rp.rowcount == 0:
                s = self.attributes.insert()
                values = {'serial':dest, 'key':k, 'value':v}
                self.conn.execute(s, values).close()

    def latest_attribute_keys(self, parent, before=inf, except_cluster=0, pathq=[]):
        """Return a list with all attribute keys defined
           for all latest versions under parent that
           do not belong to the cluster.
        """

        # TODO: Use another table to store before=inf results.
        a = self.attributes.alias('a')
        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        s = select([a.c.key]).distinct()
        filtered = select([func.max(self.versions.c.serial)])
        if before != inf:
            filtered = filtered.where(self.versions.c.mtime < before)
        s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
        s = s.where(v.c.cluster != except_cluster)
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
            self.nodes.c.parent == parent)))
        s = s.where(a.c.serial == v.c.serial)
        s = s.where(n.c.node == v.c.node)
        conj = []
        for x in pathq:
            conj.append(n.c.path.like(self.escape_like(x) + '%', escape='\\'))
        if conj:
            s = s.where(or_(*conj))
        rp = self.conn.execute(s)
        rows = rp.fetchall()
        rp.close()
        return [r[0] for r in rows]

    def latest_version_list(self, parent, prefix='', delimiter=None,
                            start='', limit=10000, before=inf,
                            except_cluster=0, pathq=[], filterq=[]):
        """Return a (list of (path, serial) tuples, list of common prefixes)
           for the current versions of the paths with the given parent,
           matching the following criteria.

           The property tuple for a version is returned if all
           of these conditions are true:

                a. parent matches

                b. path > start

                c. path starts with prefix (and paths in pathq)

                d. version is the max up to before

                e. version is not in cluster

                f. the path does not have the delimiter occurring
                   after the prefix, or ends with the delimiter

                g. serial matches the attribute filter query.

                   A filter query is a comma-separated list of
                   terms in one of these three forms:

                   key
                       an attribute with this key must exist

                   !key
                       an attribute with this key must not exist

                   key ?op value
                       the attribute with this key satisfies the value
                       where ?op is one of ==, !=, <=, >=, <, >.

           The list of common prefixes includes the prefixes
           matching up to the first delimiter after prefix;
           they are reported only once, as "virtual directories".
           The delimiter is included in the prefixes.

           If arguments are None, then the corresponding matching rule
           will always match.

           Limit applies to the first list of tuples returned.
        """

        if not start or start < prefix:
            start = strprevling(prefix)
        nextling = strnextling(prefix)

        a = self.attributes.alias('a')
        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        s = select([n.c.path, v.c.serial]).distinct()
        filtered = select([func.max(self.versions.c.serial)])
        if before != inf:
            filtered = filtered.where(self.versions.c.mtime < before)
        s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
        s = s.where(v.c.cluster != except_cluster)
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
            self.nodes.c.parent == parent)))
        if filterq:
            s = s.where(a.c.serial == v.c.serial)

        s = s.where(n.c.node == v.c.node)
        s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
        conj = []
        for x in pathq:
            conj.append(n.c.path.like(self.escape_like(x) + '%', escape='\\'))

        if conj:
            s = s.where(or_(*conj))

        if filterq:
            included, excluded, opers = parse_filters(filterq)
            if included:
                s = s.where(a.c.key.in_(x for x in included))
            if excluded:
                s = s.where(not_(a.c.key.in_(x for x in excluded)))
            if opers:
                for k, o, v in opers:
                    s = s.where(and_(a.c.key == k, a.c.value.op(o)(v)))

        s = s.order_by(n.c.path)

        if not delimiter:
            s = s.limit(limit)
            rp = self.conn.execute(s, start=start)
            r = rp.fetchall()
            rp.close()
            return r, ()

        pfz = len(prefix)
        dz = len(delimiter)
        count = 0
        prefixes = []
        pappend = prefixes.append
        matches = []
        mappend = matches.append

        rp = self.conn.execute(s, start=start)
        while True:
            props = rp.fetchone()
            if props is None:
                break
            path, serial = props
            idx = path.find(delimiter, pfz)

            if idx < 0:
                mappend(props)
                count += 1
                if count >= limit:
                    break
                continue

            if idx + dz == len(path):
                mappend(props)
                count += 1
                continue # Get one more, in case there is a path.
            pf = path[:idx + dz]
            pappend(pf)
            if count >= limit:
                break

            rp = self.conn.execute(s, start=strnextling(pf)) # New start.
        rp.close()

        return matches, prefixes