Statistics
| Branch: | Tag: | Revision:

root / pithos / backends / lib / sqlalchemy / node.py @ 371d907a

History | View | Annotate | Download (36.9 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from time import time
35
from sqlalchemy import Table, Integer, BigInteger, DECIMAL, Column, String, MetaData, ForeignKey
36
from sqlalchemy.types import Text
37
from sqlalchemy.schema import Index, Sequence
38
from sqlalchemy.sql import func, and_, or_, not_, null, select, bindparam, text, exists
39
from sqlalchemy.ext.compiler import compiles
40
from sqlalchemy.engine.reflection import Inspector
41

    
42
from dbworker import DBWorker
43

    
44
from pithos.lib.filter import parse_filters
45

    
46

    
47
# Identifier of the root node.
ROOTNODE = 0

# Positional indexes of the columns in a version row, in the order the
# version queries of this module select them.
(SERIAL, NODE, HASH, SIZE, TYPE, SOURCE,
 MTIME, MUSER, UUID, CHECKSUM, CLUSTER) = range(11)

# Path matching modes used in path queries.
MATCH_PREFIX, MATCH_EXACT = range(2)

# Sentinel meaning "no upper time limit" for the `before` parameters.
inf = float('inf')
54

    
55

    
56
def strnextling(prefix):
    """Return the first unicode string that sorts after the given
       prefix without starting with it.

       The last character of the prefix is replaced by its successor:
       strnextling('hello') -> 'hellp'

       Raise RuntimeError when the last character is already 0xffff
       or above, since no successor is produced for it.
    """
    if prefix:
        last = ord(prefix[-1])
        if last >= 0xffff:
            raise RuntimeError
        return prefix[:-1] + unichr(last + 1)

    ## Every string starts with the empty string, so strnextling('')
    ## can only be approximated by the last supported character:
    ## 0x10ffff on wide (32-bit unicode) python builds,
    ## 0x00ffff on narrow (16-bit unicode) python builds.
    ## No autodetection is attempted; 0xffff is safe enough.
    return unichr(0xffff)
75

    
76
def strprevling(prefix):
    """Return an approximation of the last unicode string that sorts
       before the given prefix without starting with it.

       The last character is replaced by its predecessor followed by
       the character 0xffff:
       strprevling(u'hello') -> u'helln\\uffff'

       The empty string has no predecessor and is returned unchanged.
       When the last character is 0, it is simply dropped.
    """
    if not prefix:
        ## There is no prevling for the null string
        return prefix

    head = prefix[:-1]
    last = ord(prefix[-1])
    if last > 0:
        return head + (unichr(last - 1) + unichr(0xffff))
    return head
89

    
90
# Maps each version property name to its position in a version row.
_propnames = dict(
    serial=0,
    node=1,
    hash=2,
    size=3,
    type=4,
    source=5,
    mtime=6,
    muser=7,
    uuid=8,
    checksum=9,
    cluster=10,
)
103

    
104

    
105
class Node(DBWorker):
106
    """Nodes store path organization and have multiple versions.
107
       Versions store object history and have multiple attributes.
108
       Attributes store metadata.
109
    """
110
    
111
    # TODO: Provide an interface for included and excluded clusters.
112
    
113
    def __init__(self, **params):
        """Declare the backend tables and make sure the root node exists.

           All keyword parameters are forwarded to DBWorker.__init__.
           The nodes, policy, statistics, versions and attributes tables
           are declared on a fresh MetaData, metadata.create_all() is
           issued on self.engine, and the root node row
           (node == parent == ROOTNODE) is inserted if it is missing.
        """
        DBWorker.__init__(self, **params)
        metadata = MetaData()

        def node_reference():
            """Return a new cascading foreign key to nodes.node."""
            return ForeignKey('nodes.node',
                              ondelete='CASCADE',
                              onupdate='CASCADE')

        # nodes: one row per path, with a unique index on the path.
        self.nodes = Table(
            'nodes', metadata,
            Column('node', Integer, primary_key=True),
            Column('parent', Integer, node_reference(),
                   autoincrement=False),
            Column('path', String(2048), default='', nullable=False),
            mysql_engine='InnoDB')
        Index('idx_nodes_path', self.nodes.c.path, unique=True)

        # policy: key/value pairs attached to a node.
        self.policies = Table(
            'policy', metadata,
            Column('node', Integer, node_reference(), primary_key=True),
            Column('key', String(255), primary_key=True),
            Column('value', String(255)),
            mysql_engine='InnoDB')

        # statistics: population, size and mtime per (node, cluster).
        self.statistics = Table(
            'statistics', metadata,
            Column('node', Integer, node_reference(), primary_key=True),
            Column('population', Integer, nullable=False, default=0),
            Column('size', BigInteger, nullable=False, default=0),
            Column('mtime', DECIMAL(precision=16, scale=6)),
            Column('cluster', Integer, nullable=False, default=0,
                   primary_key=True, autoincrement=False),
            mysql_engine='InnoDB')

        # versions: the history of each node.
        self.versions = Table(
            'versions', metadata,
            Column('serial', Integer, primary_key=True),
            Column('node', Integer, node_reference()),
            Column('hash', String(255)),
            Column('size', BigInteger, nullable=False, default=0),
            Column('type', String(255), nullable=False, default=''),
            Column('source', Integer),
            Column('mtime', DECIMAL(precision=16, scale=6)),
            Column('muser', String(255), nullable=False, default=''),
            Column('uuid', String(64), nullable=False, default=''),
            Column('checksum', String(255), nullable=False, default=''),
            Column('cluster', Integer, nullable=False, default=0),
            mysql_engine='InnoDB')
        Index('idx_versions_node_mtime',
              self.versions.c.node, self.versions.c.mtime)
        Index('idx_versions_node_uuid', self.versions.c.uuid)

        # attributes: per-version metadata, keyed by (serial, domain, key).
        self.attributes = Table(
            'attributes', metadata,
            Column('serial', Integer,
                   ForeignKey('versions.serial',
                              ondelete='CASCADE',
                              onupdate='CASCADE'),
                   primary_key=True),
            Column('domain', String(255), primary_key=True),
            Column('key', String(255), primary_key=True),
            Column('value', String(255)),
            mysql_engine='InnoDB')

        metadata.create_all(self.engine)

        # Insert the root node (which is its own parent) on first use.
        root_query = self.nodes.select().where(
            and_(self.nodes.c.node == ROOTNODE,
                 self.nodes.c.parent == ROOTNODE))
        result = self.conn.execute(root_query)
        root = result.fetchone()
        result.close()
        if not root:
            insertion = self.nodes.insert().values(node=ROOTNODE,
                                                   parent=ROOTNODE)
            self.conn.execute(insertion)
197
    
198
    def node_create(self, parent, path):
        """Insert a new node with the given parent and path.
           Return the identifier (primary key) of the inserted node.
        """
        #TODO catch IntegrityError?
        insertion = self.nodes.insert().values(parent=parent, path=path)
        result = self.conn.execute(insertion)
        # The key has to be read before the result is closed.
        node = result.inserted_primary_key[0]
        result.close()
        return node
208
    
209
    def node_lookup(self, path):
        """Return the identifier of the node stored under the given path,
           or None if there is no such node.
        """

        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
        pattern = self.escape_like(path)
        query = select([self.nodes.c.node],
                       self.nodes.c.path.like(pattern, escape='\\'))
        result = self.conn.execute(query)
        found = result.fetchone()
        result.close()
        return found[0] if found else None
222
    
223
    def node_get_properties(self, node):
        """Return the (parent, path) row of the given node,
           or None if the node does not exist.
        """

        nodes = self.nodes.c
        query = select([nodes.parent, nodes.path]).where(nodes.node == node)
        result = self.conn.execute(query)
        properties = result.fetchone()
        result.close()
        return properties
234
    
235
    def node_get_versions(self, node, keys=(), propnames=_propnames):
        """Return the properties of every version of the node,
           ordered by serial.

           With empty keys each entry is the full row, in the order
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
           Otherwise each entry is a list holding only the values of the
           requested keys, in the order given; keys that are not present
           in propnames are silently skipped.
        """

        versions = self.versions.c
        columns = [versions.serial,
                   versions.node,
                   versions.hash,
                   versions.size,
                   versions.type,
                   versions.source,
                   versions.mtime,
                   versions.muser,
                   versions.uuid,
                   versions.checksum,
                   versions.cluster]
        query = select(columns, versions.node == node)
        query = query.order_by(versions.serial)
        result = self.conn.execute(query)
        rows = result.fetchall()
        result.close()

        # Nothing found or nothing to filter: hand back the rows as they are.
        if not rows or not keys:
            return rows

        selected = []
        for row in rows:
            selected.append([row[propnames[k]] for k in keys if k in propnames])
        return selected
263
    
264
    def node_count_children(self, node):
        """Return the number of nodes whose parent is the given node.
           The root node is never counted (it is its own parent).
        """

        nodes = self.nodes.c
        is_child = and_(nodes.parent == node, nodes.node != ROOTNODE)
        query = select([func.count(nodes.node)]).where(is_child)
        result = self.conn.execute(query)
        counted = result.fetchone()
        result.close()
        return counted[0]
274
    
275
    def node_purge_children(self, parent, before=inf, cluster=0):
        """Delete all versions with the specified
           parent and cluster, and return
           the hashes and size of versions deleted.
           Clears out nodes with no remaining versions.

           If before is not inf, only versions with mtime <= before are
           counted, reported and deleted.

           Return a (hashes, size) pair. The statistics of the parent
           and of its ancestors are decreased accordingly.
        """
        # Every statement below must work on the very same set of
        # versions, so the time limit goes into the shared clause
        # instead of being applied to the statistics query only.
        children = select([self.nodes.c.node],
            self.nodes.c.parent == parent)
        where_clause = and_(self.versions.c.node.in_(children),
                            self.versions.c.cluster == cluster)
        if before != inf:
            where_clause = and_(where_clause,
                                self.versions.c.mtime <= before)

        #update statistics
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        if not row:
            return (), 0
        # The sum is NULL when nothing matched; size holds the
        # (negative) delta applied to the statistics.
        # NOTE(review): this negated value is also what gets returned,
        # whereas node_purge returns a positive size - kept as is for
        # callers, confirm which sign they expect.
        nr, size = row[0], -row[1] if row[1] else 0
        mtime = time()
        self.statistics_update(parent, -nr, size, mtime, cluster)
        self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)

        #collect the hashes of the versions about to be deleted
        s = select([self.versions.c.hash])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        hashes = [version[0] for version in r.fetchall()]
        r.close()

        #delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)
        r.close()

        #delete the children that are left without any version
        s = select([self.nodes.c.node],
            and_(self.nodes.c.parent == parent,
                 select([func.count(self.versions.c.serial)],
                    self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
        rp = self.conn.execute(s)
        nodes = [child[0] for child in rp.fetchall()]
        rp.close()
        if nodes:
            # Skip the statement altogether rather than emit an empty IN ().
            s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
            self.conn.execute(s).close()

        return hashes, size
324
    
325
    def node_purge(self, node, before=inf, cluster=0):
        """Delete all versions with the specified
           node and cluster, and return
           the hashes and size of versions deleted.
           Clears out the node if it has no remaining versions.

           If before is not inf, only versions with mtime <= before are
           counted, reported and deleted.

           Return a (hashes, size) pair, or ((), 0) when no version
           matches. The statistics of the node's ancestors are decreased
           accordingly.
        """

        # Every statement below must work on the very same set of
        # versions, so the time limit goes into the shared clause
        # instead of being applied to the statistics query only.
        where_clause = and_(self.versions.c.node == node,
                            self.versions.c.cluster == cluster)
        if before != inf:
            where_clause = and_(where_clause,
                                self.versions.c.mtime <= before)

        #update statistics
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        s = s.where(where_clause)
        rp = self.conn.execute(s)
        row = rp.fetchone()
        nr, size = row[0], row[1]
        rp.close()
        if not nr:
            return (), 0
        mtime = time()
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)

        #collect the hashes of the versions about to be deleted
        s = select([self.versions.c.hash])
        s = s.where(where_clause)
        rp = self.conn.execute(s)
        # The loop variable must not reuse the result's name: Python 2
        # list comprehensions leak it, and close() would then be called
        # on the last row instead of the result.
        hashes = [version[0] for version in rp.fetchall()]
        rp.close()

        #delete versions
        s = self.versions.delete().where(where_clause)
        rp = self.conn.execute(s)
        rp.close()

        #delete the node if it is left without any version
        s = select([self.nodes.c.node],
            and_(self.nodes.c.node == node,
                 select([func.count(self.versions.c.serial)],
                    self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
        rp = self.conn.execute(s)
        # in_() needs plain node identifiers, not row objects.
        nodes = [found[0] for found in rp.fetchall()]
        rp.close()
        if nodes:
            # Skip the statement altogether rather than emit an empty IN ().
            s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
            self.conn.execute(s).close()

        return hashes, size
372
    
373
    def node_remove(self, node):
        """Remove the node specified.

           Return False, without touching anything, if the node has
           children. Otherwise subtract the node's versions from the
           statistics of its ancestors (per cluster), delete the node row
           and return True.
           NOTE(review): a node that does not exist is not detected here;
           the delete simply matches nothing and True is returned.
        """

        if self.node_count_children(node):
            return False

        mtime = time()
        versions = self.versions.c
        query = select([func.count(versions.serial),
                        func.sum(versions.size),
                        versions.cluster])
        query = query.where(versions.node == node).group_by(versions.cluster)
        result = self.conn.execute(query)
        # One row per cluster that has versions of this node.
        for population, size, cluster in result.fetchall():
            self.statistics_update_ancestors(node, -population, -size,
                                             mtime, cluster)
        result.close()

        removal = self.nodes.delete().where(self.nodes.c.node == node)
        self.conn.execute(removal).close()
        return True
395
    
396
    def policy_get(self, node):
        """Return the policy of the node as a {key: value} dictionary.
           The dictionary is empty when the node has no policy rows.
        """
        policies = self.policies.c
        query = select([policies.key, policies.value])
        query = query.where(policies.node == node)
        result = self.conn.execute(query)
        policy = dict(result.fetchall())
        result.close()
        return policy
403
    
404
    def policy_set(self, node, policy):
        """Store every (key, value) item of the policy dictionary
           for the node, replacing the value of keys already present.
        """
        policies = self.policies
        for key, value in policy.iteritems():
            #insert or replace: try the update first
            matches = and_(policies.c.node == node,
                           policies.c.key == key)
            update = policies.update().where(matches).values(value = value)
            updated = self.conn.execute(update)
            updated.close()
            if updated.rowcount != 0:
                continue
            # No row was updated, so the key is new for this node.
            row = {'node':node, 'key':key, 'value':value}
            inserted = self.conn.execute(policies.insert(), row)
            inserted.close()
417
    
418
    def statistics_get(self, node, cluster=0):
        """Return the stored (population, size, mtime) statistics row
           of the node for the given cluster, or None if there is none.
        """

        stats = self.statistics.c
        query = select([stats.population, stats.size, stats.mtime],
                       and_(stats.node == node,
                            stats.cluster == cluster))
        result = self.conn.execute(query)
        found = result.fetchone()
        result.close()
        return found
432
    
433
    def statistics_update(self, node, population, size, mtime, cluster=0):
        """Add the given deltas to the statistics of the node.

           population and size are added to the values currently stored
           for (node, cluster) - missing rows count as (0, 0) - and may
           be zero, positive or negative. mtime replaces the stored one.
           The row is created if it does not exist yet.
        """
        stats = self.statistics

        # Read the current totals, if any.
        query = select([stats.c.population, stats.c.size],
                       and_(stats.c.node == node,
                            stats.c.cluster == cluster))
        result = self.conn.execute(query)
        current = result.fetchone()
        result.close()
        if current:
            prepopulation, presize = current
        else:
            prepopulation, presize = (0, 0)
        population = population + prepopulation
        size = size + presize

        #insert or replace
        #TODO better upsert
        update = stats.update().where(and_(stats.c.node==node,
                                           stats.c.cluster==cluster))
        update = update.values(population=population, size=size, mtime=mtime)
        updated = self.conn.execute(update)
        updated.close()
        if updated.rowcount != 0:
            return
        # Nothing was updated: this is the first entry for (node, cluster).
        insertion = stats.insert().values(node=node, population=population,
                                          size=size, mtime=mtime,
                                          cluster=cluster)
        self.conn.execute(insertion).close()
464
    
465
    def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
        """Apply the statistics deltas to the parent of the given node
           and then to every further ancestor up to the root.

           The population delta is applied to the direct parent only;
           the remaining ancestors receive just the size and mtime.
           The walk stops early at a node that cannot be found.
        """

        while node != ROOTNODE:
            props = self.node_get_properties(node)
            if props is None:
                return
            parent, _path = props
            self.statistics_update(parent, population, size, mtime, cluster)
            # Population isn't recursive
            population = 0
            node = parent
481
    
482
    def statistics_latest(self, node, before=inf, except_cluster=0):
        """Return (population, total size, last mtime) computed from the
           latest versions under node that do not belong to
           except_cluster.

           If before is not inf, only versions with mtime < before are
           considered when picking the latest version of each node.
           Return None if the node is not found or if its own latest
           version is missing or belongs to except_cluster.
        """

        def fetch_one(query):
            """Run the query and return its first row, or None."""
            result = self.conn.execute(query)
            row = result.fetchone()
            result.close()
            return row

        # The node.
        node_props = self.node_get_properties(node)
        if node_props is None:
            return None
        parent, path = node_props

        # The latest version.
        latest = select([self.versions.c.serial,
                         self.versions.c.node,
                         self.versions.c.hash,
                         self.versions.c.size,
                         self.versions.c.type,
                         self.versions.c.source,
                         self.versions.c.mtime,
                         self.versions.c.muser,
                         self.versions.c.uuid,
                         self.versions.c.checksum,
                         self.versions.c.cluster])
        newest_serial = select([func.max(self.versions.c.serial)],
                               self.versions.c.node == node)
        if before != inf:
            newest_serial = newest_serial.where(self.versions.c.mtime < before)
        latest = latest.where(and_(self.versions.c.cluster != except_cluster,
                                   self.versions.c.serial == newest_serial))
        props = fetch_one(latest)
        if not props:
            return None
        mtime = props[MTIME]

        v = self.versions.alias('v')

        def aggregates():
            """Return a new select of count, total size and max mtime over v."""
            return select([func.count(v.c.serial),
                           func.sum(v.c.size),
                           func.max(v.c.mtime)])

        # First level, just under node (get population).
        newest = select([func.max(self.versions.c.serial)])
        if before != inf:
            newest = newest.where(self.versions.c.mtime < before)
        children = select([self.nodes.c.node], self.nodes.c.parent == node)
        first_level = aggregates().where(
            and_(v.c.serial == newest.where(self.versions.c.node == v.c.node),
                 v.c.cluster != except_cluster,
                 v.c.node.in_(children)))
        totals = fetch_one(first_level)
        if not totals:
            return None
        count = totals[0]
        mtime = max(mtime, totals[2])
        if count == 0:
            return (0, 0, mtime)

        # All children (get size and mtime).
        # This is why the full path is stored.
        newest = select([func.max(self.versions.c.serial)],
            self.versions.c.node == v.c.node)
        if before != inf:
            newest = newest.where(self.versions.c.mtime < before)
        subtree = select([self.nodes.c.node],
            self.nodes.c.path.like(self.escape_like(path) + '%', escape='\\'))
        all_levels = aggregates().where(
            and_(v.c.serial == newest,
                 v.c.cluster != except_cluster,
                 v.c.node.in_(subtree)))
        totals = fetch_one(all_levels)
        if not totals:
            return None
        # The size of the node's own latest version is taken out of the total.
        size = totals[1] - props[SIZE]
        mtime = max(mtime, totals[2])
        return (count, size, mtime)
562
    
563
    def version_create(self, node, hash, size, type, source, muser, uuid, checksum, cluster=0):
        """Insert a new version of the node with the given properties,
           stamped with the current time, and add it to the statistics
           of the node's ancestors.
           Return the (serial, mtime) of the new version.
        """

        mtime = time()
        values = dict(node=node, hash=hash, size=size, type=type,
                      source=source, mtime=mtime, muser=muser, uuid=uuid,
                      checksum=checksum, cluster=cluster)
        insertion = self.versions.insert().values(**values)
        serial = self.conn.execute(insertion).inserted_primary_key[0]
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)
        return serial, mtime
574
    
575
    def version_lookup(self, node, before=inf, cluster=0):
        """Look up the latest version of the given node.

           If before is not inf, only versions with mtime < before are
           considered. Return its properties as a row
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster)
           or None if that latest version is not in the given cluster
           (or does not exist).
        """

        # Highest serial of the node, optionally limited in time.
        newest_serial = select([func.max(self.versions.c.serial)],
            self.versions.c.node == node)
        if before != inf:
            newest_serial = newest_serial.where(self.versions.c.mtime < before)

        v = self.versions.alias('v')
        query = select([v.c.serial, v.c.node, v.c.hash,
                        v.c.size, v.c.type, v.c.source,
                        v.c.mtime, v.c.muser, v.c.uuid,
                        v.c.checksum, v.c.cluster])
        query = query.where(and_(v.c.serial == newest_serial,
                                 v.c.cluster == cluster))
        result = self.conn.execute(query)
        found = result.fetchone()
        result.close()
        return found if found else None
599
    
600
    def version_get_properties(self, serial, keys=(), propnames=_propnames):
        """Return the properties of the version specified by serial.

           With empty keys the full row is returned, in the order
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
           Otherwise a list with the values of the requested keys is
           returned, in the order given; keys that are not present in
           propnames are silently skipped.
           Return None if there is no such version.
        """

        v = self.versions.alias()
        columns = [v.c.serial, v.c.node, v.c.hash,
                   v.c.size, v.c.type, v.c.source,
                   v.c.mtime, v.c.muser, v.c.uuid,
                   v.c.checksum, v.c.cluster]
        result = self.conn.execute(select(columns, v.c.serial == serial))
        version = result.fetchone()
        result.close()

        # Missing version or nothing to filter: return what was fetched.
        if version is None or not keys:
            return version

        values = []
        for key in keys:
            if key in propnames:
                values.append(version[propnames[key]])
        return values
621
    
622
    def version_put_property(self, serial, key, value):
        """Set the property named by key to value on the version
           specified by serial. Unknown property names are ignored.
        """

        if key in _propnames:
            update = self.versions.update().where(
                self.versions.c.serial == serial)
            update = update.values(**{key: value})
            self.conn.execute(update).close()
631
    
632
    def version_recluster(self, serial, cluster):
        """Move the version into another cluster and shift its
           contribution in the ancestors' statistics from the old
           cluster to the new one.
           Do nothing if the version does not exist or already is
           in the given cluster.
        """

        props = self.version_get_properties(serial)
        if not props:
            return
        oldcluster = props[CLUSTER]
        if oldcluster == cluster:
            return
        node, size = props[NODE], props[SIZE]

        # Take the version out of the old cluster's statistics
        # and account for it in the new one.
        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)

        update = self.versions.update().where(
            self.versions.c.serial == serial).values(cluster = cluster)
        self.conn.execute(update).close()
652
    
653
    def version_remove(self, serial):
        """Remove the version specified by serial and subtract it
           from the statistics of its node's ancestors.
           Return the (hash, size) of the removed version,
           or None if there is no such version.
        """

        props = self.version_get_properties(serial)
        if not props:
            return
        node, digest = props[NODE], props[HASH]
        size, cluster = props[SIZE], props[CLUSTER]

        self.statistics_update_ancestors(node, -1, -size, time(), cluster)

        removal = self.versions.delete().where(
            self.versions.c.serial == serial)
        self.conn.execute(removal).close()
        return digest, size
670
    
671
    def attribute_get(self, serial, domain, keys=()):
        """Return a list of (key, value) pairs of the version specified
           by serial, within the given domain.
           If keys is empty, return all attributes.
           Otherwise, return only those specified.
        """

        attrs = self.attributes.alias()
        conditions = [attrs.c.serial == serial,
                      attrs.c.domain == domain]
        if keys:
            # Restrict to the requested keys only.
            conditions.insert(0, attrs.c.key.in_(keys))
        query = select([attrs.c.key, attrs.c.value]).where(and_(*conditions))
        result = self.conn.execute(query)
        pairs = result.fetchall()
        result.close()
        return pairs
692
    
693
    def attribute_set(self, serial, domain, items):
        """Set attributes of the version specified by serial, within
           the given domain.
           items is an iterable of (key, value) pairs; existing keys get
           their value replaced, new keys are inserted.
        """
        attrs = self.attributes
        #insert or replace
        #TODO better upsert
        for key, value in items:
            matches = and_(attrs.c.serial == serial,
                           attrs.c.domain == domain,
                           attrs.c.key == key)
            update = attrs.update().where(matches).values(value = value)
            updated = self.conn.execute(update)
            updated.close()
            if updated.rowcount != 0:
                continue
            # No row was updated, so the attribute is new.
            insertion = attrs.insert().values(serial=serial, domain=domain,
                                              key=key, value=value)
            self.conn.execute(insertion).close()
711
    
712
    def attribute_del(self, serial, domain, keys=()):
        """Delete attributes of the version specified by serial, within
           the given domain.
           If keys is empty, delete all attributes.
           Otherwise delete those specified.
        """

        attrs = self.attributes

        if not keys:
            removal = attrs.delete().where(and_(attrs.c.serial == serial,
                                                attrs.c.domain == domain))
            self.conn.execute(removal).close()
            return

        #TODO more efficient way to do this?
        for key in keys:
            removal = attrs.delete().where(and_(attrs.c.serial == serial,
                                                attrs.c.domain == domain,
                                                attrs.c.key == key))
            self.conn.execute(removal).close()
731
    
732
    def attribute_copy(self, source, dest):
        """Copy the attributes of version source, of every domain,
           to version dest. Attributes already present on dest get
           their value replaced, the others are inserted.
        """
        attrs = self.attributes
        # Each selected row already carries dest as its first column.
        query = select([dest, attrs.c.domain, attrs.c.key, attrs.c.value],
                       attrs.c.serial == source)
        result = self.conn.execute(query)
        copies = result.fetchall()
        result.close()
        for target, domain, key, value in copies:
            #insert or replace
            update = attrs.update().where(and_(
                attrs.c.serial == target,
                attrs.c.domain == domain,
                attrs.c.key == key))
            updated = self.conn.execute(update, value=value)
            updated.close()
            if updated.rowcount != 0:
                continue
            row = {'serial':target, 'domain':domain, 'key':key, 'value':value}
            self.conn.execute(attrs.insert(), row).close()
750
    
751
    def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=[]):
        """Return a list of the distinct attribute keys defined in
           domain on the latest versions of the nodes under parent,
           leaving out versions that belong to except_cluster.

           When before is not inf, only versions with mtime < before
           are candidates for being the latest one.
           pathq is an iterable of (path, match) pairs; if any pair
           produces a condition, the node path has to satisfy at
           least one of them.
        """

        # TODO: Use another table to store before=inf results.
        attrs = self.attributes.alias('a')
        vers = self.versions.alias('v')
        nodes = self.nodes.alias('n')

        # Highest serial among the versions of the same node as vers.
        newest = select([func.max(self.versions.c.serial)])
        if before != inf:
            newest = newest.where(self.versions.c.mtime < before)
        newest = newest.where(self.versions.c.node == vers.c.node)

        # Nodes whose parent is the requested one.
        children = select([self.nodes.c.node],
                          self.nodes.c.parent == parent)

        query = select([attrs.c.key]).distinct()
        query = query.where(vers.c.serial == newest)
        query = query.where(vers.c.cluster != except_cluster)
        query = query.where(vers.c.node.in_(children))
        query = query.where(attrs.c.serial == vers.c.serial)
        query = query.where(attrs.c.domain == domain)
        query = query.where(nodes.c.node == vers.c.node)

        path_filters = []
        for path, match in pathq:
            if match == MATCH_PREFIX:
                pattern = self.escape_like(path) + '%'
                path_filters.append(nodes.c.path.like(pattern, escape='\\'))
            elif match == MATCH_EXACT:
                path_filters.append(nodes.c.path == path)
        if path_filters:
            query = query.where(or_(*path_filters))

        result = self.conn.execute(query)
        found = result.fetchall()
        result.close()
        return [row[0] for row in found]
784
    
785
    def latest_version_list(self, parent, prefix='', delimiter=None,
                            start='', limit=10000, before=inf,
                            except_cluster=0, pathq=[], domain=None,
                            filterq=[], sizeq=None, all_props=False):
        """Return a (list of (path, serial) tuples, list of common prefixes)
           for the current versions of the paths with the given parent,
           matching the following criteria.

           The property tuple for a version is returned if all
           of these conditions are true:

                a. parent matches

                b. path > start

                c. path starts with prefix (and paths in pathq)

                d. version is the max up to before

                e. version is not in cluster

                f. the path does not have the delimiter occurring
                   after the prefix, or ends with the delimiter

                g. serial matches the attribute filter query.

                   A filter query is a comma-separated list of
                   terms in one of these three forms:

                   key
                       an attribute with this key must exist

                   !key
                       an attribute with this key must not exist

                   key ?op value
                       the attribute with this key satisfies the value
                       where ?op is one of ==, != <=, >=, <, >.

                h. the size is in the range set by sizeq

           The list of common prefixes includes the prefixes
           matching up to the first delimiter after prefix,
           and are reported only once, as "virtual directories".
           The delimiter is included in the prefixes.

           If arguments are None, then the corresponding matching rule
           will always match.

           Limit applies to the first list of tuples returned.

           If all_props is True, return all properties after path, not just serial.

           Without a delimiter the limit is applied in the query itself
           and the second element returned is an empty tuple. With a
           delimiter the query is re-issued with a new start after each
           common prefix found; every result is closed before the next
           one is opened.
        """

        if not start or start < prefix:
            start = strprevling(prefix)
        nextling = strnextling(prefix)

        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        if not all_props:
            s = select([n.c.path, v.c.serial]).distinct()
        else:
            s = select([n.c.path,
                        v.c.serial, v.c.node, v.c.hash,
                        v.c.size, v.c.type, v.c.source,
                        v.c.mtime, v.c.muser, v.c.uuid,
                        v.c.checksum, v.c.cluster]).distinct()
        # Keep only the version with the highest serial of each node
        # (optionally limited to versions with mtime < before).
        filtered = select([func.max(self.versions.c.serial)])
        if before != inf:
            filtered = filtered.where(self.versions.c.mtime < before)
        s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
        s = s.where(v.c.cluster != except_cluster)
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
            self.nodes.c.parent == parent)))

        s = s.where(n.c.node == v.c.node)
        # 'start' is left as a bind parameter so the same statement can be
        # executed again with a different lower bound further down.
        s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
        conj = []
        for path, match in pathq:
            if match == MATCH_PREFIX:
                conj.append(n.c.path.like(self.escape_like(path) + '%', escape='\\'))
            elif match == MATCH_EXACT:
                conj.append(n.c.path == path)
        if conj:
            s = s.where(or_(*conj))

        if sizeq and len(sizeq) == 2:
            if sizeq[0]:
                s = s.where(v.c.size >= sizeq[0])
            if sizeq[1]:
                s = s.where(v.c.size < sizeq[1])

        if domain and filterq:
            a = self.attributes.alias('a')
            included, excluded, opers = parse_filters(filterq)
            if included:
                # At least one of the included keys exists on the version.
                subs = select([1])
                subs = subs.where(a.c.serial == v.c.serial).correlate(v)
                subs = subs.where(a.c.domain == domain)
                subs = subs.where(or_(*[a.c.key.op('=')(x) for x in included]))
                s = s.where(exists(subs))
            if excluded:
                # None of the excluded keys exists on the version.
                subs = select([1])
                subs = subs.where(a.c.serial == v.c.serial).correlate(v)
                subs = subs.where(a.c.domain == domain)
                subs = subs.where(or_(*[a.c.key.op('=')(x) for x in excluded]))
                s = s.where(not_(exists(subs)))
            if opers:
                # Every (key, operator, value) term must hold.
                for k, o, val in opers:
                    subs = select([1])
                    subs = subs.where(a.c.serial == v.c.serial).correlate(v)
                    subs = subs.where(a.c.domain == domain)
                    subs = subs.where(and_(a.c.key.op('=')(k), a.c.value.op(o)(val)))
                    s = s.where(exists(subs))

        s = s.order_by(n.c.path)

        if not delimiter:
            s = s.limit(limit)
            rp = self.conn.execute(s, start=start)
            r = rp.fetchall()
            rp.close()
            return r, ()

        pfz = len(prefix)
        dz = len(delimiter)
        count = 0
        prefixes = []
        matches = []

        rp = self.conn.execute(s, start=start)
        try:
            while True:
                props = rp.fetchone()
                if props is None:
                    break
                path = props[0]
                idx = path.find(delimiter, pfz)

                if idx < 0:
                    # No delimiter after the prefix: a plain match.
                    matches.append(props)
                    count += 1
                    if count >= limit:
                        break
                    continue

                if idx + dz == len(path):
                    # The path ends with the delimiter: still a match.
                    matches.append(props)
                    count += 1
                    continue # Get one more, in case there is a path.

                # The delimiter occurs inside the path: report the prefix.
                pf = path[:idx + dz]
                prefixes.append(pf)
                if count >= limit:
                    break

                # Release the current result before re-running the query.
                # The new start is strnextling(pf), which presumably sorts
                # after every path beginning with pf, so those are skipped.
                rp.close()
                rp = self.conn.execute(s, start=strnextling(pf)) # New start.
        finally:
            rp.close()

        return matches, prefixes
947
    
948
    def latest_uuid(self, uuid):
        """Return the (path, serial) row for the most recent version
           that carries the given uuid, as fetched from the database
           (None when the query yields no row).
        """

        vers = self.versions.alias('v')
        nodes = self.nodes.alias('n')

        # Highest serial among all versions recorded with this uuid.
        newest = select([func.max(self.versions.c.serial)])
        newest = newest.where(self.versions.c.uuid == uuid)

        query = select([nodes.c.path, vers.c.serial])
        query = query.where(vers.c.serial == newest)
        query = query.where(nodes.c.node == vers.c.node)

        result = self.conn.execute(query)
        row = result.fetchone()
        result.close()
        return row