Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / lib / sqlalchemy / node.py @ 94243c86

History | View | Annotate | Download (39.9 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from time import time
35
from sqlalchemy import Table, Integer, BigInteger, DECIMAL, Column, String, MetaData, ForeignKey
36
from sqlalchemy.types import Text
37
from sqlalchemy.schema import Index, Sequence
38
from sqlalchemy.sql import func, and_, or_, not_, null, select, bindparam, text, exists
39
from sqlalchemy.ext.compiler import compiles
40
from sqlalchemy.engine.reflection import Inspector
41

    
42
from dbworker import DBWorker
43

    
44
from pithos.backends.filter import parse_filters
45

    
46

    
47
ROOTNODE  = 0
48

    
49
( SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM, CLUSTER ) = range(11)
50

    
51
( MATCH_PREFIX, MATCH_EXACT ) = range(2)
52

    
53
inf = float('inf')
54

    
55

    
56
def strnextling(prefix):
57
    """Return the first unicode string
58
       greater than but not starting with given prefix.
59
       strnextling('hello') -> 'hellp'
60
    """
61
    if not prefix:
62
        ## all strings start with the null string,
63
        ## therefore we have to approximate strnextling('')
64
        ## with the last unicode character supported by python
65
        ## 0x10ffff for wide (32-bit unicode) python builds
66
        ## 0x00ffff for narrow (16-bit unicode) python builds
67
        ## We will not autodetect. 0xffff is safe enough.
68
        return unichr(0xffff)
69
    s = prefix[:-1]
70
    c = ord(prefix[-1])
71
    if c >= 0xffff:
72
        raise RuntimeError
73
    s += unichr(c+1)
74
    return s
75

    
76
def strprevling(prefix):
77
    """Return an approximation of the last unicode string
78
       less than but not starting with given prefix.
79
       strprevling(u'hello') -> u'helln\\xffff'
80
    """
81
    if not prefix:
82
        ## There is no prevling for the null string
83
        return prefix
84
    s = prefix[:-1]
85
    c = ord(prefix[-1])
86
    if c > 0:
87
        s += unichr(c-1) + unichr(0xffff)
88
    return s
89

    
90
_propnames = {
91
    'serial'    : 0,
92
    'node'      : 1,
93
    'hash'      : 2,
94
    'size'      : 3,
95
    'type'      : 4,
96
    'source'    : 5,
97
    'mtime'     : 6,
98
    'muser'     : 7,
99
    'uuid'      : 8,
100
    'checksum'  : 9,
101
    'cluster'   : 10
102
}
103

    
104

    
105
class Node(DBWorker):
106
    """Nodes store path organization and have multiple versions.
107
       Versions store object history and have multiple attributes.
108
       Attributes store metadata.
109
    """
110
    
111
    # TODO: Provide an interface for included and excluded clusters.
112
    
113
    def __init__(self, **params):
114
        DBWorker.__init__(self, **params)
115
        metadata = MetaData()
116
        
117
        #create nodes table
118
        columns=[]
119
        columns.append(Column('node', Integer, primary_key=True))
120
        columns.append(Column('parent', Integer,
121
                              ForeignKey('nodes.node',
122
                                         ondelete='CASCADE',
123
                                         onupdate='CASCADE'),
124
                              autoincrement=False))
125
        columns.append(Column('latest_version', Integer))
126
        columns.append(Column('path', String(2048), default='', nullable=False))
127
        self.nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
128
        Index('idx_nodes_path', self.nodes.c.path, unique=True)
129
        Index('idx_nodes_parent', self.nodes.c.parent)
130
        
131
        #create policy table
132
        columns=[]
133
        columns.append(Column('node', Integer,
134
                              ForeignKey('nodes.node',
135
                                         ondelete='CASCADE',
136
                                         onupdate='CASCADE'),
137
                              primary_key=True))
138
        columns.append(Column('key', String(128), primary_key=True))
139
        columns.append(Column('value', String(256)))
140
        self.policies = Table('policy', metadata, *columns, mysql_engine='InnoDB')
141
        
142
        #create statistics table
143
        columns=[]
144
        columns.append(Column('node', Integer,
145
                              ForeignKey('nodes.node',
146
                                         ondelete='CASCADE',
147
                                         onupdate='CASCADE'),
148
                              primary_key=True))
149
        columns.append(Column('population', Integer, nullable=False, default=0))
150
        columns.append(Column('size', BigInteger, nullable=False, default=0))
151
        columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
152
        columns.append(Column('cluster', Integer, nullable=False, default=0,
153
                              primary_key=True, autoincrement=False))
154
        self.statistics = Table('statistics', metadata, *columns, mysql_engine='InnoDB')
155
        
156
        #create versions table
157
        columns=[]
158
        columns.append(Column('serial', Integer, primary_key=True))
159
        columns.append(Column('node', Integer,
160
                              ForeignKey('nodes.node',
161
                                         ondelete='CASCADE',
162
                                         onupdate='CASCADE')))
163
        columns.append(Column('hash', String(256)))
164
        columns.append(Column('size', BigInteger, nullable=False, default=0))
165
        columns.append(Column('type', String(256), nullable=False, default=''))
166
        columns.append(Column('source', Integer))
167
        columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
168
        columns.append(Column('muser', String(256), nullable=False, default=''))
169
        columns.append(Column('uuid', String(64), nullable=False, default=''))
170
        columns.append(Column('checksum', String(256), nullable=False, default=''))
171
        columns.append(Column('cluster', Integer, nullable=False, default=0))
172
        self.versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
173
        Index('idx_versions_node_mtime', self.versions.c.node, self.versions.c.mtime)
174
        Index('idx_versions_node_uuid', self.versions.c.uuid)
175
        
176
        #create attributes table
177
        columns = []
178
        columns.append(Column('serial', Integer,
179
                              ForeignKey('versions.serial',
180
                                         ondelete='CASCADE',
181
                                         onupdate='CASCADE'),
182
                              primary_key=True))
183
        columns.append(Column('domain', String(256), primary_key=True))
184
        columns.append(Column('key', String(128), primary_key=True))
185
        columns.append(Column('value', String(256)))
186
        self.attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')
187
        
188
        metadata.create_all(self.engine)
189
        
190
        s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
191
                                           self.nodes.c.parent == ROOTNODE))
