Statistics
| Branch: | Tag: | Revision:

root / pithos / backends / lib / sqlalchemy / node.py @ cf341da4

History | View | Annotate | Download (35.7 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from time import time
35
from sqlalchemy import Table, Integer, BigInteger, DECIMAL, Column, String, MetaData, ForeignKey
36
from sqlalchemy.types import Text
37
from sqlalchemy.schema import Index, Sequence
38
from sqlalchemy.sql import func, and_, or_, not_, null, select, bindparam, text, exists
39
from sqlalchemy.ext.compiler import compiles
40
from sqlalchemy.engine.reflection import Inspector
41

    
42
from dbworker import DBWorker
43

    
44
from pithos.lib.filter import parse_filters
45

    
46

    
47
ROOTNODE  = 0
48

    
49
( SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CLUSTER ) = range(10)
50

    
51
( MATCH_PREFIX, MATCH_EXACT ) = range(2)
52

    
53
inf = float('inf')
54

    
55

    
56
def strnextling(prefix):
    """Return the first unicode string
       greater than but not starting with given prefix.
       strnextling('hello') -> 'hellp'

       Raise RuntimeError if the last character of the prefix
       is already at or beyond 0xffff and cannot be incremented.
    """
    # Pick the unicode character constructor: unichr on Python 2,
    # chr on Python 3 (where unichr does not exist).
    try:
        uchr = unichr
    except NameError:
        uchr = chr
    if not prefix:
        ## all strings start with the null string,
        ## therefore we have to approximate strnextling('')
        ## with the last unicode character supported by python
        ## 0x10ffff for wide (32-bit unicode) python builds
        ## 0x00ffff for narrow (16-bit unicode) python builds
        ## We will not autodetect. 0xffff is safe enough.
        return uchr(0xffff)
    s = prefix[:-1]
    c = ord(prefix[-1])
    if c >= 0xffff:
        raise RuntimeError
    s += uchr(c + 1)
    return s
75

    
76
def strprevling(prefix):
    """Return an approximation of the last unicode string
       less than but not starting with given prefix.
       strprevling(u'hello') -> u'helln\\xffff'
    """
    # Pick the unicode character constructor: unichr on Python 2,
    # chr on Python 3 (where unichr does not exist).
    try:
        uchr = unichr
    except NameError:
        uchr = chr
    if not prefix:
        ## There is no prevling for the null string
        return prefix
    s = prefix[:-1]
    c = ord(prefix[-1])
    if c > 0:
        # Decrement the last character and pad with a maximal character
        # to approximate the greatest string below the prefix.
        s += uchr(c - 1) + uchr(0xffff)
    return s
89

    
90
_propnames = {
91
    'serial'    : 0,
92
    'node'      : 1,
93
    'hash'      : 2,
94
    'size'      : 3,
95
    'type'      : 4,
96
    'source'    : 5,
97
    'mtime'     : 6,
98
    'muser'     : 7,
99
    'uuid'      : 8,
100
    'cluster'   : 9
101
}
102

    
103

    
104
class Node(DBWorker):
    """Nodes store path organization and have multiple versions.
       Versions store object history and have multiple attributes.
       Attributes store metadata.

       Backed by the tables created in __init__: nodes, policy,
       statistics, versions and attributes.
    """
    
    # TODO: Provide an interface for included and excluded clusters.
111
    
112
    def __init__(self, **params):
        """Define the schema (nodes, policy, statistics, versions,
           attributes), create any missing tables/indexes, and make
           sure the root node exists.

           params are forwarded verbatim to DBWorker.__init__.
        """
        DBWorker.__init__(self, **params)
        metadata = MetaData()
        
        #create nodes table
        # Each node is a path entry; children reference their parent and
        # are removed along with it (ON DELETE CASCADE).
        columns=[]
        columns.append(Column('node', Integer, primary_key=True))
        columns.append(Column('parent', Integer,
                              ForeignKey('nodes.node',
                                         ondelete='CASCADE',
                                         onupdate='CASCADE'),
                              autoincrement=False))
        path_length = 2048
        columns.append(Column('path', String(path_length), default='', nullable=False))
        self.nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
        Index('idx_nodes_path', self.nodes.c.path, unique=True)
        
        #create policy table
        # Per-node key/value policy entries; (node, key) is the primary key.
        columns=[]
        columns.append(Column('node', Integer,
                              ForeignKey('nodes.node',
                                         ondelete='CASCADE',
                                         onupdate='CASCADE'),
                              primary_key=True))
        columns.append(Column('key', String(255), primary_key=True))
        columns.append(Column('value', String(255)))
        self.policies = Table('policy', metadata, *columns, mysql_engine='InnoDB')
        
        #create statistics table
        # Aggregated population/size/mtime per (node, cluster).
        columns=[]
        columns.append(Column('node', Integer,
                              ForeignKey('nodes.node',
                                         ondelete='CASCADE',
                                         onupdate='CASCADE'),
                              primary_key=True))
        columns.append(Column('population', Integer, nullable=False, default=0))
        columns.append(Column('size', BigInteger, nullable=False, default=0))
        columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
        columns.append(Column('cluster', Integer, nullable=False, default=0,
                              primary_key=True, autoincrement=False))
        self.statistics = Table('statistics', metadata, *columns, mysql_engine='InnoDB')
        
        #create versions table
        # One row per object version; versions are removed with their node.
        columns=[]
        columns.append(Column('serial', Integer, primary_key=True))
        columns.append(Column('node', Integer,
                              ForeignKey('nodes.node',
                                         ondelete='CASCADE',
                                         onupdate='CASCADE')))
        columns.append(Column('hash', String(255)))
        columns.append(Column('size', BigInteger, nullable=False, default=0))
        columns.append(Column('type', String(255), nullable=False, default=''))
        columns.append(Column('source', Integer))
        columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
        columns.append(Column('muser', String(255), nullable=False, default=''))
        columns.append(Column('uuid', String(64), nullable=False, default=''))
        columns.append(Column('cluster', Integer, nullable=False, default=0))
        self.versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
        Index('idx_versions_node_mtime', self.versions.c.node, self.versions.c.mtime)
        Index('idx_versions_node_uuid', self.versions.c.uuid)
        
        #create attributes table
        # Per-version metadata, namespaced by domain; removed with the version.
        columns = []
        columns.append(Column('serial', Integer,
                              ForeignKey('versions.serial',
                                         ondelete='CASCADE',
                                         onupdate='CASCADE'),
                              primary_key=True))
        columns.append(Column('domain', String(255), primary_key=True))
        columns.append(Column('key', String(255), primary_key=True))
        columns.append(Column('value', String(255)))
        self.attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')
        
        metadata.create_all(self.engine)
        
        # Bootstrap the root node (its own parent) if this is a fresh database.
        s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
                                           self.nodes.c.parent == ROOTNODE))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            s = self.nodes.insert().values(node=ROOTNODE, parent=ROOTNODE)
            self.conn.execute(s)
