pithos/backends/lib/sqlalchemy/node.py @ 2e662088

# Copyright 2011-2012 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from time import time
from sqlalchemy import Table, Integer, BigInteger, DECIMAL, Column, String, MetaData, ForeignKey
from sqlalchemy.types import Text
from sqlalchemy.schema import Index, Sequence
from sqlalchemy.sql import func, and_, or_, not_, null, select, bindparam, text, exists
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.engine.reflection import Inspector

from dbworker import DBWorker

from pithos.lib.filter import parse_filters


ROOTNODE  = 0

( SERIAL, NODE, HASH, SIZE, SOURCE, MTIME, MUSER, UUID, CLUSTER ) = range(9)

inf = float('inf')


def strnextling(prefix):
    """Return the first unicode string
       greater than but not starting with given prefix.
       strnextling('hello') -> 'hellp'
    """
    if not prefix:
        ## all strings start with the null string,
        ## therefore we have to approximate strnextling('')
        ## with the last unicode character supported by python
        ## 0x10ffff for wide (32-bit unicode) python builds
        ## 0x00ffff for narrow (16-bit unicode) python builds
        ## We will not autodetect. 0xffff is safe enough.
        return unichr(0xffff)
    s = prefix[:-1]
    c = ord(prefix[-1])
    if c >= 0xffff:
        raise RuntimeError
    s += unichr(c+1)
    return s

def strprevling(prefix):
    """Return an approximation of the last unicode string
       less than but not starting with given prefix.
       strprevling(u'hello') -> u'helln\\uffff'
    """
    if not prefix:
        ## There is no prevling for the null string
        return prefix
    s = prefix[:-1]
    c = ord(prefix[-1])
    if c > 0:
        s += unichr(c-1) + unichr(0xffff)
    return s
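
# A small doctest-style illustration of the two helpers above (values follow
# their docstrings; assumes a Python 2 unicode build where unichr(0xffff) is
# available):
#
#   >>> strnextling('hello')
#   u'hellp'
#   >>> strprevling(u'hello') == u'helln' + unichr(0xffff)
#   True
#
# Together they bound the set of strings sharing a prefix, so listing queries
# can use plain comparisons (path > strprevling(prefix) and
# path < strnextling(prefix)); latest_version_list below relies on this.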

_propnames = {
    'serial'    : 0,
    'node'      : 1,
    'hash'      : 2,
    'size'      : 3,
    'source'    : 4,
    'mtime'     : 5,
    'muser'     : 6,
    'uuid'      : 7,
    'cluster'   : 8
}
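
# _propnames maps property names to the same positions as the module-level
# (SERIAL, NODE, HASH, SIZE, SOURCE, MTIME, MUSER, UUID, CLUSTER) constants,
# e.g. _propnames['mtime'] == MTIME == 5; both describe the column order of
# the version property tuples returned by the methods below.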


class Node(DBWorker):
    """Nodes store path organization and have multiple versions.
       Versions store object history and have multiple attributes.
       Attributes store metadata.
    """

    # TODO: Provide an interface for included and excluded clusters.

    def __init__(self, **params):
        DBWorker.__init__(self, **params)
        metadata = MetaData()

        #create nodes table
        columns=[]
        columns.append(Column('node', Integer, primary_key=True))
        columns.append(Column('parent', Integer,
                              ForeignKey('nodes.node',
                                         ondelete='CASCADE',
                                         onupdate='CASCADE'),
                              autoincrement=False))
        path_length = 2048
        columns.append(Column('path', String(path_length), default='', nullable=False))
        self.nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')

        #create policy table
        columns=[]
        columns.append(Column('node', Integer,
                              ForeignKey('nodes.node',
                                         ondelete='CASCADE',
                                         onupdate='CASCADE'),
                              primary_key=True))
        columns.append(Column('key', String(255), primary_key=True))
        columns.append(Column('value', String(255)))
        self.policies = Table('policy', metadata, *columns, mysql_engine='InnoDB')

        #create statistics table
        columns=[]
        columns.append(Column('node', Integer,
                              ForeignKey('nodes.node',
                                         ondelete='CASCADE',
                                         onupdate='CASCADE'),
                              primary_key=True))
        columns.append(Column('population', Integer, nullable=False, default=0))
        columns.append(Column('size', BigInteger, nullable=False, default=0))
        columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
        columns.append(Column('cluster', Integer, nullable=False, default=0,
                              primary_key=True, autoincrement=False))
        self.statistics = Table('statistics', metadata, *columns, mysql_engine='InnoDB')

        #create versions table
        columns=[]
        columns.append(Column('serial', Integer, primary_key=True))
        columns.append(Column('node', Integer,
                              ForeignKey('nodes.node',
                                         ondelete='CASCADE',
                                         onupdate='CASCADE')))
        columns.append(Column('hash', String(255)))
        columns.append(Column('size', BigInteger, nullable=False, default=0))
        columns.append(Column('source', Integer))
        columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
        columns.append(Column('muser', String(255), nullable=False, default=''))
        columns.append(Column('uuid', String(64), nullable=False, default=''))
        columns.append(Column('cluster', Integer, nullable=False, default=0))
        self.versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
        Index('idx_versions_node_mtime', self.versions.c.node, self.versions.c.mtime)
        Index('idx_versions_node_uuid', self.versions.c.uuid)

        #create attributes table
        columns = []
        columns.append(Column('serial', Integer,
                              ForeignKey('versions.serial',
                                         ondelete='CASCADE',
                                         onupdate='CASCADE'),
                              primary_key=True))
        columns.append(Column('domain', String(255), primary_key=True))
        columns.append(Column('key', String(255), primary_key=True))
        columns.append(Column('value', String(255)))
        self.attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')

        metadata.create_all(self.engine)

        # the following code creates an index of specific length
        # this can be accomplished in sqlalchemy >= 0.7.3
        # by providing the mysql_length option during index creation
        insp = Inspector.from_engine(self.engine)
        indexes = [elem['name'] for elem in insp.get_indexes('nodes')]
        if 'idx_nodes_path' not in indexes:
            explicit_length = '(%s)' %path_length if self.engine.name == 'mysql' else ''
            s = text('CREATE UNIQUE INDEX idx_nodes_path ON nodes (path%s)' %explicit_length)
            self.conn.execute(s).close()

        s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
                                           self.nodes.c.parent == ROOTNODE))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            s = self.nodes.insert().values(node=ROOTNODE, parent=ROOTNODE)
            self.conn.execute(s)
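
    # A rough sketch of the schema wired up above, for orientation:
    #
    #   nodes(node PK, parent FK -> nodes.node, path, unique index on path)
    #   policy(node FK -> nodes.node, key, value)            PK (node, key)
    #   statistics(node FK -> nodes.node, population, size,
    #              mtime, cluster)                           PK (node, cluster)
    #   versions(serial PK, node FK -> nodes.node, hash, size, source,
    #            mtime, muser, uuid, cluster)
    #   attributes(serial FK -> versions.serial, domain, key, value)
    #                                                        PK (serial, domain, key)
    #
    # All foreign keys cascade on delete/update, so removing a node drops its
    # versions and removing a version drops its attributes.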

    def node_create(self, parent, path):
        """Create a new node from the given properties.
           Return the node identifier of the new node.
        """
        #TODO catch IntegrityError?
        s = self.nodes.insert().values(parent=parent, path=path)
        r = self.conn.execute(s)
        inserted_primary_key = r.inserted_primary_key[0]
        r.close()
        return inserted_primary_key

    def node_lookup(self, path):
        """Lookup the current node of the given path.
           Return None if the path is not found.
        """

        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
        s = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path), escape='\\'))
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        if row:
            return row[0]
        return None

    def node_get_properties(self, node):
        """Return the node's (parent, path).
           Return None if the node is not found.
        """

        s = select([self.nodes.c.parent, self.nodes.c.path])
        s = s.where(self.nodes.c.node == node)
        r = self.conn.execute(s)
        l = r.fetchone()
        r.close()
        return l
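
    # Minimal usage sketch (illustrative; `backend` stands for a connected Node
    # instance and the path values are made up):
    #
    #   n = backend.node_lookup('account/container/object')
    #   if n is None:
    #       n = backend.node_create(parent, 'account/container/object')
    #   parent, path = backend.node_get_properties(n)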

    def node_get_versions(self, node, keys=(), propnames=_propnames):
        """Return the properties of all versions at node.
           If keys is empty, return all properties in the order
           (serial, node, hash, size, source, mtime, muser, uuid, cluster).
        """

        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.hash,
                    self.versions.c.size,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.uuid,
                    self.versions.c.cluster], self.versions.c.node == node)
        s = s.order_by(self.versions.c.serial)
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        if not rows:
            return rows

        if not keys:
            return rows

        return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]

    def node_count_children(self, node):
        """Return node's child count."""

        s = select([func.count(self.nodes.c.node)])
        s = s.where(and_(self.nodes.c.parent == node,
                         self.nodes.c.node != ROOTNODE))
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        return row[0]

    def node_purge_children(self, parent, before=inf, cluster=0):
        """Delete all versions with the specified
           parent and cluster, and return
           the hashes of versions deleted.
           Clears out nodes with no remaining versions.
        """
        #update statistics
        c1 = select([self.nodes.c.node],
            self.nodes.c.parent == parent)
        where_clause = and_(self.versions.c.node.in_(c1),
                            self.versions.c.cluster == cluster)
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        s = s.where(where_clause)
        if before != inf:
            s = s.where(self.versions.c.mtime <= before)
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        if not row:
            return ()
        nr, size = row[0], -row[1] if row[1] else 0
        mtime = time()
        self.statistics_update(parent, -nr, size, mtime, cluster)
        self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)

        s = select([self.versions.c.hash])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        hashes = [row[0] for row in r.fetchall()]
        r.close()

        #delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)
        r.close()

        #delete nodes
        s = select([self.nodes.c.node],
            and_(self.nodes.c.parent == parent,
                 select([func.count(self.versions.c.serial)],
                    self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
        rp = self.conn.execute(s)
        nodes = [r[0] for r in rp.fetchall()]
        rp.close()
        s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
        self.conn.execute(s).close()

        return hashes

    def node_purge(self, node, before=inf, cluster=0):
        """Delete all versions with the specified
           node and cluster, and return
           the hashes of versions deleted.
           Clears out the node if it has no remaining versions.
        """

        #update statistics
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        where_clause = and_(self.versions.c.node == node,
                         self.versions.c.cluster == cluster)
        s = s.where(where_clause)
        if before != inf:
            s = s.where(self.versions.c.mtime <= before)
        r = self.conn.execute(s)
        row = r.fetchone()
        nr, size = row[0], row[1]
        r.close()
        if not nr:
            return ()
        mtime = time()
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)

        s = select([self.versions.c.hash])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        hashes = [row[0] for row in r.fetchall()]
        r.close()

        #delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)
        r.close()

        #delete nodes
        s = select([self.nodes.c.node],
            and_(self.nodes.c.node == node,
                 select([func.count(self.versions.c.serial)],
                    self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
        r = self.conn.execute(s)
        nodes = [row[0] for row in r.fetchall()]
        r.close()
        s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
        self.conn.execute(s).close()

        return hashes

    def node_remove(self, node):
        """Remove the node specified.
           Return False if the node has children or is not found.
        """

        if self.node_count_children(node):
            return False

        mtime = time()
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size),
                    self.versions.c.cluster])
        s = s.where(self.versions.c.node == node)
        s = s.group_by(self.versions.c.cluster)
        r = self.conn.execute(s)
        for population, size, cluster in r.fetchall():
            self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
        r.close()

        s = self.nodes.delete().where(self.nodes.c.node == node)
        self.conn.execute(s).close()
        return True

    def policy_get(self, node):
        s = select([self.policies.c.key, self.policies.c.value],
            self.policies.c.node==node)
        r = self.conn.execute(s)
        d = dict(r.fetchall())
        r.close()
        return d

    def policy_set(self, node, policy):
        #insert or replace
        for k, v in policy.iteritems():
            s = self.policies.update().where(and_(self.policies.c.node == node,
                                                  self.policies.c.key == k))
            s = s.values(value = v)
            rp = self.conn.execute(s)
            rp.close()
            if rp.rowcount == 0:
                s = self.policies.insert()
                values = {'node':node, 'key':k, 'value':v}
                r = self.conn.execute(s, values)
                r.close()
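
    # Example round trip (the key/value pairs are illustrative, not a fixed
    # vocabulary):
    #
    #   backend.policy_set(n, {'quota': '0', 'versioning': 'auto'})
    #   backend.policy_get(n)   # -> {'quota': '0', 'versioning': 'auto'}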

    def statistics_get(self, node, cluster=0):
        """Return population, total size and last mtime
           for all versions under node that belong to the cluster.
        """

        s = select([self.statistics.c.population,
                    self.statistics.c.size,
                    self.statistics.c.mtime])
        s = s.where(and_(self.statistics.c.node == node,
                         self.statistics.c.cluster == cluster))
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        return row

    def statistics_update(self, node, population, size, mtime, cluster=0):
        """Update the statistics of the given node.
           Statistics keep track of the population, total
           size of objects and mtime in the node's namespace.
           The population and size deltas may be zero,
           positive or negative.
        """
        s = select([self.statistics.c.population, self.statistics.c.size],
            and_(self.statistics.c.node == node,
                 self.statistics.c.cluster == cluster))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            prepopulation, presize = (0, 0)
        else:
            prepopulation, presize = r
        population += prepopulation
        size += presize

        #insert or replace
        #TODO better upsert
        u = self.statistics.update().where(and_(self.statistics.c.node==node,
                                           self.statistics.c.cluster==cluster))
        u = u.values(population=population, size=size, mtime=mtime)
        rp = self.conn.execute(u)
        rp.close()
        if rp.rowcount == 0:
            ins = self.statistics.insert()
            ins = ins.values(node=node, population=population, size=size,
                             mtime=mtime, cluster=cluster)
            self.conn.execute(ins).close()

    def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
        """Update the statistics of the given node's parent.
           Then recursively update all parents up to the root.
           Population is not recursive.
        """

        while True:
            if node == ROOTNODE:
                break
            props = self.node_get_properties(node)
            if props is None:
                break
            parent, path = props
            self.statistics_update(parent, population, size, mtime, cluster)
            node = parent
            population = 0 # Population isn't recursive
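
    # Worked example: with nodes A -> B -> C, where A's parent is ROOTNODE,
    # statistics_update_ancestors(C, 1, 100, mtime) updates B by (+1 population,
    # +100 size), then A and ROOTNODE by (+0 population, +100 size), since the
    # population delta only applies to the immediate parent.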

    def statistics_latest(self, node, before=inf, except_cluster=0):
        """Return population, total size and last mtime
           for all latest versions under node that
           do not belong to the cluster.
        """

        # The node.
        props = self.node_get_properties(node)
        if props is None:
            return None
        parent, path = props

        # The latest version.
        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.hash,
                    self.versions.c.size,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.uuid,
                    self.versions.c.cluster])
        filtered = select([func.max(self.versions.c.serial)],
                            self.versions.c.node == node)
        if before != inf:
            filtered = filtered.where(self.versions.c.mtime < before)
        s = s.where(and_(self.versions.c.cluster != except_cluster,
                         self.versions.c.serial == filtered))
        r = self.conn.execute(s)
        props = r.fetchone()
        r.close()
        if not props:
            return None
        mtime = props[MTIME]

        # First level, just under node (get population).
        v = self.versions.alias('v')
        s = select([func.count(v.c.serial),
                    func.sum(v.c.size),
                    func.max(v.c.mtime)])
        c1 = select([func.max(self.versions.c.serial)])
        if before != inf:
            c1 = c1.where(self.versions.c.mtime < before)
        c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
        s = s.where(and_(v.c.serial == c1.where(self.versions.c.node == v.c.node),
                         v.c.cluster != except_cluster,
                         v.c.node.in_(c2)))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        count = r[0]
        mtime = max(mtime, r[2])
        if count == 0:
            return (0, 0, mtime)

        # All children (get size and mtime).
        # XXX: This is why the full path is stored.
        s = select([func.count(v.c.serial),
                    func.sum(v.c.size),
                    func.max(v.c.mtime)])
        c1 = select([func.max(self.versions.c.serial)],
            self.versions.c.node == v.c.node)
        if before != inf:
            c1 = c1.where(self.versions.c.mtime < before)
        c2 = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path) + '%', escape='\\'))
        s = s.where(and_(v.c.serial == c1,
                         v.c.cluster != except_cluster,
                         v.c.node.in_(c2)))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        size = r[1] - props[SIZE]
        mtime = max(mtime, r[2])
        return (count, size, mtime)

    def version_create(self, node, hash, size, source, muser, uuid, cluster=0):
        """Create a new version from the given properties.
           Return the (serial, mtime) of the new version.
        """

        mtime = time()
        s = self.versions.insert().values(node=node, hash=hash, size=size, source=source,
                                          mtime=mtime, muser=muser, uuid=uuid, cluster=cluster)
        serial = self.conn.execute(s).inserted_primary_key[0]
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)
        return serial, mtime

    def version_lookup(self, node, before=inf, cluster=0):
        """Lookup the current version of the given node.
           Return a list with its properties:
           (serial, node, hash, size, source, mtime, muser, uuid, cluster)
           or None if the current version is not found in the given cluster.
        """

        v = self.versions.alias('v')
        s = select([v.c.serial, v.c.node, v.c.hash,
                    v.c.size, v.c.source, v.c.mtime,
                    v.c.muser, v.c.uuid, v.c.cluster])
        c = select([func.max(self.versions.c.serial)],
            self.versions.c.node == node)
        if before != inf:
            c = c.where(self.versions.c.mtime < before)
        s = s.where(and_(v.c.serial == c,
                         v.c.cluster == cluster))
        r = self.conn.execute(s)
        props = r.fetchone()
        r.close()
        if props:
            return props
        return None
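
    # Usage sketch (illustrative values), reading a version back with the
    # module-level column constants defined at the top of this file:
    #
    #   serial, mtime = backend.version_create(n, hash, size, None, muser, uuid)
    #   props = backend.version_lookup(n)
    #   props[SERIAL] == serial and props[SIZE] == size    # True for the
    #                                                      # latest version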

    def version_get_properties(self, serial, keys=(), propnames=_propnames):
        """Return a sequence of values for the properties of
           the version specified by serial and the keys, in the order given.
           If keys is empty, return all properties in the order
           (serial, node, hash, size, source, mtime, muser, uuid, cluster).
        """

        v = self.versions.alias()
        s = select([v.c.serial, v.c.node, v.c.hash,
                    v.c.size, v.c.source, v.c.mtime,
                    v.c.muser, v.c.uuid, v.c.cluster], v.c.serial == serial)
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if r is None:
            return r

        if not keys:
            return r
        return [r[propnames[k]] for k in keys if k in propnames]

    def version_recluster(self, serial, cluster):
        """Move the version into another cluster."""

        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        size = props[SIZE]
        oldcluster = props[CLUSTER]
        if cluster == oldcluster:
            return

        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)

        s = self.versions.update()
        s = s.where(self.versions.c.serial == serial)
        s = s.values(cluster = cluster)
        self.conn.execute(s).close()

    def version_remove(self, serial):
        """Remove the serial specified."""

        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        hash = props[HASH]
        size = props[SIZE]
        cluster = props[CLUSTER]

        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, cluster)

        s = self.versions.delete().where(self.versions.c.serial == serial)
        self.conn.execute(s).close()
        return hash

    def attribute_get(self, serial, domain, keys=()):
        """Return a list of (key, value) pairs of the version specified by serial.
           If keys is empty, return all attributes.
           Otherwise, return only those specified.
        """

        if keys:
            attrs = self.attributes.alias()
            s = select([attrs.c.key, attrs.c.value])
            s = s.where(and_(attrs.c.key.in_(keys),
                             attrs.c.serial == serial,
                             attrs.c.domain == domain))
        else:
            attrs = self.attributes.alias()
            s = select([attrs.c.key, attrs.c.value])
            s = s.where(and_(attrs.c.serial == serial,
                             attrs.c.domain == domain))
        r = self.conn.execute(s)
        l = r.fetchall()
        r.close()
        return l

    def attribute_set(self, serial, domain, items):
        """Set the attributes of the version specified by serial.
           Receive attributes as an iterable of (key, value) pairs.
        """
        #insert or replace
        #TODO better upsert
        for k, v in items:
            s = self.attributes.update()
            s = s.where(and_(self.attributes.c.serial == serial,
                             self.attributes.c.domain == domain,
                             self.attributes.c.key == k))
            s = s.values(value = v)
            rp = self.conn.execute(s)
            rp.close()
            if rp.rowcount == 0:
                s = self.attributes.insert()
                s = s.values(serial=serial, domain=domain, key=k, value=v)
                self.conn.execute(s).close()
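
    # Example (the 'metadata' domain and keys are illustrative): attributes are
    # written and read per (serial, domain) pair:
    #
    #   backend.attribute_set(serial, 'metadata', [('color', 'blue')])
    #   backend.attribute_get(serial, 'metadata')   # -> [('color', 'blue')]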

    def attribute_del(self, serial, domain, keys=()):
        """Delete attributes of the version specified by serial.
           If keys is empty, delete all attributes.
           Otherwise delete those specified.
        """

        if keys:
            #TODO more efficient way to do this?
            for key in keys:
                s = self.attributes.delete()
                s = s.where(and_(self.attributes.c.serial == serial,
                                 self.attributes.c.domain == domain,
                                 self.attributes.c.key == key))
                self.conn.execute(s).close()
        else:
            s = self.attributes.delete()
            s = s.where(and_(self.attributes.c.serial == serial,
                             self.attributes.c.domain == domain))
            self.conn.execute(s).close()

    def attribute_copy(self, source, dest):
        s = select([dest, self.attributes.c.domain, self.attributes.c.key, self.attributes.c.value],
            self.attributes.c.serial == source)
        rp = self.conn.execute(s)
        attributes = rp.fetchall()
        rp.close()
        for dest, domain, k, v in attributes:
            #insert or replace
            s = self.attributes.update().where(and_(
                self.attributes.c.serial == dest,
                self.attributes.c.domain == domain,
                self.attributes.c.key == k))
            rp = self.conn.execute(s, value=v)
            rp.close()
            if rp.rowcount == 0:
                s = self.attributes.insert()
                values = {'serial':dest, 'domain':domain, 'key':k, 'value':v}
                self.conn.execute(s, values).close()

    def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=[]):
        """Return a list with all keys defined
           for the latest versions under parent that
           do not belong to the cluster.
        """

        # TODO: Use another table to store before=inf results.
        a = self.attributes.alias('a')
        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        s = select([a.c.key]).distinct()
        filtered = select([func.max(self.versions.c.serial)])
        if before != inf:
            filtered = filtered.where(self.versions.c.mtime < before)
        s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
        s = s.where(v.c.cluster != except_cluster)
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
            self.nodes.c.parent == parent)))
        s = s.where(a.c.serial == v.c.serial)
        s = s.where(a.c.domain == domain)
        s = s.where(n.c.node == v.c.node)
        conj = []
        for x in pathq:
            conj.append(n.c.path.like(self.escape_like(x) + '%', escape='\\'))
        if conj:
            s = s.where(or_(*conj))
        rp = self.conn.execute(s)
        rows = rp.fetchall()
        rp.close()
        return [r[0] for r in rows]

    def latest_version_list(self, parent, prefix='', delimiter=None,
                            start='', limit=10000, before=inf,
                            except_cluster=0, pathq=[], domain=None, filterq=[], sizeq=None):
        """Return a (list of (path, serial) tuples, list of common prefixes)
           for the current versions of the paths with the given parent,
           matching the following criteria.

           The property tuple for a version is returned if all
           of these conditions are true:

                a. parent matches

                b. path > start

                c. path starts with prefix (and paths in pathq)

                d. version is the max up to before

                e. version is not in cluster

                f. the path does not have the delimiter occurring
                   after the prefix, or ends with the delimiter

                g. serial matches the attribute filter query.

                   A filter query is a comma-separated list of
                   terms in one of these three forms:

                   key
                       an attribute with this key must exist

                   !key
                       an attribute with this key must not exist

                   key ?op value
                       the attribute with this key satisfies the value
                       where ?op is one of ==, !=, <=, >=, <, >.

                h. the size is in the range set by sizeq

           The list of common prefixes includes the prefixes
           matching up to the first delimiter after prefix,
           and are reported only once, as "virtual directories".
           The delimiter is included in the prefixes.

           If arguments are None, then the corresponding matching rule
           will always match.

           Limit applies to the first list of tuples returned.
        """

        if not start or start < prefix:
            start = strprevling(prefix)
        nextling = strnextling(prefix)

        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        s = select([n.c.path, v.c.serial]).distinct()
        filtered = select([func.max(self.versions.c.serial)])
        if before != inf:
            filtered = filtered.where(self.versions.c.mtime < before)
        s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
        s = s.where(v.c.cluster != except_cluster)
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
            self.nodes.c.parent == parent)))

        s = s.where(n.c.node == v.c.node)
        s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
        conj = []
        for x in pathq:
            conj.append(n.c.path.like(self.escape_like(x) + '%', escape='\\'))
        if conj:
            s = s.where(or_(*conj))

        if sizeq and len(sizeq) == 2:
            if sizeq[0]:
                s = s.where(v.c.size >= sizeq[0])
            if sizeq[1]:
                s = s.where(v.c.size < sizeq[1])

        if domain and filterq:
            a = self.attributes.alias('a')
            included, excluded, opers = parse_filters(filterq)
            if included:
                subs = select([1])
                subs = subs.where(a.c.serial == v.c.serial).correlate(v)
                subs = subs.where(a.c.domain == domain)
                subs = subs.where(or_(*[a.c.key.op('=')(x) for x in included]))
                s = s.where(exists(subs))
            if excluded:
                subs = select([1])
                subs = subs.where(a.c.serial == v.c.serial).correlate(v)
                subs = subs.where(a.c.domain == domain)
                subs = subs.where(or_(*[a.c.key.op('=')(x) for x in excluded]))
                s = s.where(not_(exists(subs)))
            if opers:
                for k, o, val in opers:
                    subs = select([1])
                    subs = subs.where(a.c.serial == v.c.serial).correlate(v)
                    subs = subs.where(a.c.domain == domain)
                    subs = subs.where(and_(a.c.key.op('=')(k), a.c.value.op(o)(val)))
                    s = s.where(exists(subs))

        s = s.order_by(n.c.path)

        if not delimiter:
            s = s.limit(limit)
            rp = self.conn.execute(s, start=start)
            r = rp.fetchall()
            rp.close()
            return r, ()

        pfz = len(prefix)
        dz = len(delimiter)
        count = 0
        prefixes = []
        pappend = prefixes.append
        matches = []
        mappend = matches.append

        rp = self.conn.execute(s, start=start)
        while True:
            props = rp.fetchone()
            if props is None:
                break
            path, serial = props
            idx = path.find(delimiter, pfz)

            if idx < 0:
                mappend(props)
                count += 1
                if count >= limit:
                    break
                continue

            if idx + dz == len(path):
                mappend(props)
                count += 1
                continue # Get one more, in case there is a path.
            pf = path[:idx + dz]
            pappend(pf)
            if count >= limit:
                break

            rp = self.conn.execute(s, start=strnextling(pf)) # New start.
        rp.close()

        return matches, prefixes
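
    # Delimiter semantics, by example (illustrative paths): if the latest paths
    # under a parent are 'a/b', 'a/c/d' and 'a/c/e', a listing with prefix='a/'
    # and delimiter='/' reports 'a/b' among the matches and collapses the other
    # two into the single common prefix 'a/c/', the "virtual directory".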

    def latest_uuid(self, uuid):
        """Return a (path, serial) tuple, for the latest version of the given uuid."""

        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        s = select([n.c.path, v.c.serial])
        filtered = select([func.max(self.versions.c.serial)])
        s = s.where(v.c.serial == filtered.where(self.versions.c.uuid == uuid))
        s = s.where(n.c.node == v.c.node)

        r = self.conn.execute(s)
        l = r.fetchone()
        r.close()
        return l