192
        rp = self.conn.execute(s)
193
        r = rp.fetchone()
194
        rp.close()
195
        if not r:
196
            s = self.nodes.insert().values(node=ROOTNODE, parent=ROOTNODE)
197
            self.conn.execute(s)
198
    
199
    def node_create(self, parent, path):
200
        """Create a new node from the given properties.
201
           Return the node identifier of the new node.
202
        """
203
        #TODO catch IntegrityError?
204
        s = self.nodes.insert().values(parent=parent, path=path)
205
        r = self.conn.execute(s)
206
        inserted_primary_key = r.inserted_primary_key[0]
207
        r.close()
208
        return inserted_primary_key
209
    
210
    def node_lookup(self, path):
211
        """Lookup the current node of the given path.
212
           Return None if the path is not found.
213
        """
214
        
215
        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
216
        s = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path), escape='\\'))
217
        r = self.conn.execute(s)
218
        row = r.fetchone()
219
        r.close()
220
        if row:
221
            return row[0]
222
        return None
223
    
224
    def node_lookup_bulk(self, paths):
225
        """Lookup the current nodes for the given paths.
226
           Return () if the path is not found.
227
        """
228
        
229
        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
230
        s = select([self.nodes.c.node], self.nodes.c.path.in_(paths))
231
        r = self.conn.execute(s)
232
        rows = r.fetchall()
233
        r.close()
234
        return [row[0] for row in rows]
235
    
236
    def node_get_properties(self, node):
237
        """Return the node's (parent, path).
238
           Return None if the node is not found.
239
        """
240
        
241
        s = select([self.nodes.c.parent, self.nodes.c.path])
242
        s = s.where(self.nodes.c.node == node)
243
        r = self.conn.execute(s)
244
        l = r.fetchone()
245
        r.close()
246
        return l
247
    
248
    def node_get_versions(self, node, keys=(), propnames=_propnames):
249
        """Return the properties of all versions at node.
250
           If keys is empty, return all properties in the order
251
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
252
        """
253
        
254
        s = select([self.versions.c.serial,
255
                    self.versions.c.node,
256
                    self.versions.c.hash,
257
                    self.versions.c.size,
258
                    self.versions.c.type,
259
                    self.versions.c.source,
260
                    self.versions.c.mtime,
261
                    self.versions.c.muser,
262
                    self.versions.c.uuid,
263
                    self.versions.c.checksum,
264
                    self.versions.c.cluster], self.versions.c.node == node)
265
        s = s.order_by(self.versions.c.serial)
266
        r = self.conn.execute(s)
267
        rows = r.fetchall()
268
        r.close()
269
        if not rows:
270
            return rows
271
        
272
        if not keys:
273
            return rows
274
        
275
        return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]
276
    
277
    def node_count_children(self, node):
278
        """Return node's child count."""
279
        
280
        s = select([func.count(self.nodes.c.node)])
281
        s = s.where(and_(self.nodes.c.parent == node,
282
                         self.nodes.c.node != ROOTNODE))