195
    
196
    def node_create(self, parent, path):
        """Create a new node with the given parent and path.
           Return the node identifier of the new node.
        """
        #TODO catch IntegrityError?
        statement = self.nodes.insert().values(parent=parent, path=path)
        result = self.conn.execute(statement)
        new_node = result.inserted_primary_key[0]
        result.close()
        return new_node
206
    
207
    def node_lookup(self, path):
        """Lookup the current node of the given path.
           Return None if the path is not found.
        """
        
        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
        query = select([self.nodes.c.node],
                       self.nodes.c.path.like(self.escape_like(path),
                                              escape='\\'))
        result = self.conn.execute(query)
        match = result.fetchone()
        result.close()
        return match[0] if match else None
220
    
221
    def node_get_properties(self, node):
        """Return the node's (parent, path).
           Return None if the node is not found.
        """
        
        query = select([self.nodes.c.parent, self.nodes.c.path],
                       self.nodes.c.node == node)
        result = self.conn.execute(query)
        row = result.fetchone()
        result.close()
        return row
232
    
233
    def node_get_versions(self, node, keys=(), propnames=_propnames):
        """Return the properties of all versions at node.
           If keys is empty, return all properties in the order
           (serial, node, hash, size, type, source, mtime, muser, uuid, cluster).
        """
        
        v = self.versions.c
        query = select([v.serial, v.node, v.hash, v.size, v.type,
                        v.source, v.mtime, v.muser, v.uuid, v.cluster],
                       v.node == node).order_by(v.serial)
        result = self.conn.execute(query)
        rows = result.fetchall()
        result.close()
        # With no rows or no key selection, hand back the rows unchanged.
        if not rows or not keys:
            return rows
        # Project each row down to the requested (known) property names.
        return [[row[propnames[key]] for key in keys if key in propnames]
                for row in rows]
