root / snf-pithos-backend / pithos / backends / lib / sqlalchemy / node.py @ 07867f70

# Copyright 2011-2012 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from time import time
from sqlalchemy import Table, Integer, BigInteger, DECIMAL, Column, String, MetaData, ForeignKey
from sqlalchemy.types import Text
from sqlalchemy.schema import Index, Sequence
from sqlalchemy.sql import func, and_, or_, not_, null, select, bindparam, text, exists
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.engine.reflection import Inspector

from dbworker import DBWorker

from pithos.backends.filter import parse_filters


ROOTNODE  = 0

( SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM, CLUSTER ) = range(11)

( MATCH_PREFIX, MATCH_EXACT ) = range(2)

inf = float('inf')


def strnextling(prefix):
    """Return the first unicode string
       greater than but not starting with given prefix.
       strnextling('hello') -> 'hellp'
    """
    if not prefix:
        ## all strings start with the null string,
        ## therefore we have to approximate strnextling('')
        ## with the last unicode character supported by python
        ## 0x10ffff for wide (32-bit unicode) python builds
        ## 0x00ffff for narrow (16-bit unicode) python builds
        ## We will not autodetect. 0xffff is safe enough.
        return unichr(0xffff)
    s = prefix[:-1]
    c = ord(prefix[-1])
    if c >= 0xffff:
        raise RuntimeError
    s += unichr(c+1)
    return s

def strprevling(prefix):
    """Return an approximation of the last unicode string
       less than but not starting with given prefix.
       strprevling(u'hello') -> u'helln\\xffff'
    """
    if not prefix:
        ## There is no prevling for the null string
        return prefix
    s = prefix[:-1]
    c = ord(prefix[-1])
    if c > 0:
        s += unichr(c-1) + unichr(0xffff)
    return s
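
# strnextling/strprevling let callers express "path starts with prefix" as a
# plain range condition that can use the index on nodes.path: every path with
# the given prefix satisfies strprevling(prefix) < path < strnextling(prefix)
# (the lower bound is an approximation, as noted above). This is how
# latest_version_list below bounds its scans.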

_propnames = {
    'serial'    : 0,
    'node'      : 1,
    'hash'      : 2,
    'size'      : 3,
    'type'      : 4,
    'source'    : 5,
    'mtime'     : 6,
    'muser'     : 7,
    'uuid'      : 8,
    'checksum'  : 9,
    'cluster'   : 10
}
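# _propnames must stay in sync with the column order used by the version
# queries below (the SERIAL ... CLUSTER constants), since property tuples
# are indexed by position.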


class Node(DBWorker):
    """Nodes store path organization and have multiple versions.
       Versions store object history and have multiple attributes.
       Attributes store metadata.
    """

    # TODO: Provide an interface for included and excluded clusters.

    def __init__(self, **params):
        DBWorker.__init__(self, **params)
        metadata = MetaData()

        #create nodes table
        columns=[]
        columns.append(Column('node', Integer, primary_key=True))
        columns.append(Column('parent', Integer,
                              ForeignKey('nodes.node',
                                         ondelete='CASCADE',
                                         onupdate='CASCADE'),
                              autoincrement=False))
        columns.append(Column('path', String(2048), default='', nullable=False))
        self.nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
        Index('idx_nodes_path', self.nodes.c.path, unique=True)
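        # 'path' holds the full object path; the unique index above serves
        # both exact lookups (node_lookup) and prefix range scans for listings.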

        #create policy table
        columns=[]
        columns.append(Column('node', Integer,
                              ForeignKey('nodes.node',
                                         ondelete='CASCADE',
                                         onupdate='CASCADE'),
                              primary_key=True))
        columns.append(Column('key', String(128), primary_key=True))
        columns.append(Column('value', String(256)))
        self.policies = Table('policy', metadata, *columns, mysql_engine='InnoDB')

        #create statistics table
        columns=[]
        columns.append(Column('node', Integer,
                              ForeignKey('nodes.node',
                                         ondelete='CASCADE',
                                         onupdate='CASCADE'),
                              primary_key=True))
        columns.append(Column('population', Integer, nullable=False, default=0))
        columns.append(Column('size', BigInteger, nullable=False, default=0))
        columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
        columns.append(Column('cluster', Integer, nullable=False, default=0,
                              primary_key=True, autoincrement=False))
        self.statistics = Table('statistics', metadata, *columns, mysql_engine='InnoDB')
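        # Statistics rows are kept per (node, cluster) and are maintained
        # incrementally by statistics_update()/statistics_update_ancestors()
        # whenever versions are added, removed or reclustered.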

        #create versions table
        columns=[]
        columns.append(Column('serial', Integer, primary_key=True))
        columns.append(Column('node', Integer,
                              ForeignKey('nodes.node',
                                         ondelete='CASCADE',
                                         onupdate='CASCADE')))
        columns.append(Column('hash', String(256)))
        columns.append(Column('size', BigInteger, nullable=False, default=0))
        columns.append(Column('type', String(256), nullable=False, default=''))
        columns.append(Column('source', Integer))
        columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
        columns.append(Column('muser', String(256), nullable=False, default=''))
        columns.append(Column('uuid', String(64), nullable=False, default=''))
        columns.append(Column('checksum', String(256), nullable=False, default=''))
        columns.append(Column('cluster', Integer, nullable=False, default=0))
        self.versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
        Index('idx_versions_node_mtime', self.versions.c.node, self.versions.c.mtime)
        Index('idx_versions_node_uuid', self.versions.c.uuid)

        #create attributes table
        columns = []
        columns.append(Column('serial', Integer,
                              ForeignKey('versions.serial',
                                         ondelete='CASCADE',
                                         onupdate='CASCADE'),
                              primary_key=True))
        columns.append(Column('domain', String(256), primary_key=True))
        columns.append(Column('key', String(128), primary_key=True))
        columns.append(Column('value', String(256)))
        self.attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')

        metadata.create_all(self.engine)

        s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
                                           self.nodes.c.parent == ROOTNODE))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            s = self.nodes.insert().values(node=ROOTNODE, parent=ROOTNODE)
            self.conn.execute(s)
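        # Bootstrap the tree: the root node is its own parent, and the
        # ancestor walk in statistics_update_ancestors() stops when it
        # reaches ROOTNODE.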

    def node_create(self, parent, path):
        """Create a new node from the given properties.
           Return the node identifier of the new node.
        """
        #TODO catch IntegrityError?
        s = self.nodes.insert().values(parent=parent, path=path)
        r = self.conn.execute(s)
        inserted_primary_key = r.inserted_primary_key[0]
        r.close()
        return inserted_primary_key

    def node_lookup(self, path):
        """Look up the current node of the given path.
           Return None if the path is not found.
        """

        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
        s = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path), escape='\\'))
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        if row:
            return row[0]
        return None

    def node_lookup_bulk(self, paths):
        """Look up the current nodes for the given paths.
           Return an empty list if none of the paths are found.
        """

        s = select([self.nodes.c.node], self.nodes.c.path.in_(paths))
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        return [row[0] for row in rows]

    def node_get_properties(self, node):
        """Return the node's (parent, path).
           Return None if the node is not found.
        """

        s = select([self.nodes.c.parent, self.nodes.c.path])
        s = s.where(self.nodes.c.node == node)
        r = self.conn.execute(s)
        l = r.fetchone()
        r.close()
        return l

    def node_get_versions(self, node, keys=(), propnames=_propnames):
        """Return the properties of all versions at node.
           If keys is empty, return all properties in the order
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
        """

        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.hash,
                    self.versions.c.size,
                    self.versions.c.type,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.uuid,
                    self.versions.c.checksum,
                    self.versions.c.cluster], self.versions.c.node == node)
        s = s.order_by(self.versions.c.serial)
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        if not rows:
            return rows

        if not keys:
            return rows

        return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]

    def node_count_children(self, node):
        """Return node's child count."""

        s = select([func.count(self.nodes.c.node)])
        s = s.where(and_(self.nodes.c.parent == node,
                         self.nodes.c.node != ROOTNODE))
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        return row[0]

    def node_purge_children(self, parent, before=inf, cluster=0):
        """Delete all versions with the specified
           parent and cluster, and return
           the hashes and size of versions deleted.
           Clears out nodes with no remaining versions.
        """
        #update statistics
        c1 = select([self.nodes.c.node],
            self.nodes.c.parent == parent)
        where_clause = and_(self.versions.c.node.in_(c1),
                            self.versions.c.cluster == cluster)
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        s = s.where(where_clause)
        if before != inf:
            s = s.where(self.versions.c.mtime <= before)
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        if not row:
            return (), 0
        nr, size = row[0], -row[1] if row[1] else 0
        mtime = time()
        self.statistics_update(parent, -nr, size, mtime, cluster)
        self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)

        s = select([self.versions.c.hash])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        hashes = [row[0] for row in r.fetchall()]
        r.close()

        #delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)
        r.close()

        #delete nodes
        s = select([self.nodes.c.node],
            and_(self.nodes.c.parent == parent,
                 select([func.count(self.versions.c.serial)],
                    self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
        rp = self.conn.execute(s)
        nodes = [r[0] for r in rp.fetchall()]
        rp.close()
        s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
        self.conn.execute(s).close()

        return hashes, size

    def node_purge(self, node, before=inf, cluster=0):
        """Delete all versions with the specified
           node and cluster, and return
           the hashes and size of versions deleted.
           Clears out the node if it has no remaining versions.
        """

        #update statistics
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        where_clause = and_(self.versions.c.node == node,
                         self.versions.c.cluster == cluster)
        s = s.where(where_clause)
        if before != inf:
            s = s.where(self.versions.c.mtime <= before)
        r = self.conn.execute(s)
        row = r.fetchone()
        nr, size = row[0], row[1]
        r.close()
        if not nr:
            return (), 0
        mtime = time()
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)

        s = select([self.versions.c.hash])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        hashes = [row[0] for row in r.fetchall()]
        r.close()

        #delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)
        r.close()

        #delete nodes
        s = select([self.nodes.c.node],
            and_(self.nodes.c.node == node,
                 select([func.count(self.versions.c.serial)],
                    self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
        r = self.conn.execute(s)
        nodes = [row[0] for row in r.fetchall()]
        r.close()
        s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
        self.conn.execute(s).close()

        return hashes, size

    def node_remove(self, node):
        """Remove the node specified.
           Return False if the node has children or is not found.
        """

        if self.node_count_children(node):
            return False

        mtime = time()
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size),
                    self.versions.c.cluster])
        s = s.where(self.versions.c.node == node)
        s = s.group_by(self.versions.c.cluster)
        r = self.conn.execute(s)
        for population, size, cluster in r.fetchall():
            self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
        r.close()

        s = self.nodes.delete().where(self.nodes.c.node == node)
        self.conn.execute(s).close()
        return True

    def policy_get(self, node):
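        # Return the node's policy entries as a {key: value} dict.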
        s = select([self.policies.c.key, self.policies.c.value],
            self.policies.c.node==node)
        r = self.conn.execute(s)
        d = dict(r.fetchall())
        r.close()
        return d

    def policy_set(self, node, policy):
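        # Insert or update each (key, value) pair: try an UPDATE first and
        # fall back to INSERT when no row was affected.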
        #insert or replace
        for k, v in policy.iteritems():
            s = self.policies.update().where(and_(self.policies.c.node == node,
                                                  self.policies.c.key == k))
            s = s.values(value = v)
            rp = self.conn.execute(s)
            rp.close()
            if rp.rowcount == 0:
                s = self.policies.insert()
                values = {'node':node, 'key':k, 'value':v}
                r = self.conn.execute(s, values)
                r.close()

    def statistics_get(self, node, cluster=0):
        """Return population, total size and last mtime
           for all versions under node that belong to the cluster.
        """

        s = select([self.statistics.c.population,
                    self.statistics.c.size,
                    self.statistics.c.mtime])
        s = s.where(and_(self.statistics.c.node == node,
                         self.statistics.c.cluster == cluster))
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        return row

    def statistics_update(self, node, population, size, mtime, cluster=0):
        """Update the statistics of the given node.
           Statistics keep track of the population, total
           size of objects and mtime in the node's namespace.
           The population and size arguments are deltas and
           may be zero, positive or negative.
        """
        s = select([self.statistics.c.population, self.statistics.c.size],
            and_(self.statistics.c.node == node,
                 self.statistics.c.cluster == cluster))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            prepopulation, presize = (0, 0)
        else:
            prepopulation, presize = r
        population += prepopulation
        size += presize

        #insert or replace
        #TODO better upsert
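        # Emulated upsert: try the UPDATE first and INSERT only if no row was
        # affected. Done in two statements, this leaves a small race window
        # between concurrent writers.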
        u = self.statistics.update().where(and_(self.statistics.c.node==node,
                                           self.statistics.c.cluster==cluster))
        u = u.values(population=population, size=size, mtime=mtime)
        rp = self.conn.execute(u)
        rp.close()
        if rp.rowcount == 0:
            ins = self.statistics.insert()
            ins = ins.values(node=node, population=population, size=size,
                             mtime=mtime, cluster=cluster)
            self.conn.execute(ins).close()

    def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
        """Update the statistics of the given node's parent.
           Then recursively update all parents up to the root.
           Population is not recursive.
        """

        while True:
            if node == ROOTNODE:
                break
            props = self.node_get_properties(node)
            if props is None:
                break
            parent, path = props
            self.statistics_update(parent, population, size, mtime, cluster)
            node = parent
            population = 0 # Population isn't recursive

    def statistics_latest(self, node, before=inf, except_cluster=0):
        """Return population, total size and last mtime
           for all latest versions under node that
           do not belong to the cluster.
        """

        # The node.
        props = self.node_get_properties(node)
        if props is None:
            return None
        parent, path = props

        # The latest version.
        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.hash,
                    self.versions.c.size,
                    self.versions.c.type,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.uuid,
                    self.versions.c.checksum,
                    self.versions.c.cluster])
        filtered = select([func.max(self.versions.c.serial)],
                            self.versions.c.node == node)
        if before != inf:
            filtered = filtered.where(self.versions.c.mtime < before)
        s = s.where(and_(self.versions.c.cluster != except_cluster,
                         self.versions.c.serial == filtered))
        r = self.conn.execute(s)
        props = r.fetchone()
        r.close()
        if not props:
            return None
        mtime = props[MTIME]

        # First level, just under node (get population).
        v = self.versions.alias('v')
        s = select([func.count(v.c.serial),
                    func.sum(v.c.size),
                    func.max(v.c.mtime)])
        c1 = select([func.max(self.versions.c.serial)])
        if before != inf:
            c1 = c1.where(self.versions.c.mtime < before)
        c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
        s = s.where(and_(v.c.serial == c1.where(self.versions.c.node == v.c.node),
                         v.c.cluster != except_cluster,
                         v.c.node.in_(c2)))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        count = r[0]
        mtime = max(mtime, r[2])
        if count == 0:
            return (0, 0, mtime)

        # All children (get size and mtime).
        # This is why the full path is stored.
        s = select([func.count(v.c.serial),
                    func.sum(v.c.size),
                    func.max(v.c.mtime)])
        c1 = select([func.max(self.versions.c.serial)],
            self.versions.c.node == v.c.node)
        if before != inf:
            c1 = c1.where(self.versions.c.mtime < before)
        c2 = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path) + '%', escape='\\'))
        s = s.where(and_(v.c.serial == c1,
                         v.c.cluster != except_cluster,
                         v.c.node.in_(c2)))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        size = r[1] - props[SIZE]
        mtime = max(mtime, r[2])
        return (count, size, mtime)

    def version_create(self, node, hash, size, type, source, muser, uuid, checksum, cluster=0):
        """Create a new version from the given properties.
           Return the (serial, mtime) of the new version.
        """

        mtime = time()
        s = self.versions.insert().values(node=node, hash=hash, size=size, type=type, source=source,
                                          mtime=mtime, muser=muser, uuid=uuid, checksum=checksum, cluster=cluster)
        serial = self.conn.execute(s).inserted_primary_key[0]
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)
        return serial, mtime

    def version_lookup(self, node, before=inf, cluster=0, all_props=True):
        """Look up the current version of the given node.
           Return a list with its properties:
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster)
           or None if the current version is not found in the given cluster.
        """

        v = self.versions.alias('v')
        if not all_props:
            s = select([v.c.serial])
        else:
            s = select([v.c.serial, v.c.node, v.c.hash,
                        v.c.size, v.c.type, v.c.source,
                        v.c.mtime, v.c.muser, v.c.uuid,
                        v.c.checksum, v.c.cluster])
        c = select([func.max(self.versions.c.serial)],
            self.versions.c.node == node)
        if before != inf:
            c = c.where(self.versions.c.mtime < before)
        s = s.where(and_(v.c.serial == c,
                         v.c.cluster == cluster))
        r = self.conn.execute(s)
        props = r.fetchone()
        r.close()
        if props:
            return props
        return None

    def version_lookup_bulk(self, nodes, before=inf, cluster=0, all_props=True):
        """Look up the current versions of the given nodes.
           Return a list with their properties:
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
        """

        v = self.versions.alias('v')
        if not all_props:
            s = select([v.c.serial])
        else:
            s = select([v.c.serial, v.c.node, v.c.hash,
                        v.c.size, v.c.type, v.c.source,
                        v.c.mtime, v.c.muser, v.c.uuid,
                        v.c.checksum, v.c.cluster])
        c = select([func.max(self.versions.c.serial)],
            self.versions.c.node.in_(nodes)).group_by(self.versions.c.node)
        if before != inf:
            c = c.where(self.versions.c.mtime < before)
        s = s.where(and_(v.c.serial.in_(c),
                         v.c.cluster == cluster))
        s = s.order_by(v.c.node)
        r = self.conn.execute(s)
        rproxy = r.fetchall()
        r.close()
        return (tuple(row.values()) for row in rproxy)

    def version_get_properties(self, serial, keys=(), propnames=_propnames):
        """Return a sequence of values for the properties of
           the version specified by serial and the keys, in the order given.
           If keys is empty, return all properties in the order
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
        """

        v = self.versions.alias()
        s = select([v.c.serial, v.c.node, v.c.hash,
                    v.c.size, v.c.type, v.c.source,
                    v.c.mtime, v.c.muser, v.c.uuid,
                    v.c.checksum, v.c.cluster], v.c.serial == serial)
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if r is None:
            return r

        if not keys:
            return r
        return [r[propnames[k]] for k in keys if k in propnames]

    def version_put_property(self, serial, key, value):
        """Set value for the property key of the version specified by serial."""

        if key not in _propnames:
            return
        s = self.versions.update()
        s = s.where(self.versions.c.serial == serial)
        s = s.values(**{key: value})
        self.conn.execute(s).close()

    def version_recluster(self, serial, cluster):
        """Move the version into another cluster."""

        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        size = props[SIZE]
        oldcluster = props[CLUSTER]
        if cluster == oldcluster:
            return

        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)

        s = self.versions.update()
        s = s.where(self.versions.c.serial == serial)
        s = s.values(cluster = cluster)
        self.conn.execute(s).close()

    def version_remove(self, serial):
        """Remove the serial specified."""

        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        hash = props[HASH]
        size = props[SIZE]
        cluster = props[CLUSTER]

        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, cluster)

        s = self.versions.delete().where(self.versions.c.serial == serial)
        self.conn.execute(s).close()
        return hash, size

    def attribute_get(self, serial, domain, keys=()):
        """Return a list of (key, value) pairs of the version specified by serial.
           If keys is empty, return all attributes.
           Otherwise, return only those specified.
        """

        if keys:
            attrs = self.attributes.alias()
            s = select([attrs.c.key, attrs.c.value])
            s = s.where(and_(attrs.c.key.in_(keys),
                             attrs.c.serial == serial,
                             attrs.c.domain == domain))
        else:
            attrs = self.attributes.alias()
            s = select([attrs.c.key, attrs.c.value])
            s = s.where(and_(attrs.c.serial == serial,
                             attrs.c.domain == domain))
        r = self.conn.execute(s)
        l = r.fetchall()
        r.close()
        return l

    def attribute_set(self, serial, domain, items):
        """Set the attributes of the version specified by serial.
           Receive attributes as an iterable of (key, value) pairs.
        """
        #insert or replace
        #TODO better upsert
        for k, v in items:
            s = self.attributes.update()
            s = s.where(and_(self.attributes.c.serial == serial,
                             self.attributes.c.domain == domain,
                             self.attributes.c.key == k))
            s = s.values(value = v)
            rp = self.conn.execute(s)
            rp.close()
            if rp.rowcount == 0:
                s = self.attributes.insert()
                s = s.values(serial=serial, domain=domain, key=k, value=v)
                self.conn.execute(s).close()

    def attribute_del(self, serial, domain, keys=()):
        """Delete attributes of the version specified by serial.
           If keys is empty, delete all attributes.
           Otherwise delete those specified.
        """

        if keys:
            #TODO more efficient way to do this?
            for key in keys:
                s = self.attributes.delete()
                s = s.where(and_(self.attributes.c.serial == serial,
                                 self.attributes.c.domain == domain,
                                 self.attributes.c.key == key))
                self.conn.execute(s).close()
        else:
            s = self.attributes.delete()
            s = s.where(and_(self.attributes.c.serial == serial,
                             self.attributes.c.domain == domain))
            self.conn.execute(s).close()

    def attribute_copy(self, source, dest):
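        # Copy every attribute of the source version (across all domains) to
        # the dest version, using the same update-then-insert emulation as
        # attribute_set().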
        s = select([dest, self.attributes.c.domain, self.attributes.c.key, self.attributes.c.value],
            self.attributes.c.serial == source)
        rp = self.conn.execute(s)
        attributes = rp.fetchall()
        rp.close()
        for dest, domain, k, v in attributes:
            #insert or replace
            s = self.attributes.update().where(and_(
                self.attributes.c.serial == dest,
                self.attributes.c.domain == domain,
                self.attributes.c.key == k))
            rp = self.conn.execute(s, value=v)
            rp.close()
            if rp.rowcount == 0:
                s = self.attributes.insert()
                values = {'serial':dest, 'domain':domain, 'key':k, 'value':v}
                self.conn.execute(s, values).close()

    def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=[]):
        """Return a list with all keys defined
           for all latest versions under parent that
           do not belong to the cluster.
        """

        # TODO: Use another table to store before=inf results.
        a = self.attributes.alias('a')
        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        s = select([a.c.key]).distinct()
        filtered = select([func.max(self.versions.c.serial)])
        if before != inf:
            filtered = filtered.where(self.versions.c.mtime < before)
        s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
        s = s.where(v.c.cluster != except_cluster)
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
            self.nodes.c.parent == parent)))
        s = s.where(a.c.serial == v.c.serial)
        s = s.where(a.c.domain == domain)
        s = s.where(n.c.node == v.c.node)
        conj = []
        for path, match in pathq:
            if match == MATCH_PREFIX:
                conj.append(n.c.path.like(self.escape_like(path) + '%', escape='\\'))
            elif match == MATCH_EXACT:
                conj.append(n.c.path == path)
        if conj:
            s = s.where(or_(*conj))
        rp = self.conn.execute(s)
        rows = rp.fetchall()
        rp.close()
        return [r[0] for r in rows]

    def latest_version_list(self, parent, prefix='', delimiter=None,
                            start='', limit=10000, before=inf,
                            except_cluster=0, pathq=[], domain=None,
                            filterq=[], sizeq=None, all_props=False):
        """Return a (list of (path, serial) tuples, list of common prefixes)
           for the current versions of the paths with the given parent,
           matching the following criteria.

           The property tuple for a version is returned if all
           of these conditions are true:

                a. parent matches

                b. path > start

                c. path starts with prefix (and paths in pathq)

                d. version is the max up to before

                e. version is not in cluster

                f. the path does not have the delimiter occurring
                   after the prefix, or ends with the delimiter

                g. serial matches the attribute filter query.

                   A filter query is a comma-separated list of
                   terms in one of these three forms:

                   key
                       an attribute with this key must exist

                   !key
                       an attribute with this key must not exist

                   key ?op value
                       the attribute with this key satisfies the value
                       where ?op is one of ==, !=, <=, >=, <, >.

                h. the size is in the range set by sizeq

           The list of common prefixes includes the prefixes
           matching up to the first delimiter after prefix,
           and are reported only once, as "virtual directories".
           The delimiter is included in the prefixes.

           If arguments are None, then the corresponding matching rule
           will always match.

           Limit applies to the first list of tuples returned.

           If all_props is True, return all properties after path, not just serial.
        """

        if not start or start < prefix:
            start = strprevling(prefix)
        nextling = strnextling(prefix)

        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        if not all_props:
            s = select([n.c.path, v.c.serial]).distinct()
        else:
            s = select([n.c.path,
                        v.c.serial, v.c.node, v.c.hash,
                        v.c.size, v.c.type, v.c.source,
                        v.c.mtime, v.c.muser, v.c.uuid,
                        v.c.checksum, v.c.cluster]).distinct()
        filtered = select([func.max(self.versions.c.serial)])
        if before != inf:
            filtered = filtered.where(self.versions.c.mtime < before)
        s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
        s = s.where(v.c.cluster != except_cluster)
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
            self.nodes.c.parent == parent)))

        s = s.where(n.c.node == v.c.node)
        s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
        conj = []
        for path, match in pathq:
            if match == MATCH_PREFIX:
                conj.append(n.c.path.like(self.escape_like(path) + '%', escape='\\'))
            elif match == MATCH_EXACT:
                conj.append(n.c.path == path)
        if conj:
            s = s.where(or_(*conj))

        if sizeq and len(sizeq) == 2:
            if sizeq[0]:
                s = s.where(v.c.size >= sizeq[0])
            if sizeq[1]:
                s = s.where(v.c.size < sizeq[1])

        if domain and filterq:
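            # Each filter term becomes a correlated (NOT) EXISTS subquery
            # against the attributes table, scoped to the given domain.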
            a = self.attributes.alias('a')
            included, excluded, opers = parse_filters(filterq)
            if included:
                subs = select([1])
                subs = subs.where(a.c.serial == v.c.serial).correlate(v)
                subs = subs.where(a.c.domain == domain)
                subs = subs.where(or_(*[a.c.key.op('=')(x) for x in included]))
                s = s.where(exists(subs))
            if excluded:
                subs = select([1])
                subs = subs.where(a.c.serial == v.c.serial).correlate(v)
                subs = subs.where(a.c.domain == domain)
                subs = subs.where(or_(*[a.c.key.op('=')(x) for x in excluded]))
                s = s.where(not_(exists(subs)))
            if opers:
                for k, o, val in opers:
                    subs = select([1])
                    subs = subs.where(a.c.serial == v.c.serial).correlate(v)
                    subs = subs.where(a.c.domain == domain)
                    subs = subs.where(and_(a.c.key.op('=')(k), a.c.value.op(o)(val)))
                    s = s.where(exists(subs))

        s = s.order_by(n.c.path)

        if not delimiter:
            s = s.limit(limit)
            rp = self.conn.execute(s, start=start)
            r = rp.fetchall()
            rp.close()
            return r, ()
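
        # With a delimiter, fold everything below the first delimiter after
        # the prefix into a single "virtual directory" entry: report the
        # common prefix once, then restart the scan at strnextling(pf) to
        # skip the rest of that subtree.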

        pfz = len(prefix)
        dz = len(delimiter)
        count = 0
        prefixes = []
        pappend = prefixes.append
        matches = []
        mappend = matches.append

        rp = self.conn.execute(s, start=start)
        while True:
            props = rp.fetchone()
            if props is None:
                break
            path = props[0]
            serial = props[1]
            idx = path.find(delimiter, pfz)

            if idx < 0:
                mappend(props)
                count += 1
                if count >= limit:
                    break
                continue

            if idx + dz == len(path):
                mappend(props)
                count += 1
                continue # Get one more, in case there is a path.
            pf = path[:idx + dz]
            pappend(pf)
            if count >= limit:
                break

            rp = self.conn.execute(s, start=strnextling(pf)) # New start.
        rp.close()

        return matches, prefixes

    def latest_uuid(self, uuid):
        """Return a (path, serial) tuple, for the latest version of the given uuid."""

        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        s = select([n.c.path, v.c.serial])
        filtered = select([func.max(self.versions.c.serial)])
        s = s.where(v.c.serial == filtered.where(self.versions.c.uuid == uuid))
        s = s.where(n.c.node == v.c.node)

        r = self.conn.execute(s)
        l = r.fetchone()
        r.close()
        return l