283
        r = self.conn.execute(s)
284
        row = r.fetchone()
285
        r.close()
286
        return row[0]
287
    
288
    def node_purge_children(self, parent, before=inf, cluster=0):
289
        """Delete all versions with the specified
290
           parent and cluster, and return
291
           the hashes and size of versions deleted.
292
           Clears out nodes with no remaining versions.
293
        """
294
        #update statistics
295
        c1 = select([self.nodes.c.node],
296
            self.nodes.c.parent == parent)
297
        where_clause = and_(self.versions.c.node.in_(c1),
298
                            self.versions.c.cluster == cluster)
299
        s = select([func.count(self.versions.c.serial),
300
                    func.sum(self.versions.c.size)])
301
        s = s.where(where_clause)
302
        if before != inf:
303
            s = s.where(self.versions.c.mtime <= before)
304
        r = self.conn.execute(s)
305
        row = r.fetchone()
306
        r.close()
307
        if not row:
308
            return (), 0
309
        nr, size = row[0], -row[1] if row[1] else 0
310
        mtime = time()
311
        self.statistics_update(parent, -nr, size, mtime, cluster)
312
        self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)
313
        
314
        s = select([self.versions.c.hash])
315
        s = s.where(where_clause)
316
        r = self.conn.execute(s)
317
        hashes = [row[0] for row in r.fetchall()]
318
        r.close()
319
        
320
        #delete versions
321
        s = self.versions.delete().where(where_clause)
322
        r = self.conn.execute(s)
323
        r.close()
324
        
325
        #delete nodes