260
    
261
    def node_count_children(self, node):
        """Return node's child count."""
        
        # The root is excluded: it is its own parent and must not be
        # counted as a child of itself.
        query = select([func.count(self.nodes.c.node)],
                       and_(self.nodes.c.parent == node,
                            self.nodes.c.node != ROOTNODE))
        result = self.conn.execute(query)
        count = result.fetchone()[0]
        result.close()
        return count
271
    
272
    def node_purge_children(self, parent, before=inf, cluster=0):
        """Delete all versions with the specified
           parent and cluster, and return
           the hashes of versions deleted.
           If before is given, only versions with mtime <= before
           are considered.
           Clears out nodes with no remaining versions.
        """
        c1 = select([self.nodes.c.node],
            self.nodes.c.parent == parent)
        where_clause = and_(self.versions.c.node.in_(c1),
                            self.versions.c.cluster == cluster)
        if before != inf:
            # Apply the time bound to the shared where_clause so that the
            # hash selection and deletion below honor it too; previously
            # only the statistics query was bounded, so versions newer
            # than 'before' were deleted without being accounted for.
            where_clause = and_(where_clause,
                                self.versions.c.mtime <= before)
        
        #update statistics
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        if not row:
            return ()
        # sum() is NULL when no rows match; treat it as zero (negated).
        nr, size = row[0], -row[1] if row[1] else 0
        mtime = time()
        self.statistics_update(parent, -nr, size, mtime, cluster)
        self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)
        
        # Collect the hashes of the versions about to be deleted.
        s = select([self.versions.c.hash])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        hashes = [row[0] for row in r.fetchall()]
        r.close()
        
        #delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)
        r.close()
        
        #delete nodes
        # Remove children of 'parent' that are left with no versions at all.
        s = select([self.nodes.c.node],
            and_(self.nodes.c.parent == parent,
                 select([func.count(self.versions.c.serial)],
                    self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
        rp = self.conn.execute(s)
        nodes = [r[0] for r in rp.fetchall()]
        rp.close()
        s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
        self.conn.execute(s).close()
        
        return hashes
321
    
322
    def node_purge(self, node, before=inf, cluster=0):
        """Delete all versions with the specified
           node and cluster, and return
           the hashes of versions deleted.
           If before is given, only versions with mtime <= before
           are considered.
           Clears out the node if it has no remaining versions.
        """
        
        where_clause = and_(self.versions.c.node == node,
                            self.versions.c.cluster == cluster)
        if before != inf:
            # Apply the time bound to the shared where_clause so that the
            # hash selection and deletion below honor it too; previously
            # only the statistics query was bounded, so versions newer
            # than 'before' were deleted without being accounted for.
            where_clause = and_(where_clause,
                                self.versions.c.mtime <= before)
        
        #update statistics
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        row = r.fetchone()
        nr, size = row[0], row[1]
        r.close()
        if not nr:
            return ()
        mtime = time()
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
        
        # Collect the hashes of the versions about to be deleted.
        # (Do not shadow the ResultProxy with the loop variable.)
        s = select([self.versions.c.hash])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        hashes = [row[0] for row in r.fetchall()]
        r.close()
        
        #delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)
        r.close()
        
        #delete nodes
        s = select([self.nodes.c.node],
            and_(self.nodes.c.node == node,
                 select([func.count(self.versions.c.serial)],
                    self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
        r = self.conn.execute(s)
        # Extract the node ids; feeding raw row tuples to in_() is wrong
        # (cf. node_purge_children, which extracts row[0]).
        nodes = [row[0] for row in r.fetchall()]
        r.close()
        s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
        self.conn.execute(s).close()
        
        return hashes
369
    
370
    def node_remove(self, node):
        """Remove the node specified.
           Return false if the node has children or is not found.
        """
        
        if self.node_count_children(node):
            return False
        
        mtime = time()
        # Roll the node's per-cluster totals out of all ancestor statistics
        # before deleting it.
        query = select([func.count(self.versions.c.serial),
                        func.sum(self.versions.c.size),
                        self.versions.c.cluster],
                       self.versions.c.node == node)
        query = query.group_by(self.versions.c.cluster)
        result = self.conn.execute(query)
        for population, size, cluster in result.fetchall():
            self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
        result.close()
        
        self.conn.execute(
            self.nodes.delete().where(self.nodes.c.node == node)).close()
        return True
392
    
393
    def policy_get(self, node):
        """Return the node's policy as a {key: value} dict."""
        query = select([self.policies.c.key, self.policies.c.value],
                       self.policies.c.node == node)
        result = self.conn.execute(query)
        policy = dict(result.fetchall())
        result.close()
        return policy
400
    
401
    def policy_set(self, node, policy):
        """Set the given {key: value} pairs in the node's policy."""
        #insert or replace
        for key, value in policy.iteritems():
            u = self.policies.update().where(and_(
                self.policies.c.node == node,
                self.policies.c.key == key))
            u = u.values(value=value)
            rp = self.conn.execute(u)
            rp.close()
            if rp.rowcount == 0:
                # No existing row for this key: insert a fresh one.
                ins = self.policies.insert()
                self.conn.execute(
                    ins, {'node': node, 'key': key, 'value': value}).close()
414
    
415
    def statistics_get(self, node, cluster=0):
        """Return population, total size and last mtime
           for all versions under node that belong to the cluster.
        """
        
        query = select([self.statistics.c.population,
                        self.statistics.c.size,
                        self.statistics.c.mtime],
                       and_(self.statistics.c.node == node,
                            self.statistics.c.cluster == cluster))
        result = self.conn.execute(query)
        row = result.fetchone()
        result.close()
        return row
429
    
430
    def statistics_update(self, node, population, size, mtime, cluster=0):
        """Update the statistics of the given node.
           Statistics keep track the population, total
           size of objects and mtime in the node's namespace.
           May be zero or positive or negative numbers.
        """
        # Read the current counters (if any) so the deltas below can be
        # applied on top of them.
        s = select([self.statistics.c.population, self.statistics.c.size],
            and_(self.statistics.c.node == node,
                 self.statistics.c.cluster == cluster))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            prepopulation, presize = (0, 0)
        else:
            prepopulation, presize = r
        population += prepopulation
        size += presize
        
        #insert or replace
        #TODO better upsert
        u = self.statistics.update().where(and_(self.statistics.c.node==node,
                                           self.statistics.c.cluster==cluster))
        u = u.values(population=population, size=size, mtime=mtime)
        rp = self.conn.execute(u)
        rp.close()
        # NOTE(review): rowcount is read after close(); SQLAlchemy keeps the
        # value available, but checking it before close() would be clearer.
        if rp.rowcount == 0:
            # No row existed for (node, cluster): fall back to an insert.
            ins = self.statistics.insert()
            ins = ins.values(node=node, population=population, size=size,
                             mtime=mtime, cluster=cluster)
            self.conn.execute(ins).close()
461
    
462
    def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
        """Update the statistics of the given node's parent.
           Then recursively update all parents up to the root.
           Population is not recursive.
        """
        
        # Walk up the tree until the root (or a missing node) is reached.
        while node != ROOTNODE:
            props = self.node_get_properties(node)
            if props is None:
                break
            parent, path = props
            self.statistics_update(parent, population, size, mtime, cluster)
            node = parent
            population = 0 # Population isn't recursive
478
    
479
    def statistics_latest(self, node, before=inf, except_cluster=0):
        """Return population, total size and last mtime
           for all latest versions under node that
           do not belong to the cluster.

           Return None if the node, its latest version, or the
           aggregate rows cannot be found.
        """
        
        # The node.
        props = self.node_get_properties(node)
        if props is None:
            return None
        parent, path = props
        
        # The latest version.
        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.hash,
                    self.versions.c.size,
                    self.versions.c.type,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.uuid,
                    self.versions.c.cluster])
        filtered = select([func.max(self.versions.c.serial)],
                            self.versions.c.node == node)
        if before != inf:
            filtered = filtered.where(self.versions.c.mtime < before)
        s = s.where(and_(self.versions.c.cluster != except_cluster,
                         self.versions.c.serial == filtered))
        r = self.conn.execute(s)
        props = r.fetchone()
        r.close()
        if not props:
            return None
        mtime = props[MTIME]
        
        # First level, just under node (get population).
        # Count latest versions of the node's direct children only.
        v = self.versions.alias('v')
        s = select([func.count(v.c.serial),
                    func.sum(v.c.size),
                    func.max(v.c.mtime)])
        c1 = select([func.max(self.versions.c.serial)])
        if before != inf:
            c1 = c1.where(self.versions.c.mtime < before)
        c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
        # c1 is correlated per-node below: v.serial must be the latest
        # serial of v's own node.
        s = s.where(and_(v.c.serial == c1.where(self.versions.c.node == v.c.node),
                         v.c.cluster != except_cluster,
                         v.c.node.in_(c2)))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        count = r[0]
        mtime = max(mtime, r[2])
        if count == 0:
            return (0, 0, mtime)
        
        # All children (get size and mtime).
        # This is why the full path is stored.
        # Match every descendant by path prefix instead of recursing.
        s = select([func.count(v.c.serial),
                    func.sum(v.c.size),
                    func.max(v.c.mtime)])
        c1 = select([func.max(self.versions.c.serial)],
            self.versions.c.node == v.c.node)
        if before != inf:
            c1 = c1.where(self.versions.c.mtime < before)
        c2 = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path) + '%', escape='\\'))
        s = s.where(and_(v.c.serial == c1,
                         v.c.cluster != except_cluster,
                         v.c.node.in_(c2)))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        # Subtract the node's own latest version size: the prefix match
        # above includes the node itself.
        size = r[1] - props[SIZE]
        mtime = max(mtime, r[2])
        return (count, size, mtime)
558
    
559
    def version_create(self, node, hash, size, type, source, muser, uuid, cluster=0):
        """Create a new version from the given properties.
           Return the (serial, mtime) of the new version.
        """
        
        mtime = time()
        ins = self.versions.insert().values(
            node=node, hash=hash, size=size, type=type, source=source,
            mtime=mtime, muser=muser, uuid=uuid, cluster=cluster)
        serial = self.conn.execute(ins).inserted_primary_key[0]
        # Account for the new version in all ancestor statistics.
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)
        return serial, mtime
