# Copyright 2011-2012 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from time import time
from sqlalchemy import Table, Integer, BigInteger, DECIMAL, Column, String, MetaData, ForeignKey
from sqlalchemy.types import Text
from sqlalchemy.schema import Index, Sequence
from sqlalchemy.sql import func, and_, or_, not_, null, select, bindparam, text, exists
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.engine.reflection import Inspector

from dbworker import DBWorker

from pithos.backends.filter import parse_filters


ROOTNODE  = 0

( SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM, CLUSTER ) = range(11)

( MATCH_PREFIX, MATCH_EXACT ) = range(2)

inf = float('inf')


def strnextling(prefix):
    """Return the first unicode string
       greater than but not starting with given prefix.
       strnextling('hello') -> 'hellp'
    """
    if not prefix:
        ## all strings start with the null string,
        ## therefore we have to approximate strnextling('')
        ## with the last unicode character supported by python
        ## 0x10ffff for wide (32-bit unicode) python builds
        ## 0x00ffff for narrow (16-bit unicode) python builds
        ## We will not autodetect. 0xffff is safe enough.
        return unichr(0xffff)
    s = prefix[:-1]
    c = ord(prefix[-1])
    if c >= 0xffff:
        raise RuntimeError
    s += unichr(c+1)
    return s


def strprevling(prefix):
    """Return an approximation of the last unicode string
       less than but not starting with given prefix.
       strprevling(u'hello') -> u'helln\\xffff'
    """
    if not prefix:
        ## There is no prevling for the null string
        return prefix
    s = prefix[:-1]
    c = ord(prefix[-1])
    if c > 0:
        s += unichr(c-1) + unichr(0xffff)
    return s
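
# Illustrative note (editorial addition, not part of the original module):
# these two helpers turn "paths starting with prefix" into a plain range test
# that the database can answer with an index scan, roughly:
#
#     strprevling(u'photos/') < path < strnextling(u'photos/')
#
# e.g. strnextling(u'photos/') == u'photos0' (since '0' follows '/'), so every
# path starting with u'photos/' sorts strictly below that bound. The bounds
# shown here are a sketch of the intent, not additional behaviour.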

_propnames = {
    'serial'    : 0,
    'node'      : 1,
    'hash'      : 2,
    'size'      : 3,
    'type'      : 4,
    'source'    : 5,
    'mtime'     : 6,
    'muser'     : 7,
    'uuid'      : 8,
    'checksum'  : 9,
    'cluster'   : 10
}
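
# For illustration only (hypothetical call): the property helpers below accept
# a `keys` argument that is resolved through this mapping, e.g.
#
#     node.version_get_properties(serial, keys=('size', 'mtime'))
#
# returns just [row[3], row[6]] of the full property tuple, in the order the
# keys were given.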


class Node(DBWorker):
    """Nodes store path organization and have multiple versions.
       Versions store object history and have multiple attributes.
       Attributes store metadata.
    """
    
    # TODO: Provide an interface for included and excluded clusters.
    
    def __init__(self, **params):
        DBWorker.__init__(self, **params)
        metadata = MetaData()
        
        #create nodes table
        columns=[]
        columns.append(Column('node', Integer, primary_key=True))
        columns.append(Column('parent', Integer,
                              ForeignKey('nodes.node',
                                         ondelete='CASCADE',
                                         onupdate='CASCADE'),
                              autoincrement=False))
        columns.append(Column('path', String(2048), default='', nullable=False))
        self.nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
        Index('idx_nodes_path', self.nodes.c.path, unique=True)
        
        #create policy table
        columns=[]
        columns.append(Column('node', Integer,
                              ForeignKey('nodes.node',
                                         ondelete='CASCADE',
                                         onupdate='CASCADE'),
                              primary_key=True))
        columns.append(Column('key', String(128), primary_key=True))
        columns.append(Column('value', String(256)))
        self.policies = Table('policy', metadata, *columns, mysql_engine='InnoDB')
        
        #create statistics table
        columns=[]
        columns.append(Column('node', Integer,
                              ForeignKey('nodes.node',
                                         ondelete='CASCADE',
                                         onupdate='CASCADE'),
                              primary_key=True))
        columns.append(Column('population', Integer, nullable=False, default=0))
        columns.append(Column('size', BigInteger, nullable=False, default=0))
        columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
        columns.append(Column('cluster', Integer, nullable=False, default=0,
                              primary_key=True, autoincrement=False))
        self.statistics = Table('statistics', metadata, *columns, mysql_engine='InnoDB')
        
        #create versions table
        columns=[]
        columns.append(Column('serial', Integer, primary_key=True))
        columns.append(Column('node', Integer,
                              ForeignKey('nodes.node',
                                         ondelete='CASCADE',
                                         onupdate='CASCADE')))
        columns.append(Column('hash', String(256)))
        columns.append(Column('size', BigInteger, nullable=False, default=0))
        columns.append(Column('type', String(256), nullable=False, default=''))
        columns.append(Column('source', Integer))
        columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
        columns.append(Column('muser', String(256), nullable=False, default=''))
        columns.append(Column('uuid', String(64), nullable=False, default=''))
        columns.append(Column('checksum', String(256), nullable=False, default=''))
        columns.append(Column('cluster', Integer, nullable=False, default=0))
        self.versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
        Index('idx_versions_node_mtime', self.versions.c.node, self.versions.c.mtime)
        Index('idx_versions_node_uuid', self.versions.c.uuid)
        
        #create attributes table
        columns = []
        columns.append(Column('serial', Integer,
                              ForeignKey('versions.serial',
                                         ondelete='CASCADE',
                                         onupdate='CASCADE'),
                              primary_key=True))
        columns.append(Column('domain', String(256), primary_key=True))
        columns.append(Column('key', String(128), primary_key=True))
        columns.append(Column('value', String(256)))
        self.attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')
        
        metadata.create_all(self.engine)
        
        s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
                                           self.nodes.c.parent == ROOTNODE))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            s = self.nodes.insert().values(node=ROOTNODE, parent=ROOTNODE)
            self.conn.execute(s)

    def node_create(self, parent, path):
        """Create a new node from the given properties.
           Return the node identifier of the new node.
        """
        #TODO catch IntegrityError?
        s = self.nodes.insert().values(parent=parent, path=path)
        r = self.conn.execute(s)
        inserted_primary_key = r.inserted_primary_key[0]
        r.close()
        return inserted_primary_key
    
    def node_lookup(self, path):
        """Lookup the current node of the given path.
           Return None if the path is not found.
        """
        
        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
        s = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path), escape='\\'))
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        if row:
            return row[0]
        return None
    
    def node_lookup_bulk(self, paths):
        """Lookup the current nodes for the given paths.
           Return an empty list if none of the paths are found.
        """
        
        s = select([self.nodes.c.node], self.nodes.c.path.in_(paths))
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        return [row[0] for row in rows]
    
    def node_get_properties(self, node):
        """Return the node's (parent, path).
           Return None if the node is not found.
        """
        
        s = select([self.nodes.c.parent, self.nodes.c.path])
        s = s.where(self.nodes.c.node == node)
        r = self.conn.execute(s)
        l = r.fetchone()
        r.close()
        return l
    
    def node_get_versions(self, node, keys=(), propnames=_propnames):
        """Return the properties of all versions at node.
           If keys is empty, return all properties in the order
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
        """
        
        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.hash,
                    self.versions.c.size,
                    self.versions.c.type,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.uuid,
                    self.versions.c.checksum,
                    self.versions.c.cluster], self.versions.c.node == node)
        s = s.order_by(self.versions.c.serial)
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        if not rows:
            return rows
        
        if not keys:
            return rows
        
        return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]
    
    def node_count_children(self, node):
        """Return node's child count."""
        
        s = select([func.count(self.nodes.c.node)])
        s = s.where(and_(self.nodes.c.parent == node,
                         self.nodes.c.node != ROOTNODE))
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        return row[0]

    def node_purge_children(self, parent, before=inf, cluster=0):
        """Delete all versions with the specified
           parent and cluster, and return
           the hashes and size of versions deleted.
           Clears out nodes with no remaining versions.
        """
        #update statistics
        c1 = select([self.nodes.c.node],
            self.nodes.c.parent == parent)
        where_clause = and_(self.versions.c.node.in_(c1),
                            self.versions.c.cluster == cluster)
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        s = s.where(where_clause)
        if before != inf:
            s = s.where(self.versions.c.mtime <= before)
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        if not row:
            return (), 0
        nr, size = row[0], -row[1] if row[1] else 0
        mtime = time()
        self.statistics_update(parent, -nr, size, mtime, cluster)
        self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)
        
        s = select([self.versions.c.hash])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        hashes = [row[0] for row in r.fetchall()]
        r.close()
        
        #delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)
        r.close()
        
        #delete nodes
        s = select([self.nodes.c.node],
            and_(self.nodes.c.parent == parent,
                 select([func.count(self.versions.c.serial)],
                    self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
        rp = self.conn.execute(s)
        nodes = [r[0] for r in rp.fetchall()]
        rp.close()
        s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
        self.conn.execute(s).close()
        
        return hashes, size
    
    def node_purge(self, node, before=inf, cluster=0):
        """Delete all versions with the specified
           node and cluster, and return
           the hashes and size of versions deleted.
           Clears out the node if it has no remaining versions.
        """
        
        #update statistics
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        where_clause = and_(self.versions.c.node == node,
                         self.versions.c.cluster == cluster)
        s = s.where(where_clause)
        if before != inf:
            s = s.where(self.versions.c.mtime <= before)
        r = self.conn.execute(s)
        row = r.fetchone()
        nr, size = row[0], row[1]
        r.close()
        if not nr:
            return (), 0
        mtime = time()
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
        
        s = select([self.versions.c.hash])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        # Use a distinct loop variable so the result proxy `r` is not rebound
        # before it is closed (mirrors node_purge_children above).
        hashes = [row[0] for row in r.fetchall()]
        r.close()
        
        #delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)
        r.close()
        
        #delete nodes
        s = select([self.nodes.c.node],
            and_(self.nodes.c.node == node,
                 select([func.count(self.versions.c.serial)],
                    self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
        r = self.conn.execute(s)
        nodes = [row[0] for row in r.fetchall()]
        r.close()
        s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
        self.conn.execute(s).close()
        
        return hashes, size

    def node_remove(self, node):
        """Remove the node specified.
           Return False if the node has children or is not found.
        """
        
        if self.node_count_children(node):
            return False
        
        mtime = time()
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size),
                    self.versions.c.cluster])
        s = s.where(self.versions.c.node == node)
        s = s.group_by(self.versions.c.cluster)
        r = self.conn.execute(s)
        for population, size, cluster in r.fetchall():
            self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
        r.close()
        
        s = self.nodes.delete().where(self.nodes.c.node == node)
        self.conn.execute(s).close()
        return True
    
    def policy_get(self, node):
        s = select([self.policies.c.key, self.policies.c.value],
            self.policies.c.node==node)
        r = self.conn.execute(s)
        d = dict(r.fetchall())
        r.close()
        return d
    
    def policy_set(self, node, policy):
        #insert or replace
        for k, v in policy.iteritems():
            s = self.policies.update().where(and_(self.policies.c.node == node,
                                                  self.policies.c.key == k))
            s = s.values(value = v)
            rp = self.conn.execute(s)
            rp.close()
            if rp.rowcount == 0:
                s = self.policies.insert()
                values = {'node':node, 'key':k, 'value':v}
                r = self.conn.execute(s, values)
                r.close()
    
    def statistics_get(self, node, cluster=0):
        """Return population, total size and last mtime
           for all versions under node that belong to the cluster.
        """
        
        s = select([self.statistics.c.population,
                    self.statistics.c.size,
                    self.statistics.c.mtime])
        s = s.where(and_(self.statistics.c.node == node,
                         self.statistics.c.cluster == cluster))
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        return row
    
    def statistics_update(self, node, population, size, mtime, cluster=0):
        """Update the statistics of the given node.
           Statistics keep track of the population, total
           size of objects and mtime in the node's namespace.
           The population and size arguments may be zero,
           positive or negative numbers.
        """
        s = select([self.statistics.c.population, self.statistics.c.size],
            and_(self.statistics.c.node == node,
                 self.statistics.c.cluster == cluster))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            prepopulation, presize = (0, 0)
        else:
            prepopulation, presize = r
        population += prepopulation
        size += presize
        
        #insert or replace
        #TODO better upsert
        u = self.statistics.update().where(and_(self.statistics.c.node==node,
                                           self.statistics.c.cluster==cluster))
        u = u.values(population=population, size=size, mtime=mtime)
        rp = self.conn.execute(u)
        rp.close()
        if rp.rowcount == 0:
            ins = self.statistics.insert()
            ins = ins.values(node=node, population=population, size=size,
                             mtime=mtime, cluster=cluster)
            self.conn.execute(ins).close()
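
    # Note (editorial, not from the original source): the update-then-insert
    # sequence above (and in policy_set / attribute_set) is a portable
    # "upsert": try UPDATE first, and INSERT only when rowcount reports that
    # no row matched. As the TODO says, this is not atomic; two concurrent
    # writers can both see rowcount == 0 and race on the INSERT, so it relies
    # on callers serializing updates to the same (node, cluster) row.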

    def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
        """Update the statistics of the given node's parent.
           Then recursively update all parents up to the root.
           Population is not recursive.
        """
        
        while True:
            if node == ROOTNODE:
                break
            props = self.node_get_properties(node)
            if props is None:
                break
            parent, path = props
            self.statistics_update(parent, population, size, mtime, cluster)
            node = parent
            population = 0 # Population isn't recursive
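
    # Worked example (illustrative, with hypothetical nodes): if node D's
    # parent chain is D -> C -> B -> ROOTNODE, then
    # statistics_update_ancestors(D, 1, 100, mtime) adds population +1 and
    # size +100 to C's statistics, but only size +100 to B and to ROOTNODE,
    # because population is zeroed after the first step up.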

    def statistics_latest(self, node, before=inf, except_cluster=0):
        """Return population, total size and last mtime
           for all latest versions under node that
           do not belong to the cluster.
        """
        
        # The node.
        props = self.node_get_properties(node)
        if props is None:
            return None
        parent, path = props
        
        # The latest version.
        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.hash,
                    self.versions.c.size,
                    self.versions.c.type,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.uuid,
                    self.versions.c.checksum,
                    self.versions.c.cluster])
        filtered = select([func.max(self.versions.c.serial)],
                            self.versions.c.node == node)
        if before != inf:
            filtered = filtered.where(self.versions.c.mtime < before)
        s = s.where(and_(self.versions.c.cluster != except_cluster,
                         self.versions.c.serial == filtered))
        r = self.conn.execute(s)
        props = r.fetchone()
        r.close()
        if not props:
            return None
        mtime = props[MTIME]
        
        # First level, just under node (get population).
        v = self.versions.alias('v')
        s = select([func.count(v.c.serial),
                    func.sum(v.c.size),
                    func.max(v.c.mtime)])
        c1 = select([func.max(self.versions.c.serial)])
        if before != inf:
            c1 = c1.where(self.versions.c.mtime < before)
        c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
        s = s.where(and_(v.c.serial == c1.where(self.versions.c.node == v.c.node),
                         v.c.cluster != except_cluster,
                         v.c.node.in_(c2)))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        count = r[0]
        mtime = max(mtime, r[2])
        if count == 0:
            return (0, 0, mtime)
        
        # All children (get size and mtime).
        # This is why the full path is stored.
        s = select([func.count(v.c.serial),
                    func.sum(v.c.size),
                    func.max(v.c.mtime)])
        c1 = select([func.max(self.versions.c.serial)],
            self.versions.c.node == v.c.node)
        if before != inf:
            c1 = c1.where(self.versions.c.mtime < before)
        c2 = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path) + '%', escape='\\'))
        s = s.where(and_(v.c.serial == c1,
                         v.c.cluster != except_cluster,
                         v.c.node.in_(c2)))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        size = r[1] - props[SIZE]
        mtime = max(mtime, r[2])
        return (count, size, mtime)
    
    def version_create(self, node, hash, size, type, source, muser, uuid, checksum, cluster=0):
        """Create a new version from the given properties.
           Return the (serial, mtime) of the new version.
        """
        
        mtime = time()
        s = self.versions.insert().values(node=node, hash=hash, size=size, type=type, source=source,
                                          mtime=mtime, muser=muser, uuid=uuid, checksum=checksum, cluster=cluster)
        serial = self.conn.execute(s).inserted_primary_key[0]
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)
        return serial, mtime
    
    def version_lookup(self, node, before=inf, cluster=0, all_props=True):
        """Lookup the current version of the given node.
           Return a list with its properties:
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster)
           or None if the current version is not found in the given cluster.
        """
        
        v = self.versions.alias('v')
        if not all_props:
            s = select([v.c.serial])
        else:
            s = select([v.c.serial, v.c.node, v.c.hash,
                        v.c.size, v.c.type, v.c.source,
                        v.c.mtime, v.c.muser, v.c.uuid,
                        v.c.checksum, v.c.cluster])
        c = select([func.max(self.versions.c.serial)],
            self.versions.c.node == node)
        if before != inf:
            c = c.where(self.versions.c.mtime < before)
        s = s.where(and_(v.c.serial == c,
                         v.c.cluster == cluster))
        r = self.conn.execute(s)
        props = r.fetchone()
        r.close()
        if props:
            return props
        return None
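
    # Illustration (editorial addition): "current" here means the version with
    # the highest serial whose mtime is earlier than `before`. For example,
    # with versions (serial=3, mtime=10) and (serial=7, mtime=20) on the same
    # node, version_lookup(node) returns serial 7, while
    # version_lookup(node, before=15) returns serial 3, provided each is in
    # the requested cluster.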

    def version_lookup_bulk(self, nodes, before=inf, cluster=0, all_props=True):
        """Lookup the current versions of the given nodes.
           Return a list with their properties:
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
        """
        
        v = self.versions.alias('v')
        if not all_props:
            s = select([v.c.serial])
        else:
            s = select([v.c.serial, v.c.node, v.c.hash,
                        v.c.size, v.c.type, v.c.source,
                        v.c.mtime, v.c.muser, v.c.uuid,
                        v.c.checksum, v.c.cluster])
        c = select([func.max(self.versions.c.serial)],
            self.versions.c.node.in_(nodes)).group_by(self.versions.c.node)
        if before != inf:
            c = c.where(self.versions.c.mtime < before)
        s = s.where(and_(v.c.serial.in_(c),
                         v.c.cluster == cluster))
        r = self.conn.execute(s)
        rproxy = r.fetchall()
        r.close()
        return (tuple(row.values()) for row in rproxy)
        
    def version_get_properties(self, serial, keys=(), propnames=_propnames):
        """Return a sequence of values for the properties of
           the version specified by serial and the keys, in the order given.
           If keys is empty, return all properties in the order
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
        """
        
        v = self.versions.alias()
        s = select([v.c.serial, v.c.node, v.c.hash,
                    v.c.size, v.c.type, v.c.source,
                    v.c.mtime, v.c.muser, v.c.uuid,
                    v.c.checksum, v.c.cluster], v.c.serial == serial)
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if r is None:
            return r
        
        if not keys:
            return r
        return [r[propnames[k]] for k in keys if k in propnames]
    
    def version_put_property(self, serial, key, value):
        """Set the value of the property named by key for the version specified by serial."""
        
        if key not in _propnames:
            return
        s = self.versions.update()
        s = s.where(self.versions.c.serial == serial)
        s = s.values(**{key: value})
        self.conn.execute(s).close()
    
    def version_recluster(self, serial, cluster):
        """Move the version into another cluster."""
        
        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        size = props[SIZE]
        oldcluster = props[CLUSTER]
        if cluster == oldcluster:
            return
        
        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)
        
        s = self.versions.update()
        s = s.where(self.versions.c.serial == serial)
        s = s.values(cluster = cluster)
        self.conn.execute(s).close()
    
    def version_remove(self, serial):
        """Remove the serial specified."""
        
        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        hash = props[HASH]
        size = props[SIZE]
        cluster = props[CLUSTER]
        
        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
        
        s = self.versions.delete().where(self.versions.c.serial == serial)
        self.conn.execute(s).close()
        return hash, size
    
    def attribute_get(self, serial, domain, keys=()):
        """Return a list of (key, value) pairs of the version specified by serial.
           If keys is empty, return all attributes.
           Otherwise, return only those specified.
        """
        
        if keys:
            attrs = self.attributes.alias()
            s = select([attrs.c.key, attrs.c.value])
            s = s.where(and_(attrs.c.key.in_(keys),
                             attrs.c.serial == serial,
                             attrs.c.domain == domain))
        else:
            attrs = self.attributes.alias()
            s = select([attrs.c.key, attrs.c.value])
            s = s.where(and_(attrs.c.serial == serial,
                             attrs.c.domain == domain))
        r = self.conn.execute(s)
        l = r.fetchall()
        r.close()
        return l
    
    def attribute_set(self, serial, domain, items):
        """Set the attributes of the version specified by serial.
           Receive attributes as an iterable of (key, value) pairs.
        """
        #insert or replace
        #TODO better upsert
        for k, v in items:
            s = self.attributes.update()
            s = s.where(and_(self.attributes.c.serial == serial,
                             self.attributes.c.domain == domain,
                             self.attributes.c.key == k))
            s = s.values(value = v)
            rp = self.conn.execute(s)
            rp.close()
            if rp.rowcount == 0:
                s = self.attributes.insert()
                s = s.values(serial=serial, domain=domain, key=k, value=v)
                self.conn.execute(s).close()
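
    # Usage sketch (editorial, hypothetical values): attributes are scoped per
    # domain, so
    #
    #     node.attribute_set(serial, 'some-domain', [('color', 'blue')])
    #     node.attribute_get(serial, 'some-domain')   # -> [('color', 'blue')]
    #
    # while attribute_get(serial, 'other-domain') would not see that key.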

    def attribute_del(self, serial, domain, keys=()):
        """Delete attributes of the version specified by serial.
           If keys is empty, delete all attributes.
           Otherwise delete those specified.
        """
        
        if keys:
            #TODO more efficient way to do this?
            for key in keys:
                s = self.attributes.delete()
                s = s.where(and_(self.attributes.c.serial == serial,
                                 self.attributes.c.domain == domain,
                                 self.attributes.c.key == key))
                self.conn.execute(s).close()
        else:
            s = self.attributes.delete()
            s = s.where(and_(self.attributes.c.serial == serial,
                             self.attributes.c.domain == domain))
            self.conn.execute(s).close()
    
    def attribute_copy(self, source, dest):
        s = select([dest, self.attributes.c.domain, self.attributes.c.key, self.attributes.c.value],
            self.attributes.c.serial == source)
        rp = self.conn.execute(s)
        attributes = rp.fetchall()
        rp.close()
        for dest, domain, k, v in attributes:
            #insert or replace
            s = self.attributes.update().where(and_(
                self.attributes.c.serial == dest,
                self.attributes.c.domain == domain,
                self.attributes.c.key == k))
            rp = self.conn.execute(s, value=v)
            rp.close()
            if rp.rowcount == 0:
                s = self.attributes.insert()
                values = {'serial':dest, 'domain':domain, 'key':k, 'value':v}
                self.conn.execute(s, values).close()
    
    def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=[]):
        """Return a list with all attribute keys defined
           for all latest versions under parent that
           do not belong to the cluster.
        """
        
        # TODO: Use another table to store before=inf results.
        a = self.attributes.alias('a')
        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        s = select([a.c.key]).distinct()
        filtered = select([func.max(self.versions.c.serial)])
        if before != inf:
            filtered = filtered.where(self.versions.c.mtime < before)
        s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
        s = s.where(v.c.cluster != except_cluster)
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
            self.nodes.c.parent == parent)))
        s = s.where(a.c.serial == v.c.serial)
        s = s.where(a.c.domain == domain)
        s = s.where(n.c.node == v.c.node)
        conj = []
        for path, match in pathq:
            if match == MATCH_PREFIX:
                conj.append(n.c.path.like(self.escape_like(path) + '%', escape='\\'))
            elif match == MATCH_EXACT:
                conj.append(n.c.path == path)
        if conj:
            s = s.where(or_(*conj))
        rp = self.conn.execute(s)
        rows = rp.fetchall()
        rp.close()
        return [r[0] for r in rows]

    def latest_version_list(self, parent, prefix='', delimiter=None,
                            start='', limit=10000, before=inf,
                            except_cluster=0, pathq=[], domain=None,
                            filterq=[], sizeq=None, all_props=False):
        """Return a (list of (path, serial) tuples, list of common prefixes)
           for the current versions of the paths with the given parent,
           matching the following criteria.
           
           The property tuple for a version is returned if all
           of these conditions are true:
                
                a. parent matches
                
                b. path > start
                
                c. path starts with prefix (and paths in pathq)
                
                d. version is the max up to before
                
                e. version is not in except_cluster
                
                f. the path does not have the delimiter occurring
                   after the prefix, or ends with the delimiter
                
                g. serial matches the attribute filter query.
                   
                   A filter query is a comma-separated list of
                   terms in one of these three forms:
                   
                   key
                       an attribute with this key must exist
                   
                   !key
                       an attribute with this key must not exist
                   
                   key ?op value
                       the attribute with this key satisfies the value
                       where ?op is one of ==, !=, <=, >=, <, >.
                
                h. the size is in the range set by sizeq
           
           The list of common prefixes includes the prefixes
           matching up to the first delimiter after prefix,
           and are reported only once, as "virtual directories".
           The delimiter is included in the prefixes.
           
           If arguments are None, then the corresponding matching rule
           will always match.
           
           Limit applies to the first list of tuples returned.
           
           If all_props is True, return all properties after path, not just serial.
        """
        
        if not start or start < prefix:
            start = strprevling(prefix)
        nextling = strnextling(prefix)
        
        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        if not all_props:
            s = select([n.c.path, v.c.serial]).distinct()
        else:
            s = select([n.c.path,
                        v.c.serial, v.c.node, v.c.hash,
                        v.c.size, v.c.type, v.c.source,
                        v.c.mtime, v.c.muser, v.c.uuid,
                        v.c.checksum, v.c.cluster]).distinct()
        filtered = select([func.max(self.versions.c.serial)])
        if before != inf:
            filtered = filtered.where(self.versions.c.mtime < before)
        s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
        s = s.where(v.c.cluster != except_cluster)
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
            self.nodes.c.parent == parent)))
        
        s = s.where(n.c.node == v.c.node)
        s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
        conj = []
        for path, match in pathq:
            if match == MATCH_PREFIX:
                conj.append(n.c.path.like(self.escape_like(path) + '%', escape='\\'))
            elif match == MATCH_EXACT:
                conj.append(n.c.path == path)
        if conj:
            s = s.where(or_(*conj))
        
        if sizeq and len(sizeq) == 2:
            if sizeq[0]:
                s = s.where(v.c.size >= sizeq[0])
            if sizeq[1]:
                s = s.where(v.c.size < sizeq[1])
        
        if domain and filterq:
            a = self.attributes.alias('a')
            included, excluded, opers = parse_filters(filterq)
            if included:
                subs = select([1])
                subs = subs.where(a.c.serial == v.c.serial).correlate(v)
                subs = subs.where(a.c.domain == domain)
                subs = subs.where(or_(*[a.c.key.op('=')(x) for x in included]))
                s = s.where(exists(subs))
            if excluded:
                subs = select([1])
                subs = subs.where(a.c.serial == v.c.serial).correlate(v)
                subs = subs.where(a.c.domain == domain)
                subs = subs.where(or_(*[a.c.key.op('=')(x) for x in excluded]))
                s = s.where(not_(exists(subs)))
            if opers:
                for k, o, val in opers:
                    subs = select([1])
                    subs = subs.where(a.c.serial == v.c.serial).correlate(v)
                    subs = subs.where(a.c.domain == domain)
                    subs = subs.where(and_(a.c.key.op('=')(k), a.c.value.op(o)(val)))
                    s = s.where(exists(subs))
        
        s = s.order_by(n.c.path)
        
        if not delimiter:
            s = s.limit(limit)
            rp = self.conn.execute(s, start=start)
            r = rp.fetchall()
            rp.close()
            return r, ()
        
        pfz = len(prefix)
        dz = len(delimiter)
        count = 0
        prefixes = []
        pappend = prefixes.append
        matches = []
        mappend = matches.append
        
        rp = self.conn.execute(s, start=start)
        while True:
            props = rp.fetchone()
            if props is None:
                break
            path = props[0]
            serial = props[1]
            idx = path.find(delimiter, pfz)
            
            if idx < 0:
                mappend(props)
                count += 1
                if count >= limit:
                    break
                continue
            
            if idx + dz == len(path):
                mappend(props)
                count += 1
                continue # Get one more, in case there is a path.
            pf = path[:idx + dz]
            pappend(pf)
            if count >= limit:
                break
            
            rp = self.conn.execute(s, start=strnextling(pf)) # New start.
        rp.close()
        
        return matches, prefixes
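
    # Usage sketch (editorial, hypothetical data): with objects 'a.txt',
    # 'docs/x' and 'docs/y' stored under one container node,
    #
    #     latest_version_list(container_node, prefix='', delimiter='/')
    #
    # returns roughly ([('a.txt', serial)], ['docs/']): 'docs/x' and 'docs/y'
    # collapse into the single common prefix 'docs/', reported once as a
    # "virtual directory", and the loop restarts the query just past that
    # prefix (start=strnextling('docs/')) so the prefix is not rescanned.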

    def latest_uuid(self, uuid):
        """Return a (path, serial) tuple, for the latest version of the given uuid."""
        
        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        s = select([n.c.path, v.c.serial])
        filtered = select([func.max(self.versions.c.serial)])
        s = s.where(v.c.serial == filtered.where(self.versions.c.uuid == uuid))
        s = s.where(n.c.node == v.c.node)
        
        r = self.conn.execute(s)
        l = r.fetchone()
        r.close()
        return l
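
# End-to-end sketch (editorial, hypothetical names): a typical read path in
# the backend layered on top of this class is
#
#     node = n.node_lookup('account/container/object')
#     props = n.version_lookup(node)               # latest version properties
#     meta = n.attribute_get(props[SERIAL], domain)
#
# where `n` is a Node instance constructed by the backend with its DBWorker
# connection parameters; the exact constructor arguments are defined in
# dbworker.py and are not shown here.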