326
        s = select([self.nodes.c.node],
327
            and_(self.nodes.c.parent == parent,
328
                 select([func.count(self.versions.c.serial)],
329
                    self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
330
        rp = self.conn.execute(s)
331
        nodes = [r[0] for r in rp.fetchall()]
332
        rp.close()
333
        s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
334
        self.conn.execute(s).close()
335
        
336
        return hashes, size
337
    
338
    def node_purge(self, node, before=inf, cluster=0):
339
        """Delete all versions with the specified
340
           node and cluster, and return
341
           the hashes and size of versions deleted.
342
           Clears out the node if it has no remaining versions.
343
        """
344
        
345
        #update statistics
346
        s = select([func.count(self.versions.c.serial),
347
                    func.sum(self.versions.c.size)])
348
        where_clause = and_(self.versions.c.node == node,
349
                         self.versions.c.cluster == cluster)
350
        s = s.where(where_clause)
351
        if before != inf:
352
            s = s.where(self.versions.c.mtime <= before)
353
        r = self.conn.execute(s)
354
        row = r.fetchone()
355
        nr, size = row[0], row[1]
356
        r.close()
357
        if not nr:
358
            return (), 0
359
        mtime = time()
360
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
361
        
362
        s = select([self.versions.c.hash])
363
        s = s.where(where_clause)
364
        r = self.conn.execute(s)
365
        hashes = [r[0] for r in r.fetchall()]
366
        r.close()
367
        
368
        #delete versions
369
        s = self.versions.delete().where(where_clause)
370
        r = self.conn.execute(s)
371
        r.close()
372
        
373
        #delete nodes
374
        s = select([self.nodes.c.node],
375
            and_(self.nodes.c.node == node,
376
                 select([func.count(self.versions.c.serial)],
377
                    self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
378
        r = self.conn.execute(s)
379
        nodes = r.fetchall()
380
        r.close()
381
        s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
382
        self.conn.execute(s).close()
383
        
384
        return hashes, size
385
    
386
    def node_remove(self, node):
387
        """Remove the node specified.
388
           Return false if the node has children or is not found.
389
        """
390
        
391
        if self.node_count_children(node):
392
            return False
393
        
394
        mtime = time()
395
        s = select([func.count(self.versions.c.serial),
396
                    func.sum(self.versions.c.size),
397
                    self.versions.c.cluster])
398
        s = s.where(self.versions.c.node == node)
399
        s = s.group_by(self.versions.c.cluster)
400
        r = self.conn.execute(s)
401
        for population, size, cluster in r.fetchall():
402
            self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
403
        r.close()
404
        
405
        s = self.nodes.delete().where(self.nodes.c.node == node)
406
        self.conn.execute(s).close()
407
        return True
408
    
409
    def policy_get(self, node):
410
        s = select([self.policies.c.key, self.policies.c.value],
411
            self.policies.c.node==node)
412
        r = self.conn.execute(s)
413
        d = dict(r.fetchall())
414
        r.close()
415
        return d
416
    
417
    def policy_set(self, node, policy):
418
        #insert or replace
419
        for k, v in policy.iteritems():
420
            s = self.policies.update().where(and_(self.policies.c.node == node,
421
                                                  self.policies.c.key == k))
422
            s = s.values(value = v)
423
            rp = self.conn.execute(s)
424
            rp.close()
425
            if rp.rowcount == 0:
426
                s = self.policies.insert()
427
                values = {'node':node, 'key':k, 'value':v}
428
                r = self.conn.execute(s, values)
429
                r.close()
430
    
431
    def statistics_get(self, node, cluster=0):
432
        """Return population, total size and last mtime
433
           for all versions under node that belong to the cluster.
434
        """
435
        
436
        s = select([self.statistics.c.population,
437
                    self.statistics.c.size,
438
                    self.statistics.c.mtime])
439
        s = s.where(and_(self.statistics.c.node == node,
440
                         self.statistics.c.cluster == cluster))
441
        r = self.conn.execute(s)
442
        row = r.fetchone()
443
        r.close()
444
        return row
445
    
446
    def statistics_update(self, node, population, size, mtime, cluster=0):
447
        """Update the statistics of the given node.
448
           Statistics keep track the population, total
449
           size of objects and mtime in the node's namespace.
450
           May be zero or positive or negative numbers.
451
        """
452
        s = select([self.statistics.c.population, self.statistics.c.size],
453
            and_(self.statistics.c.node == node,
454
                 self.statistics.c.cluster == cluster))
455
        rp = self.conn.execute(s)
456
        r = rp.fetchone()
457
        rp.close()
458
        if not r:
459
            prepopulation, presize = (0, 0)
460
        else:
461
            prepopulation, presize = r
462
        population += prepopulation
463
        size += presize
464
        
465
        #insert or replace
466
        #TODO better upsert
467
        u = self.statistics.update().where(and_(self.statistics.c.node==node,
468
                                           self.statistics.c.cluster==cluster))
469
        u = u.values(population=population, size=size, mtime=mtime)
470
        rp = self.conn.execute(u)
471
        rp.close()
472
        if rp.rowcount == 0:
473
            ins = self.statistics.insert()
474
            ins = ins.values(node=node, population=population, size=size,
475
                             mtime=mtime, cluster=cluster)
476
            self.conn.execute(ins).close()
477
    
478
    def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
479
        """Update the statistics of the given node's parent.
480
           Then recursively update all parents up to the root.
481
           Population is not recursive.
482
        """
483
        
484
        while True:
485
            if node == ROOTNODE:
486
                break
487
            props = self.node_get_properties(node)
488
            if props is None:
489
                break
490
            parent, path = props
491
            self.statistics_update(parent, population, size, mtime, cluster)
492
            node = parent
493
            population = 0 # Population isn't recursive
494
    
495
    def statistics_latest(self, node, before=inf, except_cluster=0):
496
        """Return population, total size and last mtime
497
           for all latest versions under node that
498
           do not belong to the cluster.
499
        """
500
        
501
        # The node.
502
        props = self.node_get_properties(node)
503
        if props is None:
504
            return None
505
        parent, path = props
506
        
507
        # The latest version.
508
        s = select([self.versions.c.serial,
509
                    self.versions.c.node,
510
                    self.versions.c.hash,
511
                    self.versions.c.size,
512
                    self.versions.c.type,
513
                    self.versions.c.source,
514
                    self.versions.c.mtime,
515
                    self.versions.c.muser,
516
                    self.versions.c.uuid,
517
                    self.versions.c.checksum,
518
                    self.versions.c.cluster])
519
        if before != inf:
520
            filtered = select([func.max(self.versions.c.serial)],
521
                            self.versions.c.node == node)
522
            filtered = filtered.where(self.versions.c.mtime < before)
523
        else:
524
            filtered = select([self.nodes.c.latest_version],
525
                            self.versions.c.node == node)
526
        s = s.where(and_(self.versions.c.cluster != except_cluster,
527
                         self.versions.c.serial == filtered))
528
        r = self.conn.execute(s)
529
        props = r.fetchone()
530
        r.close()
531
        if not props:
532
            return None
533
        mtime = props[MTIME]
534
        
535
        # First level, just under node (get population).
536
        v = self.versions.alias('v')
537
        s = select([func.count(v.c.serial),
538
                    func.sum(v.c.size),
539
                    func.max(v.c.mtime)])
540
        if before != inf:
541
            c1 = select([func.max(self.versions.c.serial)])
542
            c1 = c1.where(self.versions.c.mtime < before)
543
            c1.where(self.versions.c.node == v.c.node)
544
        else:
545
            c1 = select([self.nodes.c.latest_version])
546
            c1.where(self.nodes.c.node == v.c.node)
547
        c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
548
        s = s.where(and_(v.c.serial == c1,
549
                         v.c.cluster != except_cluster,
550
                         v.c.node.in_(c2)))
551
        rp = self.conn.execute(s)
552
        r = rp.fetchone()
553
        rp.close()
554
        if not r:
555
            return None
556
        count = r[0]
557
        mtime = max(mtime, r[2])
558
        if count == 0:
559
            return (0, 0, mtime)
560
        
561
        # All children (get size and mtime).
562
        # This is why the full path is stored.
563
        s = select([func.count(v.c.serial),
564
                    func.sum(v.c.size),
565
                    func.max(v.c.mtime)])
566
        if before != inf:
567
            c1 = select([func.max(self.versions.c.serial)],
568
                    self.versions.c.node == v.c.node)
569
            c1 = c1.where(self.versions.c.mtime < before)
570
        else:
571
            c1 = select([self.nodes.c.serial],
572
                    self.nodes.c.node == v.c.node)
573
        c2 = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path) + '%', escape='\\'))
574
        s = s.where(and_(v.c.serial == c1,
575
                         v.c.cluster != except_cluster,
576
                         v.c.node.in_(c2)))
577
        rp = self.conn.execute(s)
578
        r = rp.fetchone()
579
        rp.close()
580
        if not r:
581
            return None
582
        size = r[1] - props[SIZE]
583
        mtime = max(mtime, r[2])
584
        return (count, size, mtime)
585
    
586
    def nodes_set_latest_version(self, node, serial):
587
        s = self.nodes.update().where(self.nodes.c.node == node)
588
        s = s.values(latest_version = serial)
589
        self.conn.execute(s).close()
590
    
591
    def version_create(self, node, hash, size, type, source, muser, uuid, checksum, cluster=0):
592
        """Create a new version from the given properties.
593
           Return the (serial, mtime) of the new version.
594
        """
595
        
596
        mtime = time()
597
        s = self.versions.insert().values(node=node, hash=hash, size=size, type=type, source=source,
598
                                          mtime=mtime, muser=muser, uuid=uuid, checksum=checksum, cluster=cluster)
599
        serial = self.conn.execute(s).inserted_primary_key[0]
600
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)
601
        
602
        self.nodes_set_latest_version(node, serial)
603
        
604
        return serial, mtime
605
    
606
    def version_lookup(self, node, before=inf, cluster=0, all_props=True):
607
        """Lookup the current version of the given node.
608
           Return a list with its properties:
609
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster)
610
           or None if the current version is not found in the given cluster.
611
        """
612
        
613
        v = self.versions.alias('v')
614
        if not all_props:
615
            s = select([v.c.serial])
616
        else:
617
            s = select([v.c.serial, v.c.node, v.c.hash,
618
                        v.c.size, v.c.type, v.c.source,
619
                        v.c.mtime, v.c.muser, v.c.uuid,
620
                        v.c.checksum, v.c.cluster])
621
        if before != inf:
622
            c = select([func.max(self.versions.c.serial)],
623
                self.versions.c.node == node)
624
            c = c.where(self.versions.c.mtime < before)
625
        else:
626
            c = select([self.nodes.c.latest_version],
627
                self.nodes.c.node == node)
628
        s = s.where(and_(v.c.serial == c,
629
                         v.c.cluster == cluster))
630
        r = self.conn.execute(s)
631
        props = r.fetchone()
632
        r.close()
633
        if props:
634
            return props
635
        return None
636
    
637
    def version_lookup_bulk(self, nodes, before=inf, cluster=0, all_props=True):
638
        """Lookup the current versions of the given nodes.
639
           Return a list with their properties:
640
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
641
        """
642
        if not nodes:
643
            return ()
644
        v = self.versions.alias('v')
645
        if not all_props:
646
            s = select([v.c.serial])
647
        else:
648
            s = select([v.c.serial, v.c.node, v.c.hash,
649
                        v.c.size, v.c.type, v.c.source,
650
                        v.c.mtime, v.c.muser, v.c.uuid,
651
                        v.c.checksum, v.c.cluster])
652
        if before != inf:
653
            c = select([func.max(self.versions.c.serial)],
654
                self.versions.c.node.in_(nodes))
655
            c = c.where(self.versions.c.mtime < before)
656
            c = c.group_by(self.versions.c.node)
657
        else:
658
            c = select([self.nodes.c.latest_version],
659
                self.nodes.c.node.in_(nodes))
660
        s = s.where(and_(v.c.serial.in_(c),
661
                         v.c.cluster == cluster))
662
        s = s.order_by(v.c.node)
663
        r = self.conn.execute(s)
664
        rproxy = r.fetchall()
665
        r.close()
666
        return (tuple(row.values()) for row in rproxy)
667
        
668
    def version_get_properties(self, serial, keys=(), propnames=_propnames):
669
        """Return a sequence of values for the properties of
670
           the version specified by serial and the keys, in the order given.
671
           If keys is empty, return all properties in the order
672
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
673
        """
674
        
675
        v = self.versions.alias()
676
        s = select([v.c.serial, v.c.node, v.c.hash,
677
                    v.c.size, v.c.type, v.c.source,
678
                    v.c.mtime, v.c.muser, v.c.uuid,
679
                    v.c.checksum, v.c.cluster], v.c.serial == serial)
680
        rp = self.conn.execute(s)
681
        r = rp.fetchone()
682
        rp.close()
683
        if r is None:
684
            return r
685
        
686
        if not keys:
687
            return r
688
        return [r[propnames[k]] for k in keys if k in propnames]
689
    
690
    def version_put_property(self, serial, key, value):
691
        """Set value for the property of version specified by key."""
692
        
693
        if key not in _propnames:
694
            return
695
        s = self.versions.update()
696
        s = s.where(self.versions.c.serial == serial)
697
        s = s.values(**{key: value})
698
        self.conn.execute(s).close()
699
    
700
    def version_recluster(self, serial, cluster):
701
        """Move the version into another cluster."""
702
        
703
        props = self.version_get_properties(serial)
704
        if not props:
705
            return
706
        node = props[NODE]
707
        size = props[SIZE]
708
        oldcluster = props[CLUSTER]
709
        if cluster == oldcluster:
710
            return
711
        
712
        mtime = time()
713
        self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
714
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)
715
        
716
        s = self.versions.update()
717
        s = s.where(self.versions.c.serial == serial)
718
        s = s.values(cluster = cluster)
719
        self.conn.execute(s).close()
720
    
721
    def version_remove(self, serial):
722
        """Remove the serial specified."""
723
        
724
        props = self.version_get_properties(serial)
725
        if not props:
726
            return
727
        node = props[NODE]
728
        hash = props[HASH]
729
        size = props[SIZE]
730
        cluster = props[CLUSTER]
731
        
732
        mtime = time()
733
        self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
734
        
735
        s = self.versions.delete().where(self.versions.c.serial == serial)
736
        self.conn.execute(s).close()
737
        
738
        props = self.version_lookup(node, cluster=cluster, all_props=False)
739
        if props:
740
            self.nodes_set_latest_version(v.node, serial)
741
        
742
        return hash, size
743
    
744
    def attribute_get(self, serial, domain, keys=()):
745
        """Return a list of (key, value) pairs of the version specified by serial.
746
           If keys is empty, return all attributes.
747
           Othwerise, return only those specified.
748
        """
749
        
750
        if keys:
751
            attrs = self.attributes.alias()
752
            s = select([attrs.c.key, attrs.c.value])
753
            s = s.where(and_(attrs.c.key.in_(keys),
754
                             attrs.c.serial == serial,
755
                             attrs.c.domain == domain))
756
        else:
757
            attrs = self.attributes.alias()
758
            s = select([attrs.c.key, attrs.c.value])
759
            s = s.where(and_(attrs.c.serial == serial,
760
                             attrs.c.domain == domain))
761
        r = self.conn.execute(s)
762
        l = r.fetchall()
763
        r.close()
764
        return l
765
    
766
    def attribute_set(self, serial, domain, items):
767
        """Set the attributes of the version specified by serial.
768
           Receive attributes as an iterable of (key, value) pairs.
769
        """
770
        #insert or replace
771
        #TODO better upsert
772
        for k, v in items:
773
            s = self.attributes.update()
774
            s = s.where(and_(self.attributes.c.serial == serial,
775
                             self.attributes.c.domain == domain,
776
                             self.attributes.c.key == k))
777
            s = s.values(value = v)
778
            rp = self.conn.execute(s)
779
            rp.close()
780
            if rp.rowcount == 0:
781
                s = self.attributes.insert()
782
                s = s.values(serial=serial, domain=domain, key=k, value=v)
783
                self.conn.execute(s).close()
784
    
785
    def attribute_del(self, serial, domain, keys=()):
786
        """Delete attributes of the version specified by serial.
787
           If keys is empty, delete all attributes.
788
           Otherwise delete those specified.
789
        """
790
        
791
        if keys:
792
            #TODO more efficient way to do this?
793
            for key in keys:
794
                s = self.attributes.delete()
795
                s = s.where(and_(self.attributes.c.serial == serial,
796
                                 self.attributes.c.domain == domain,
797
                                 self.attributes.c.key == key))
798
                self.conn.execute(s).close()
799
        else:
800
            s = self.attributes.delete()
801
            s = s.where(and_(self.attributes.c.serial == serial,
802
                             self.attributes.c.domain == domain))
803
            self.conn.execute(s).close()
804
    
805
    def attribute_copy(self, source, dest):
806
        s = select([dest, self.attributes.c.domain, self.attributes.c.key, self.attributes.c.value],
807
            self.attributes.c.serial == source)
808
        rp = self.conn.execute(s)
809
        attributes = rp.fetchall()
810
        rp.close()
811
        for dest, domain, k, v in attributes:
812
            #insert or replace
813
            s = self.attributes.update().where(and_(
814
                self.attributes.c.serial == dest,
815
                self.attributes.c.domain == domain,
816
                self.attributes.c.key == k))
817
            rp = self.conn.execute(s, value=v)
818
            rp.close()
819
            if rp.rowcount == 0:
820
                s = self.attributes.insert()
821
                values = {'serial':dest, 'domain':domain, 'key':k, 'value':v}
822
                self.conn.execute(s, values).close()
823
    
824
    def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=[]):