570
    
571
    def version_lookup(self, node, before=inf, cluster=0):
        """Lookup the current version of the given node.
           Return a list with its properties:
           (serial, node, hash, size, type, source, mtime, muser, uuid, cluster)
           or None if the current version is not found in the given cluster.
        """
        
        v = self.versions.alias('v')
        # Subquery selecting the node's latest serial (optionally bounded
        # by mtime < before).
        latest = select([func.max(self.versions.c.serial)],
                        self.versions.c.node == node)
        if before != inf:
            latest = latest.where(self.versions.c.mtime < before)
        query = select([v.c.serial, v.c.node, v.c.hash,
                        v.c.size, v.c.type, v.c.source,
                        v.c.mtime, v.c.muser, v.c.uuid,
                        v.c.cluster],
                       and_(v.c.serial == latest,
                            v.c.cluster == cluster))
        result = self.conn.execute(query)
        props = result.fetchone()
        result.close()
        return props if props else None
595
    
596
    def version_get_properties(self, serial, keys=(), propnames=_propnames):
        """Return a sequence of values for the properties of
           the version specified by serial and the keys, in the order given.
           If keys is empty, return all properties in the order
           (serial, node, hash, size, type, source, mtime, muser, uuid, cluster).
        """
        
        v = self.versions.alias()
        query = select([v.c.serial, v.c.node, v.c.hash,
                        v.c.size, v.c.type, v.c.source,
                        v.c.mtime, v.c.muser, v.c.uuid,
                        v.c.cluster], v.c.serial == serial)
        rp = self.conn.execute(query)
        row = rp.fetchone()
        rp.close()
        # Not found, or full row requested: return as-is.
        if row is None or not keys:
            return row
        return [row[propnames[key]] for key in keys if key in propnames]
