# Copyright 2011 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from time import time
from sqlalchemy import Table, Integer, BigInteger, DECIMAL, Column, String, MetaData, ForeignKey
from sqlalchemy.types import Text
from sqlalchemy.schema import Index, Sequence
from sqlalchemy.sql import func, and_, or_, not_, null, select, bindparam, text
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.engine.reflection import Inspector

from dbworker import DBWorker

from pithos.lib.filter import parse_filters, OPERATORS

ROOTNODE  = 0

( SERIAL, NODE, HASH, SIZE, SOURCE, MTIME, MUSER, UUID, CLUSTER ) = range(9)

inf = float('inf')


def strnextling(prefix):
    """Return the first unicode string
       greater than but not starting with given prefix.
       strnextling('hello') -> 'hellp'
    """
    if not prefix:
        ## all strings start with the null string,
        ## therefore we have to approximate strnextling('')
        ## with the last unicode character supported by python
        ## 0x10ffff for wide (32-bit unicode) python builds
        ## 0x00ffff for narrow (16-bit unicode) python builds
        ## We will not autodetect. 0xffff is safe enough.
        return unichr(0xffff)
    s = prefix[:-1]
    c = ord(prefix[-1])
    if c >= 0xffff:
        raise RuntimeError
    s += unichr(c+1)
    return s

def strprevling(prefix):
    """Return an approximation of the last unicode string
       less than but not starting with given prefix.
       strprevling(u'hello') -> u'helln\\uffff'
    """
    if not prefix:
        ## There is no prevling for the null string
        return prefix
    s = prefix[:-1]
    c = ord(prefix[-1])
    if c > 0:
        s += unichr(c-1) + unichr(0xffff)
    return s
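
# Illustrative sketch (not executed): these two helpers bound a lexicographic
# prefix range without LIKE. Every path starting with `prefix` satisfies
# strprevling(prefix) < path < strnextling(prefix), which is how
# latest_version_list() below turns a prefix listing into two simple
# comparisons on the path column:
#
#   start = strprevling(u'photos/')   # -> u'photos.\uffff' (approximation)
#   stop = strnextling(u'photos/')    # -> u'photos0'
#   # ... WHERE path > :start AND path < :stop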

_propnames = {
    'serial'    : 0,
    'node'      : 1,
    'hash'      : 2,
    'size'      : 3,
    'source'    : 4,
    'mtime'     : 5,
    'muser'     : 6,
    'uuid'      : 7,
    'cluster'   : 8
}


class Node(DBWorker):
    """Nodes store path organization and have multiple versions.
       Versions store object history and have multiple attributes.
       Attributes store metadata.
    """

    # TODO: Provide an interface for included and excluded clusters.

    def __init__(self, **params):
        DBWorker.__init__(self, **params)
        metadata = MetaData()

        #create nodes table
        columns=[]
        columns.append(Column('node', Integer, primary_key=True))
        columns.append(Column('parent', Integer,
                              ForeignKey('nodes.node',
                                         ondelete='CASCADE',
                                         onupdate='CASCADE'),
                              autoincrement=False))
        path_length = 2048
        columns.append(Column('path', String(path_length), default='', nullable=False))
        self.nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')

        #create policy table
        columns=[]
        columns.append(Column('node', Integer,
                              ForeignKey('nodes.node',
                                         ondelete='CASCADE',
                                         onupdate='CASCADE'),
                              primary_key=True))
        columns.append(Column('key', String(255), primary_key=True))
        columns.append(Column('value', String(255)))
        self.policies = Table('policy', metadata, *columns, mysql_engine='InnoDB')

        #create statistics table
        columns=[]
        columns.append(Column('node', Integer,
                              ForeignKey('nodes.node',
                                         ondelete='CASCADE',
                                         onupdate='CASCADE'),
                              primary_key=True))
        columns.append(Column('population', Integer, nullable=False, default=0))
        columns.append(Column('size', BigInteger, nullable=False, default=0))
        columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
        columns.append(Column('cluster', Integer, nullable=False, default=0,
                              primary_key=True, autoincrement=False))
        self.statistics = Table('statistics', metadata, *columns, mysql_engine='InnoDB')

        #create versions table
        columns=[]
        columns.append(Column('serial', Integer, primary_key=True))
        columns.append(Column('node', Integer,
                              ForeignKey('nodes.node',
                                         ondelete='CASCADE',
                                         onupdate='CASCADE')))
        columns.append(Column('hash', String(255)))
        columns.append(Column('size', BigInteger, nullable=False, default=0))
        columns.append(Column('source', Integer))
        columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
        columns.append(Column('muser', String(255), nullable=False, default=''))
        columns.append(Column('uuid', String(64), nullable=False, default=''))
        columns.append(Column('cluster', Integer, nullable=False, default=0))
        self.versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
        Index('idx_versions_node_mtime', self.versions.c.node, self.versions.c.mtime)
        Index('idx_versions_node_uuid', self.versions.c.uuid)

        #create attributes table
        columns = []
        columns.append(Column('serial', Integer,
                              ForeignKey('versions.serial',
                                         ondelete='CASCADE',
                                         onupdate='CASCADE'),
                              primary_key=True))
        columns.append(Column('domain', String(255), primary_key=True))
        columns.append(Column('key', String(255), primary_key=True))
        columns.append(Column('value', String(255)))
        self.attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')

        metadata.create_all(self.engine)

        # the following code creates an index of specific length
        # this can be accomplished in sqlalchemy >= 0.7.3
        # by providing the mysql_length option during index creation
        insp = Inspector.from_engine(self.engine)
        indexes = [elem['name'] for elem in insp.get_indexes('nodes')]
        if 'idx_nodes_path' not in indexes:
            explicit_length = '(%s)' %path_length if self.engine.name == 'mysql' else ''
            s = text('CREATE INDEX idx_nodes_path ON nodes (path%s)' %explicit_length)
            self.conn.execute(s).close()

        s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
                                           self.nodes.c.parent == ROOTNODE))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            s = self.nodes.insert().values(node=ROOTNODE, parent=ROOTNODE)
            self.conn.execute(s)

    def node_create(self, parent, path):
        """Create a new node from the given properties.
           Return the node identifier of the new node.
        """
        #TODO catch IntegrityError?
        s = self.nodes.insert().values(parent=parent, path=path)
        r = self.conn.execute(s)
        inserted_primary_key = r.inserted_primary_key[0]
        r.close()
        return inserted_primary_key

    def node_lookup(self, path):
        """Lookup the current node of the given path.
           Return None if the path is not found.
        """

        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
        s = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path), escape='\\'))
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        if row:
            return row[0]
        return None
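
    # Usage sketch (illustrative, not executed; assumes a connected Node
    # instance `node_worker` whose DBWorker parameters are configured
    # elsewhere, and an arbitrary example path):
    #
    #   child = node_worker.node_create(ROOTNODE, 'a/b')
    #   assert node_worker.node_lookup('a/b') == child
    #   assert node_worker.node_lookup('missing') is None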

    def node_get_properties(self, node):
        """Return the node's (parent, path).
           Return None if the node is not found.
        """

        s = select([self.nodes.c.parent, self.nodes.c.path])
        s = s.where(self.nodes.c.node == node)
        r = self.conn.execute(s)
        l = r.fetchone()
        r.close()
        return l

    def node_get_versions(self, node, keys=(), propnames=_propnames):
        """Return the properties of all versions at node.
           If keys is empty, return all properties in the order
           (serial, node, hash, size, source, mtime, muser, uuid, cluster).
        """

        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.hash,
                    self.versions.c.size,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.uuid,
                    self.versions.c.cluster], self.versions.c.node == node)
        s = s.order_by(self.versions.c.serial)
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        if not rows:
            return rows

        if not keys:
            return rows

        return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]

    def node_count_children(self, node):
        """Return node's child count."""

        s = select([func.count(self.nodes.c.node)])
        s = s.where(and_(self.nodes.c.parent == node,
                         self.nodes.c.node != ROOTNODE))
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        return row[0]

    def node_purge_children(self, parent, before=inf, cluster=0):
        """Delete all versions with the specified
           parent and cluster, and return
           the hashes of versions deleted.
           Clears out nodes with no remaining versions.
        """
        #update statistics
        c1 = select([self.nodes.c.node],
            self.nodes.c.parent == parent)
        where_clause = and_(self.versions.c.node.in_(c1),
                            self.versions.c.cluster == cluster)
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        s = s.where(where_clause)
        if before != inf:
            s = s.where(self.versions.c.mtime <= before)
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        if not row:
            return ()
        nr, size = row[0], -row[1] if row[1] else 0
        mtime = time()
        self.statistics_update(parent, -nr, size, mtime, cluster)
        self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)

        s = select([self.versions.c.hash])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        hashes = [row[0] for row in r.fetchall()]
        r.close()

        #delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)
        r.close()

        #delete nodes
        s = select([self.nodes.c.node],
            and_(self.nodes.c.parent == parent,
                 select([func.count(self.versions.c.serial)],
                    self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
        rp = self.conn.execute(s)
        nodes = [r[0] for r in rp.fetchall()]
        rp.close()
        s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
        self.conn.execute(s).close()

        return hashes

    def node_purge(self, node, before=inf, cluster=0):
        """Delete all versions with the specified
           node and cluster, and return
           the hashes of versions deleted.
           Clears out the node if it has no remaining versions.
        """

        #update statistics
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        where_clause = and_(self.versions.c.node == node,
                         self.versions.c.cluster == cluster)
        s = s.where(where_clause)
        if before != inf:
            s = s.where(self.versions.c.mtime <= before)
        r = self.conn.execute(s)
        row = r.fetchone()
        nr, size = row[0], row[1]
        r.close()
        if not nr:
            return ()
        mtime = time()
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)

        s = select([self.versions.c.hash])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        hashes = [row[0] for row in r.fetchall()]
        r.close()

        #delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)
        r.close()

        #delete nodes
        s = select([self.nodes.c.node],
            and_(self.nodes.c.node == node,
                 select([func.count(self.versions.c.serial)],
                    self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
        r = self.conn.execute(s)
        nodes = [row[0] for row in r.fetchall()]
        r.close()
        s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
        self.conn.execute(s).close()

        return hashes

    def node_remove(self, node):
        """Remove the node specified.
           Return false if the node has children or is not found.
        """

        if self.node_count_children(node):
            return False

        mtime = time()
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size),
                    self.versions.c.cluster])
        s = s.where(self.versions.c.node == node)
        s = s.group_by(self.versions.c.cluster)
        r = self.conn.execute(s)
        for population, size, cluster in r.fetchall():
            self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
        r.close()

        s = self.nodes.delete().where(self.nodes.c.node == node)
        self.conn.execute(s).close()
        return True

    def policy_get(self, node):
        s = select([self.policies.c.key, self.policies.c.value],
            self.policies.c.node==node)
        r = self.conn.execute(s)
        d = dict(r.fetchall())
        r.close()
        return d

    def policy_set(self, node, policy):
        #insert or replace
        for k, v in policy.iteritems():
            s = self.policies.update().where(and_(self.policies.c.node == node,
                                                  self.policies.c.key == k))
            s = s.values(value = v)
            rp = self.conn.execute(s)
            rp.close()
            if rp.rowcount == 0:
                s = self.policies.insert()
                values = {'node':node, 'key':k, 'value':v}
                r = self.conn.execute(s, values)
                r.close()
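
    # Usage sketch (illustrative, not executed; assumes a connected Node
    # instance `node_worker` and a node id `n`; the policy keys shown are
    # hypothetical -- any string key/value pairs are accepted):
    #
    #   node_worker.policy_set(n, {'quota': '1073741824', 'versioning': 'auto'})
    #   node_worker.policy_get(n)  # -> {'quota': '1073741824', 'versioning': 'auto'}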

    def statistics_get(self, node, cluster=0):
        """Return population, total size and last mtime
           for all versions under node that belong to the cluster.
        """

        s = select([self.statistics.c.population,
                    self.statistics.c.size,
                    self.statistics.c.mtime])
        s = s.where(and_(self.statistics.c.node == node,
                         self.statistics.c.cluster == cluster))
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        return row

    def statistics_update(self, node, population, size, mtime, cluster=0):
        """Update the statistics of the given node.
           Statistics keep track of the population, total
           size of objects and mtime in the node's namespace.
           The population and size deltas may be zero, positive or negative.
        """
        s = select([self.statistics.c.population, self.statistics.c.size],
            and_(self.statistics.c.node == node,
                 self.statistics.c.cluster == cluster))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            prepopulation, presize = (0, 0)
        else:
            prepopulation, presize = r
        population += prepopulation
        size += presize

        #insert or replace
        #TODO better upsert
        u = self.statistics.update().where(and_(self.statistics.c.node==node,
                                           self.statistics.c.cluster==cluster))
        u = u.values(population=population, size=size, mtime=mtime)
        rp = self.conn.execute(u)
        rp.close()
        if rp.rowcount == 0:
            ins = self.statistics.insert()
            ins = ins.values(node=node, population=population, size=size,
                             mtime=mtime, cluster=cluster)
            self.conn.execute(ins).close()

    def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
        """Update the statistics of the given node's parent.
           Then recursively update all parents up to the root.
           Population is not recursive.
        """

        while True:
            if node == ROOTNODE:
                break
            props = self.node_get_properties(node)
            if props is None:
                break
            parent, path = props
            self.statistics_update(parent, population, size, mtime, cluster)
            node = parent
            population = 0 # Population isn't recursive
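
    # Illustrative sketch (not executed): creating a 100-byte version under a
    # node for path 'a/b' propagates statistics as follows -- the immediate
    # parent ('a') gets population +1 and size +100, while every ancestor
    # above it (up to ROOTNODE) gets only size +100, because population is
    # not recursive:
    #
    #   self.statistics_update_ancestors(node_ab, 1, 100, time(), cluster=0)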

    def statistics_latest(self, node, before=inf, except_cluster=0):
        """Return population, total size and last mtime
           for all latest versions under node that
           do not belong to the cluster.
        """

        # The node.
        props = self.node_get_properties(node)
        if props is None:
            return None
        parent, path = props

        # The latest version.
        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.hash,
                    self.versions.c.size,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.uuid,
                    self.versions.c.cluster])
        filtered = select([func.max(self.versions.c.serial)],
                            self.versions.c.node == node)
        if before != inf:
            filtered = filtered.where(self.versions.c.mtime < before)
        s = s.where(and_(self.versions.c.cluster != except_cluster,
                         self.versions.c.serial == filtered))
        r = self.conn.execute(s)
        props = r.fetchone()
        r.close()
        if not props:
            return None
        mtime = props[MTIME]

        # First level, just under node (get population).
        v = self.versions.alias('v')
        s = select([func.count(v.c.serial),
                    func.sum(v.c.size),
                    func.max(v.c.mtime)])
        c1 = select([func.max(self.versions.c.serial)])
        if before != inf:
            c1 = c1.where(self.versions.c.mtime < before)
        c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
        s = s.where(and_(v.c.serial == c1.where(self.versions.c.node == v.c.node),
                         v.c.cluster != except_cluster,
                         v.c.node.in_(c2)))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        count = r[0]
        mtime = max(mtime, r[2])
        if count == 0:
            return (0, 0, mtime)

        # All children (get size and mtime).
        # XXX: This is why the full path is stored.
        s = select([func.count(v.c.serial),
                    func.sum(v.c.size),
                    func.max(v.c.mtime)])
        c1 = select([func.max(self.versions.c.serial)],
            self.versions.c.node == v.c.node)
        if before != inf:
            c1 = c1.where(self.versions.c.mtime < before)
        c2 = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path) + '%', escape='\\'))
        s = s.where(and_(v.c.serial == c1,
                         v.c.cluster != except_cluster,
                         v.c.node.in_(c2)))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        size = r[1] - props[SIZE]
        mtime = max(mtime, r[2])
        return (count, size, mtime)

    def version_create(self, node, hash, size, source, muser, uuid, cluster=0):
        """Create a new version from the given properties.
           Return the (serial, mtime) of the new version.
        """

        mtime = time()
        s = self.versions.insert().values(node=node, hash=hash, size=size, source=source,
                                          mtime=mtime, muser=muser, uuid=uuid, cluster=cluster)
        serial = self.conn.execute(s).inserted_primary_key[0]
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)
        return serial, mtime
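
    # Usage sketch (illustrative, not executed; assumes a connected Node
    # instance `node_worker`, a node id `n` from node_create, and made-up
    # hash/user/uuid values):
    #
    #   serial, mtime = node_worker.version_create(n, 'blockhash', 100,
    #                                              None, 'user@example.org', 'uuid-1')
    #   props = node_worker.version_lookup(n)   # latest version in cluster 0
    #   assert props[SERIAL] == serial and props[SIZE] == 100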

    def version_lookup(self, node, before=inf, cluster=0):
        """Lookup the current version of the given node.
           Return a list with its properties:
           (serial, node, hash, size, source, mtime, muser, uuid, cluster)
           or None if the current version is not found in the given cluster.
        """

        v = self.versions.alias('v')
        s = select([v.c.serial, v.c.node, v.c.hash,
                    v.c.size, v.c.source, v.c.mtime,
                    v.c.muser, v.c.uuid, v.c.cluster])
        c = select([func.max(self.versions.c.serial)],
            self.versions.c.node == node)
        if before != inf:
            c = c.where(self.versions.c.mtime < before)
        s = s.where(and_(v.c.serial == c,
                         v.c.cluster == cluster))
        r = self.conn.execute(s)
        props = r.fetchone()
        r.close()
        if props:
            return props
        return None

    def version_get_properties(self, serial, keys=(), propnames=_propnames):
        """Return a sequence of values for the properties of
           the version specified by serial and the keys, in the order given.
           If keys is empty, return all properties in the order
           (serial, node, hash, size, source, mtime, muser, uuid, cluster).
        """

        v = self.versions.alias()
        s = select([v.c.serial, v.c.node, v.c.hash,
                    v.c.size, v.c.source, v.c.mtime,
                    v.c.muser, v.c.uuid, v.c.cluster], v.c.serial == serial)
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if r is None:
            return r

        if not keys:
            return r
        return [r[propnames[k]] for k in keys if k in propnames]

    def version_recluster(self, serial, cluster):
        """Move the version into another cluster."""

        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        size = props[SIZE]
        oldcluster = props[CLUSTER]
        if cluster == oldcluster:
            return

        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)

        s = self.versions.update()
        s = s.where(self.versions.c.serial == serial)
        s = s.values(cluster = cluster)
        self.conn.execute(s).close()

    def version_remove(self, serial):
        """Remove the serial specified."""

        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        hash = props[HASH]
        size = props[SIZE]
        cluster = props[CLUSTER]

        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, cluster)

        s = self.versions.delete().where(self.versions.c.serial == serial)
        self.conn.execute(s).close()
        return hash

    def attribute_get(self, serial, domain, keys=()):
        """Return a list of (key, value) pairs of the version specified by serial.
           If keys is empty, return all attributes.
           Otherwise, return only those specified.
        """

        if keys:
            attrs = self.attributes.alias()
            s = select([attrs.c.key, attrs.c.value])
            s = s.where(and_(attrs.c.key.in_(keys),
                             attrs.c.serial == serial,
                             attrs.c.domain == domain))
        else:
            attrs = self.attributes.alias()
            s = select([attrs.c.key, attrs.c.value])
            s = s.where(and_(attrs.c.serial == serial,
                             attrs.c.domain == domain))
        r = self.conn.execute(s)
        l = r.fetchall()
        r.close()
        return l

    def attribute_set(self, serial, domain, items):
        """Set the attributes of the version specified by serial.
           Receive attributes as an iterable of (key, value) pairs.
        """
        #insert or replace
        #TODO better upsert
        for k, v in items:
            s = self.attributes.update()
            s = s.where(and_(self.attributes.c.serial == serial,
                             self.attributes.c.domain == domain,
                             self.attributes.c.key == k))
            s = s.values(value = v)
            rp = self.conn.execute(s)
            rp.close()
            if rp.rowcount == 0:
                s = self.attributes.insert()
                s = s.values(serial=serial, domain=domain, key=k, value=v)
                self.conn.execute(s).close()
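
    # Usage sketch (illustrative, not executed; assumes a connected Node
    # instance `node_worker` and a version `serial`; the domain and keys are
    # arbitrary examples):
    #
    #   node_worker.attribute_set(serial, 'default', [('type', 'text/plain')])
    #   node_worker.attribute_get(serial, 'default')            # -> [('type', 'text/plain')]
    #   node_worker.attribute_del(serial, 'default', ('type',)) # remove a single key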

    def attribute_del(self, serial, domain, keys=()):
        """Delete attributes of the version specified by serial.
           If keys is empty, delete all attributes.
           Otherwise delete those specified.
        """

        if keys:
            #TODO more efficient way to do this?
            for key in keys:
                s = self.attributes.delete()
                s = s.where(and_(self.attributes.c.serial == serial,
                                 self.attributes.c.domain == domain,
                                 self.attributes.c.key == key))
                self.conn.execute(s).close()
        else:
            s = self.attributes.delete()
            s = s.where(and_(self.attributes.c.serial == serial,
                             self.attributes.c.domain == domain))
            self.conn.execute(s).close()

    def attribute_copy(self, source, dest):
        s = select([dest, self.attributes.c.domain, self.attributes.c.key, self.attributes.c.value],
            self.attributes.c.serial == source)
        rp = self.conn.execute(s)
        attributes = rp.fetchall()
        rp.close()
        for dest, domain, k, v in attributes:
            #insert or replace
            s = self.attributes.update().where(and_(
                self.attributes.c.serial == dest,
                self.attributes.c.domain == domain,
                self.attributes.c.key == k))
            rp = self.conn.execute(s, value=v)
            rp.close()
            if rp.rowcount == 0:
                s = self.attributes.insert()
                values = {'serial':dest, 'domain':domain, 'key':k, 'value':v}
                self.conn.execute(s, values).close()

    def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=[]):
        """Return a list of all attribute keys defined
           for all latest versions under parent that
           do not belong to the cluster.
        """

        # TODO: Use another table to store before=inf results.
        a = self.attributes.alias('a')
        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        s = select([a.c.key]).distinct()
        filtered = select([func.max(self.versions.c.serial)])
        if before != inf:
            filtered = filtered.where(self.versions.c.mtime < before)
        s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
        s = s.where(v.c.cluster != except_cluster)
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
            self.nodes.c.parent == parent)))
        s = s.where(a.c.serial == v.c.serial)
        s = s.where(a.c.domain == domain)
        s = s.where(n.c.node == v.c.node)
        conj = []
        for x in pathq:
            conj.append(n.c.path.like(self.escape_like(x) + '%', escape='\\'))
        if conj:
            s = s.where(or_(*conj))
        rp = self.conn.execute(s)
        rows = rp.fetchall()
        rp.close()
        return [r[0] for r in rows]

    def latest_version_list(self, parent, prefix='', delimiter=None,
                            start='', limit=10000, before=inf,
                            except_cluster=0, pathq=[], domain=None, filterq=[]):
        """Return a (list of (path, serial) tuples, list of common prefixes)
           for the current versions of the paths with the given parent,
           matching the following criteria.

           The property tuple for a version is returned if all
           of these conditions are true:

                a. parent matches

                b. path > start

                c. path starts with prefix (and paths in pathq)

                d. version is the max up to before

                e. version is not in cluster

                f. the path does not have the delimiter occurring
                   after the prefix, or ends with the delimiter

                g. serial matches the attribute filter query.

                   A filter query is a comma-separated list of
                   terms in one of these three forms:

                   key
                       an attribute with this key must exist

                   !key
                       an attribute with this key must not exist

                   key ?op value
                       the attribute with this key satisfies the value
                       where ?op is one of ==, !=, <=, >=, <, >.

           The list of common prefixes includes the prefixes
           matching up to the first delimiter after prefix,
           and are reported only once, as "virtual directories".
           The delimiter is included in the prefixes.

           If arguments are None, then the corresponding matching rule
           will always match.

           Limit applies to the first list of tuples returned.
        """

        if not start or start < prefix:
            start = strprevling(prefix)
        nextling = strnextling(prefix)

        a = self.attributes.alias('a')
        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        s = select([n.c.path, v.c.serial]).distinct()
        filtered = select([func.max(self.versions.c.serial)])
        if before != inf:
            filtered = filtered.where(self.versions.c.mtime < before)
        s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
        s = s.where(v.c.cluster != except_cluster)
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
            self.nodes.c.parent == parent)))

        s = s.where(n.c.node == v.c.node)
        s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
        conj = []
        for x in pathq:
            conj.append(n.c.path.like(self.escape_like(x) + '%', escape='\\'))

        if conj:
            s = s.where(or_(*conj))

        s = s.order_by(n.c.path)

        def filterout(r):
            if not filterq:
                return False
            path, serial = r
            included, excluded, opers = parse_filters(filterq)

            #retrieve metadata
            s = select([a.c.key, a.c.value])
            s = s.where(a.c.domain == domain)
            s = s.where(a.c.serial == serial)
            rp = self.conn.execute(s)
            meta = dict(rp.fetchall())
            keyset = set([k.encode('utf8') for k in meta.keys()])

            if included:
                if not set(included) & keyset:
                    return True
            if excluded:
                if set(excluded) & keyset:
                    return True
            for k, op, v in opers:
                k = k.decode('utf8')
                v = v.decode('utf8')
                if k not in meta:
                    return True
                operation = OPERATORS[op]
                if not operation(meta[k], v):
                    return True
            return False

        if not delimiter:
            s = s.limit(limit)
            rp = self.conn.execute(s, start=start)
            filter_ = lambda r : [t for t in r if not filterout(t)]
            r = filter_(rp.fetchall())
            rp.close()
            return r, ()

        pfz = len(prefix)
        dz = len(delimiter)
        count = 0
        prefixes = []
        pappend = prefixes.append
        matches = []
        mappend = matches.append

        rp = self.conn.execute(s, start=start)
        while True:
            props = rp.fetchone()
            if props is None:
                break
            if filterout(props):
                continue
            path, serial = props
            idx = path.find(delimiter, pfz)

            if idx < 0:
                mappend(props)
                count += 1
                if count >= limit:
                    break
                continue

            if idx + dz == len(path):
                mappend(props)
                count += 1
                continue # Get one more, in case there is a path.
            pf = path[:idx + dz]
            pappend(pf)
            if count >= limit:
                break

            rp = self.conn.execute(s, start=strnextling(pf)) # New start.
        rp.close()

        return matches, prefixes
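
    # Usage sketch (illustrative, not executed; assumes a connected Node
    # instance `node_worker` and a parent node id `parent_node`; paths are
    # arbitrary examples, and filterq terms would follow the grammar given in
    # the docstring above):
    #
    #   matches, prefixes = node_worker.latest_version_list(parent_node,
    #                                                       prefix='photos/',
    #                                                       delimiter='/')
    #   # matches:  [(path, serial), ...] for the latest versions directly
    #   #           under 'photos/'
    #   # prefixes: e.g. ['photos/2011/'], each "virtual directory" once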

    def latest_uuid(self, uuid):
        """Return a (path, serial) tuple, for the latest version of the given uuid."""

        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        s = select([n.c.path, v.c.serial])
        filtered = select([func.max(self.versions.c.serial)])
        s = s.where(v.c.serial == filtered.where(self.versions.c.uuid == uuid))
        s = s.where(n.c.node == v.c.node)

        r = self.conn.execute(s)
        l = r.fetchone()
        r.close()
        return l