825
        """Return a list with all keys pairs defined
826
           for all latest versions under parent that
827
           do not belong to the cluster.
828
        """
829
        
830
        # TODO: Use another table to store before=inf results.
831
        a = self.attributes.alias('a')
832
        v = self.versions.alias('v')
833
        n = self.nodes.alias('n')
834
        s = select([a.c.key]).distinct()
835
        if before != inf:
836
            filtered = select([func.max(self.versions.c.serial)])
837
            filtered = filtered.where(self.versions.c.mtime < before)
838
            filtered = filtered.where(self.versions.c.node == v.c.node)
839
        else:
840
            filtered = select([self.nodes.c.latest_version])
841
            filtered = filtered.where(self.nodes.c.node == v.c.node)
842
        s = s.where(v.c.serial == filtered)
843
        s = s.where(v.c.cluster != except_cluster)
844
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
845
            self.nodes.c.parent == parent)))
846
        s = s.where(a.c.serial == v.c.serial)
847
        s = s.where(a.c.domain == domain)
848
        s = s.where(n.c.node == v.c.node)
849
        conj = []
850
        for path, match in pathq:
851
            if match == MATCH_PREFIX:
852
                conj.append(n.c.path.like(self.escape_like(path) + '%', escape='\\'))
853
            elif match == MATCH_EXACT:
854
                conj.append(n.c.path == path)
855
        if conj:
856
            s = s.where(or_(*conj))
857
        rp = self.conn.execute(s)
858
        rows = rp.fetchall()
859
        rp.close()
860
        return [r[0] for r in rows]
861
    
862
    def latest_version_list(self, parent, prefix='', delimiter=None,
863
                            start='', limit=10000, before=inf,
864
                            except_cluster=0, pathq=[], domain=None,
865
                            filterq=[], sizeq=None, all_props=False):
866
        """Return a (list of (path, serial) tuples, list of common prefixes)
867
           for the current versions of the paths with the given parent,
868
           matching the following criteria.
869
           
870
           The property tuple for a version is returned if all
871
           of these conditions are true:
872
                
873
                a. parent matches
874
                
875
                b. path > start
876
                
877
                c. path starts with prefix (and paths in pathq)
878
                
879
                d. version is the max up to before
880
                
881
                e. version is not in cluster
882
                
883
                f. the path does not have the delimiter occuring
884
                   after the prefix, or ends with the delimiter
885
                
886
                g. serial matches the attribute filter query.
887
                   
888
                   A filter query is a comma-separated list of
889
                   terms in one of these three forms:
890
                   
891
                   key
892
                       an attribute with this key must exist
893
                   
894
                   !key
895
                       an attribute with this key must not exist
896
                   
897
                   key ?op value
898
                       the attribute with this key satisfies the value
899
                       where ?op is one of ==, != <=, >=, <, >.
900
                
901
                h. the size is in the range set by sizeq
902
           
903
           The list of common prefixes includes the prefixes
904
           matching up to the first delimiter after prefix,
905
           and are reported only once, as "virtual directories".
906
           The delimiter is included in the prefixes.
907
           
908
           If arguments are None, then the corresponding matching rule
909
           will always match.
910
           
911
           Limit applies to the first list of tuples returned.
912
           
913
           If all_props is True, return all properties after path, not just serial.
914
        """