617
    
618
    def version_recluster(self, serial, cluster):
        """Move the version into another cluster."""
        
        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        size = props[SIZE]
        oldcluster = props[CLUSTER]
        if cluster == oldcluster:
            # Nothing to move.
            return
        
        # Shift the version's footprint between the two clusters' statistics.
        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)
        
        u = self.versions.update()
        u = u.where(self.versions.c.serial == serial)
        self.conn.execute(u.values(cluster=cluster)).close()
638
    
639
    def version_remove(self, serial):
        """Remove the serial specified and return its hash
           (or None if the serial does not exist).
        """
        
        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        hash = props[HASH]
        size = props[SIZE]
        cluster = props[CLUSTER]
        
        # Roll this version out of all ancestor statistics.
        self.statistics_update_ancestors(node, -1, -size, time(), cluster)
        
        self.conn.execute(
            self.versions.delete().where(
                self.versions.c.serial == serial)).close()
        return hash
656
    
657
    def attribute_get(self, serial, domain, keys=()):
        """Return a list of (key, value) pairs of the version specified by serial.
           If keys is empty, return all attributes.
           Otherwise, return only those specified.
        """
        
        attrs = self.attributes.alias()
        conditions = [attrs.c.serial == serial,
                      attrs.c.domain == domain]
        if keys:
            # Restrict to the requested keys only.
            conditions.append(attrs.c.key.in_(keys))
        query = select([attrs.c.key, attrs.c.value]).where(and_(*conditions))
        result = self.conn.execute(query)
        pairs = result.fetchall()
        result.close()
        return pairs