915
        
916
        if not start or start < prefix:
917
            start = strprevling(prefix)
918
        nextling = strnextling(prefix)
919
        
920
        v = self.versions.alias('v')
921
        n = self.nodes.alias('n')
922
        if not all_props:
923
            s = select([n.c.path, v.c.serial]).distinct()
924
        else:
925
            s = select([n.c.path,
926
                        v.c.serial, v.c.node, v.c.hash,
927
                        v.c.size, v.c.type, v.c.source,
928
                        v.c.mtime, v.c.muser, v.c.uuid,
929
                        v.c.checksum, v.c.cluster]).distinct()
930
        if before != inf:
931
            filtered = select([func.max(self.versions.c.serial)])
932
            filtered = filtered.where(self.versions.c.mtime < before)
933
        else:
934
            filtered = select([self.nodes.c.latest_version])
935
        s = s.where(v.c.serial == filtered.where(self.nodes.c.node == v.c.node))
936
        s = s.where(v.c.cluster != except_cluster)
937
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
938
            self.nodes.c.parent == parent)))
939
        
940
        s = s.where(n.c.node == v.c.node)
941
        s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
942
        conj = []
943
        for path, match in pathq:
944
            if match == MATCH_PREFIX:
945
                conj.append(n.c.path.like(self.escape_like(path) + '%', escape='\\'))