678
    
679
    def attribute_set(self, serial, domain, items):
        """Set the attributes of the version specified by serial.
           Receive attributes as an iterable of (key, value) pairs.
        """
        #insert or replace
        #TODO better upsert
        for key, value in items:
            u = self.attributes.update().where(and_(
                self.attributes.c.serial == serial,
                self.attributes.c.domain == domain,
                self.attributes.c.key == key))
            rp = self.conn.execute(u.values(value=value))
            rp.close()
            if rp.rowcount == 0:
                # No existing attribute row: insert a fresh one.
                ins = self.attributes.insert().values(
                    serial=serial, domain=domain, key=key, value=value)
                self.conn.execute(ins).close()
697
    
698
    def attribute_del(self, serial, domain, keys=()):
        """Delete attributes of the version specified by serial.
           If keys is empty, delete all attributes.
           Otherwise delete those specified.
        """
        
        if keys:
            #TODO more efficient way to do this?
            for key in keys:
                d = self.attributes.delete().where(and_(
                    self.attributes.c.serial == serial,
                    self.attributes.c.domain == domain,
                    self.attributes.c.key == key))
                self.conn.execute(d).close()
        else:
            d = self.attributes.delete().where(and_(
                self.attributes.c.serial == serial,
                self.attributes.c.domain == domain))
            self.conn.execute(d).close()
717
    
718
    def attribute_copy(self, source, dest):
        """Copy all attributes of version 'source' onto version 'dest',
           overwriting any existing (domain, key) entries on 'dest'.
        """
        # Select the destination serial as a literal column alongside each
        # source attribute, so fetched rows unpack directly below.
        s = select([dest, self.attributes.c.domain, self.attributes.c.key, self.attributes.c.value],
            self.attributes.c.serial == source)
        rp = self.conn.execute(s)
        attributes = rp.fetchall()
        rp.close()
        # NOTE(review): the loop rebinds the 'dest' parameter with the row's
        # first column (the same value); a distinct name would be clearer.
        for dest, domain, k, v in attributes:
            #insert or replace
            s = self.attributes.update().where(and_(
                self.attributes.c.serial == dest,
                self.attributes.c.domain == domain,
                self.attributes.c.key == k))
            rp = self.conn.execute(s, value=v)
            rp.close()
            if rp.rowcount == 0:
                # No row to update: insert the attribute instead.
                s = self.attributes.insert()
                values = {'serial':dest, 'domain':domain, 'key':k, 'value':v}
                self.conn.execute(s, values).close()
736
    
737
    def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=[]):
        """Return a list with all keys pairs defined
           for all latest versions under parent that
           do not belong to the cluster.

           pathq is a list of (path, match-mode) pairs restricting the
           result to matching node paths; it is read-only here, but note
           the mutable default argument.
        """
        
        # TODO: Use another table to store before=inf results.
        a = self.attributes.alias('a')
        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        s = select([a.c.key]).distinct()
        # Correlated subquery: the latest serial of each version's node,
        # optionally bounded by mtime < before.
        filtered = select([func.max(self.versions.c.serial)])
        if before != inf:
            filtered = filtered.where(self.versions.c.mtime < before)
        s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
        s = s.where(v.c.cluster != except_cluster)
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
            self.nodes.c.parent == parent)))
        s = s.where(a.c.serial == v.c.serial)
        s = s.where(a.c.domain == domain)
        s = s.where(n.c.node == v.c.node)
        # Build an OR of path conditions out of the pathq entries.
        conj = []
        for path, match in pathq:
            if match == MATCH_PREFIX:
                conj.append(n.c.path.like(self.escape_like(path) + '%', escape='\\'))
            elif match == MATCH_EXACT:
                conj.append(n.c.path == path)
        if conj:
            s = s.where(or_(*conj))
        rp = self.conn.execute(s)
        rows = rp.fetchall()
        rp.close()
        return [r[0] for r in rows]
770
    