946
            elif match == MATCH_EXACT:
947
                conj.append(n.c.path == path)
948
        if conj:
949
            s = s.where(or_(*conj))
950
        
951
        if sizeq and len(sizeq) == 2:
952
            if sizeq[0]:
953
                s = s.where(v.c.size >= sizeq[0])
954
            if sizeq[1]:
955
                s = s.where(v.c.size < sizeq[1])
956
        
957
        if domain and filterq:
958
            a = self.attributes.alias('a')
959
            included, excluded, opers = parse_filters(filterq)
960
            if included:
961
                subs = select([1])
962
                subs = subs.where(a.c.serial == v.c.serial).correlate(v)
963
                subs = subs.where(a.c.domain == domain)
964
                subs = subs.where(or_(*[a.c.key.op('=')(x) for x in included]))
965
                s = s.where(exists(subs))
966
            if excluded:
967
                subs = select([1])
968
                subs = subs.where(a.c.serial == v.c.serial).correlate(v)
969
                subs = subs.where(a.c.domain == domain)
970
                subs = subs.where(or_(*[a.c.key.op('=')(x) for x in excluded]))
971
                s = s.where(not_(exists(subs)))
972
            if opers:
973
                for k, o, val in opers:
974
                    subs = select([1])
975
                    subs = subs.where(a.c.serial == v.c.serial).correlate(v)
976
                    subs = subs.where(a.c.domain == domain)
977
                    subs = subs.where(and_(a.c.key.op('=')(k), a.c.value.op(o)(val)))
978
                    s = s.where(exists(subs))
979
        
980
        s = s.order_by(n.c.path)
981
        
982
        if not delimiter:
983
            s = s.limit(limit)
984
            rp = self.conn.execute(s, start=start)
985
            r = rp.fetchall()
986
            rp.close()
987
            return r, ()
988
        
989
        pfz = len(prefix)
990
        dz = len(delimiter)
991
        count = 0
992
        prefixes = []
993
        pappend = prefixes.append
994
        matches = []
995
        mappend = matches.append
996
        
997
        rp = self.conn.execute(s, start=start)
998
        while True:
999
            props = rp.fetchone()
1000
            if props is None:
1001
                break
1002
            path = props[0]
1003
            serial = props[1]
1004
            idx = path.find(delimiter, pfz)
1005
            
1006
            if idx < 0:
1007
                mappend(props)
1008
                count += 1
1009
                if count >= limit:
1010
                    break
1011
                continue
1012
            
1013
            if idx + dz == len(path):
1014
                mappend(props)
1015
                count += 1
1016
                continue # Get one more, in case there is a path.
1017
            pf = path[:idx + dz]
1018
            pappend(pf)
1019
            if count >= limit: 
1020
                break
1021
            
1022
            rp = self.conn.execute(s, start=strnextling(pf)) # New start.
1023
        rp.close()
1024
        
1025
        return matches, prefixes
1026
    
1027
    def latest_uuid(self, uuid):
1028
        """Return a (path, serial) tuple, for the latest version of the given uuid."""
1029
        
1030
        v = self.versions.alias('v')
1031
        n = self.nodes.alias('n')
1032
        s = select([n.c.path, v.c.serial])
1033
        filtered = select([func.max(self.versions.c.serial)])
1034
        s = s.where(v.c.serial == filtered.where(self.versions.c.uuid == uuid))
1035
        s = s.where(n.c.node == v.c.node)
1036
        
1037
        r = self.conn.execute(s)
1038
        l = r.fetchone()
1039
        r.close()
1040
        return l