771
    def latest_version_list(self, parent, prefix='', delimiter=None,
                            start='', limit=10000, before=inf,
                            except_cluster=0, pathq=None, domain=None, filterq=None, sizeq=None):
        """Return a (list of (path, serial) tuples, list of common prefixes)
           for the current versions of the paths with the given parent,
           matching the following criteria.
           
           The property tuple for a version is returned if all
           of these conditions are true:
                
                a. parent matches
                
                b. path > start
                
                c. path starts with prefix (and paths in pathq)
                
                d. version is the max up to before
                
                e. version is not in cluster
                
                f. the path does not have the delimiter occuring
                   after the prefix, or ends with the delimiter
                
                g. serial matches the attribute filter query.
                   
                   A filter query is a comma-separated list of
                   terms in one of these three forms:
                   
                   key
                       an attribute with this key must exist
                   
                   !key
                       an attribute with this key must not exist
                   
                   key ?op value
                       the attribute with this key satisfies the value
                       where ?op is one of ==, != <=, >=, <, >.
                
                h. the size is in the range set by sizeq
           
           The list of common prefixes includes the prefixes
           matching up to the first delimiter after prefix,
           and are reported only once, as "virtual directories".
           The delimiter is included in the prefixes.
           
           If arguments are None, then the corresponding matching rule
           will always match.
           
           Limit applies to the first list of tuples returned.
        """
        
        # Avoid mutable default arguments; None means "no restriction".
        pathq = pathq if pathq is not None else []
        filterq = filterq if filterq is not None else []
        
        if not start or start < prefix:
            start = strprevling(prefix)
        nextling = strnextling(prefix)
        
        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        s = select([n.c.path, v.c.serial]).distinct()
        # Correlated subquery: greatest serial per node, optionally
        # restricted to versions modified strictly before `before`.
        filtered = select([func.max(self.versions.c.serial)])
        if before != inf:
            filtered = filtered.where(self.versions.c.mtime < before)
        s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
        s = s.where(v.c.cluster != except_cluster)
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
            self.nodes.c.parent == parent)))
        
        s = s.where(n.c.node == v.c.node)
        # `start` stays a bind parameter so the statement can be
        # re-executed with a new start below (delimiter pagination).
        s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
        conj = []
        for path, match in pathq:
            if match == MATCH_PREFIX:
                conj.append(n.c.path.like(self.escape_like(path) + '%', escape='\\'))
            elif match == MATCH_EXACT:
                conj.append(n.c.path == path)
        if conj:
            s = s.where(or_(*conj))
        
        # Optional size range [sizeq[0], sizeq[1]); a falsy bound is ignored.
        if sizeq and len(sizeq) == 2:
            if sizeq[0]:
                s = s.where(v.c.size >= sizeq[0])
            if sizeq[1]:
                s = s.where(v.c.size < sizeq[1])
        
        # Attribute filter terms become correlated EXISTS / NOT EXISTS
        # subqueries against the attributes table.
        if domain and filterq:
            a = self.attributes.alias('a')
            included, excluded, opers = parse_filters(filterq)
            if included:
                subs = select([1])
                subs = subs.where(a.c.serial == v.c.serial).correlate(v)
                subs = subs.where(a.c.domain == domain)
                subs = subs.where(or_(*[a.c.key.op('=')(x) for x in included]))
                s = s.where(exists(subs))
            if excluded:
                subs = select([1])
                subs = subs.where(a.c.serial == v.c.serial).correlate(v)
                subs = subs.where(a.c.domain == domain)
                subs = subs.where(or_(*[a.c.key.op('=')(x) for x in excluded]))
                s = s.where(not_(exists(subs)))
            if opers:
                for k, o, val in opers:
                    subs = select([1])
                    subs = subs.where(a.c.serial == v.c.serial).correlate(v)
                    subs = subs.where(a.c.domain == domain)
                    subs = subs.where(and_(a.c.key.op('=')(k), a.c.value.op(o)(val)))
                    s = s.where(exists(subs))
        
        s = s.order_by(n.c.path)
        
        # Without a delimiter a single LIMITed query suffices.
        if not delimiter:
            s = s.limit(limit)
            rp = self.conn.execute(s, start=start)
            r = rp.fetchall()
            rp.close()
            return r, ()
        
        # With a delimiter, walk the result stream: paths without the
        # delimiter past the prefix are matches; the rest collapse into
        # common prefixes ("virtual directories"), after which we skip
        # ahead by re-executing with a new start.
        pfz = len(prefix)
        dz = len(delimiter)
        count = 0
        prefixes = []
        pappend = prefixes.append
        matches = []
        mappend = matches.append
        
        rp = self.conn.execute(s, start=start)
        while True:
            props = rp.fetchone()
            if props is None:
                break
            path, serial = props
            idx = path.find(delimiter, pfz)
            
            if idx < 0:
                mappend(props)
                count += 1
                if count >= limit:
                    break
                continue
            
            if idx + dz == len(path):
                mappend(props)
                count += 1
                continue # Get one more, in case there is a path.
            pf = path[:idx + dz]
            pappend(pf)
            if count >= limit: 
                break
            
            # Close the exhausted result before re-executing, otherwise
            # one ResultProxy leaks per common prefix found.
            rp.close()
            rp = self.conn.execute(s, start=strnextling(pf)) # New start.
        rp.close()
        
        return matches, prefixes
922
    
923
    def latest_uuid(self, uuid):
        """Return a (path, serial) tuple, for the latest version of the given uuid."""
        
        ver = self.versions.alias('v')
        node = self.nodes.alias('n')
        # Greatest serial recorded for this uuid.
        max_serial = select([func.max(self.versions.c.serial)]).where(
            self.versions.c.uuid == uuid)
        stmt = select([node.c.path, ver.c.serial]).where(
            and_(ver.c.serial == max_serial,
                 node.c.node == ver.c.node))
        
        rp = self.conn.execute(stmt)
        row = rp.fetchone()
        rp.close()
        return row