Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / lib / sqlalchemy / node.py @ 585b75e7

History | View | Annotate | Download (40 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
# 
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
# 
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
# 
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
# 
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from time import time
35
from sqlalchemy import Table, Integer, BigInteger, DECIMAL, Column, String, MetaData, ForeignKey
36
from sqlalchemy.types import Text
37
from sqlalchemy.schema import Index, Sequence
38
from sqlalchemy.sql import func, and_, or_, not_, null, select, bindparam, text, exists
39
from sqlalchemy.ext.compiler import compiles
40
from sqlalchemy.engine.reflection import Inspector
41

    
42
from dbworker import DBWorker
43

    
44
from pithos.backends.filter import parse_filters
45

    
46

    
47
ROOTNODE  = 0
48

    
49
( SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM, CLUSTER ) = range(11)
50

    
51
( MATCH_PREFIX, MATCH_EXACT ) = range(2)
52

    
53
inf = float('inf')
54

    
55

    
56
def strnextling(prefix):
57
    """Return the first unicode string
58
       greater than but not starting with given prefix.
59
       strnextling('hello') -> 'hellp'
60
    """
61
    if not prefix:
62
        ## all strings start with the null string,
63
        ## therefore we have to approximate strnextling('')
64
        ## with the last unicode character supported by python
65
        ## 0x10ffff for wide (32-bit unicode) python builds
66
        ## 0x00ffff for narrow (16-bit unicode) python builds
67
        ## We will not autodetect. 0xffff is safe enough.
68
        return unichr(0xffff)
69
    s = prefix[:-1]
70
    c = ord(prefix[-1])
71
    if c >= 0xffff:
72
        raise RuntimeError
73
    s += unichr(c+1)
74
    return s
75

    
76
def strprevling(prefix):
77
    """Return an approximation of the last unicode string
78
       less than but not starting with given prefix.
79
       strprevling(u'hello') -> u'helln\\xffff'
80
    """
81
    if not prefix:
82
        ## There is no prevling for the null string
83
        return prefix
84
    s = prefix[:-1]
85
    c = ord(prefix[-1])
86
    if c > 0:
87
        s += unichr(c-1) + unichr(0xffff)
88
    return s
89

    
90
_propnames = {
91
    'serial'    : 0,
92
    'node'      : 1,
93
    'hash'      : 2,
94
    'size'      : 3,
95
    'type'      : 4,
96
    'source'    : 5,
97
    'mtime'     : 6,
98
    'muser'     : 7,
99
    'uuid'      : 8,
100
    'checksum'  : 9,
101
    'cluster'   : 10
102
}
103

    
104

    
105
class Node(DBWorker):
106
    """Nodes store path organization and have multiple versions.
107
       Versions store object history and have multiple attributes.
108
       Attributes store metadata.
109
    """
110
    
111
    # TODO: Provide an interface for included and excluded clusters.
112
    
113
    def __init__(self, **params):
114
        DBWorker.__init__(self, **params)
115
        metadata = MetaData()
116
        
117
        #create nodes table
118
        columns=[]
119
        columns.append(Column('node', Integer, primary_key=True))
120
        columns.append(Column('parent', Integer,
121
                              ForeignKey('nodes.node',
122
                                         ondelete='CASCADE',
123
                                         onupdate='CASCADE'),
124
                              autoincrement=False))
125
        columns.append(Column('latest_version', Integer))
126
        columns.append(Column('path', String(2048), default='', nullable=False))
127
        self.nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
128
        Index('idx_nodes_path', self.nodes.c.path, unique=True)
129
        Index('idx_nodes_parent', self.nodes.c.parent)
130
        
131
        #create policy table
132
        columns=[]
133
        columns.append(Column('node', Integer,
134
                              ForeignKey('nodes.node',
135
                                         ondelete='CASCADE',
136
                                         onupdate='CASCADE'),
137
                              primary_key=True))
138
        columns.append(Column('key', String(128), primary_key=True))
139
        columns.append(Column('value', String(256)))
140
        self.policies = Table('policy', metadata, *columns, mysql_engine='InnoDB')
141
        
142
        #create statistics table
143
        columns=[]
144
        columns.append(Column('node', Integer,
145
                              ForeignKey('nodes.node',
146
                                         ondelete='CASCADE',
147
                                         onupdate='CASCADE'),
148
                              primary_key=True))
149
        columns.append(Column('population', Integer, nullable=False, default=0))
150
        columns.append(Column('size', BigInteger, nullable=False, default=0))
151
        columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
152
        columns.append(Column('cluster', Integer, nullable=False, default=0,
153
                              primary_key=True, autoincrement=False))
154
        self.statistics = Table('statistics', metadata, *columns, mysql_engine='InnoDB')
155
        
156
        #create versions table
157
        columns=[]
158
        columns.append(Column('serial', Integer, primary_key=True))
159
        columns.append(Column('node', Integer,
160
                              ForeignKey('nodes.node',
161
                                         ondelete='CASCADE',
162
                                         onupdate='CASCADE')))
163
        columns.append(Column('hash', String(256)))
164
        columns.append(Column('size', BigInteger, nullable=False, default=0))
165
        columns.append(Column('type', String(256), nullable=False, default=''))
166
        columns.append(Column('source', Integer))
167
        columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
168
        columns.append(Column('muser', String(256), nullable=False, default=''))
169
        columns.append(Column('uuid', String(64), nullable=False, default=''))
170
        columns.append(Column('checksum', String(256), nullable=False, default=''))
171
        columns.append(Column('cluster', Integer, nullable=False, default=0))
172
        self.versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
173
        Index('idx_versions_node_mtime', self.versions.c.node, self.versions.c.mtime)
174
        Index('idx_versions_node_uuid', self.versions.c.uuid)
175
        Index('idx_versions_serial_cluster', self.versions.c.serial, self.versions.c.cluster)
176
        
177
        #create attributes table
178
        columns = []
179
        columns.append(Column('serial', Integer,
180
                              ForeignKey('versions.serial',
181
                                         ondelete='CASCADE',
182
                                         onupdate='CASCADE'),
183
                              primary_key=True))
184
        columns.append(Column('domain', String(256), primary_key=True))
185
        columns.append(Column('key', String(128), primary_key=True))
186
        columns.append(Column('value', String(256)))
187
        self.attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')
188
        
189
        metadata.create_all(self.engine)
190
        
191
        s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
192
                                           self.nodes.c.parent == ROOTNODE))
193
        rp = self.conn.execute(s)
194
        r = rp.fetchone()
195
        rp.close()
196
        if not r:
197
            s = self.nodes.insert().values(node=ROOTNODE, parent=ROOTNODE)
198
            self.conn.execute(s)
199
    
200
    def node_create(self, parent, path):
201
        """Create a new node from the given properties.
202
           Return the node identifier of the new node.
203
        """
204
        #TODO catch IntegrityError?
205
        s = self.nodes.insert().values(parent=parent, path=path)
206
        r = self.conn.execute(s)
207
        inserted_primary_key = r.inserted_primary_key[0]
208
        r.close()
209
        return inserted_primary_key
210
    
211
    def node_lookup(self, path):
212
        """Lookup the current node of the given path.
213
           Return None if the path is not found.
214
        """
215
        
216
        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
217
        s = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path), escape='\\'))
218
        r = self.conn.execute(s)
219
        row = r.fetchone()
220
        r.close()
221
        if row:
222
            return row[0]
223
        return None
224
    
225
    def node_lookup_bulk(self, paths):
226
        """Lookup the current nodes for the given paths.
227
           Return () if the path is not found.
228
        """
229
        
230
        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
231
        s = select([self.nodes.c.node], self.nodes.c.path.in_(paths))
232
        r = self.conn.execute(s)
233
        rows = r.fetchall()
234
        r.close()
235
        return [row[0] for row in rows]
236
    
237
    def node_get_properties(self, node):
238
        """Return the node's (parent, path).
239
           Return None if the node is not found.
240
        """
241
        
242
        s = select([self.nodes.c.parent, self.nodes.c.path])
243
        s = s.where(self.nodes.c.node == node)
244
        r = self.conn.execute(s)
245
        l = r.fetchone()
246
        r.close()
247
        return l
248
    
249
    def node_get_versions(self, node, keys=(), propnames=_propnames):
250
        """Return the properties of all versions at node.
251
           If keys is empty, return all properties in the order
252
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
253
        """
254
        
255
        s = select([self.versions.c.serial,
256
                    self.versions.c.node,
257
                    self.versions.c.hash,
258
                    self.versions.c.size,
259
                    self.versions.c.type,
260
                    self.versions.c.source,
261
                    self.versions.c.mtime,
262
                    self.versions.c.muser,
263
                    self.versions.c.uuid,
264
                    self.versions.c.checksum,
265
                    self.versions.c.cluster], self.versions.c.node == node)
266
        s = s.order_by(self.versions.c.serial)
267
        r = self.conn.execute(s)
268
        rows = r.fetchall()
269
        r.close()
270
        if not rows:
271
            return rows
272
        
273
        if not keys:
274
            return rows
275
        
276
        return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]
277
    
278
    def node_count_children(self, node):
279
        """Return node's child count."""
280
        
281
        s = select([func.count(self.nodes.c.node)])
282
        s = s.where(and_(self.nodes.c.parent == node,
283
                         self.nodes.c.node != ROOTNODE))
284
        r = self.conn.execute(s)
285
        row = r.fetchone()
286
        r.close()
287
        return row[0]
288
    
289
    def node_purge_children(self, parent, before=inf, cluster=0):
290
        """Delete all versions with the specified
291
           parent and cluster, and return
292
           the hashes and size of versions deleted.
293
           Clears out nodes with no remaining versions.
294
        """
295
        #update statistics
296
        c1 = select([self.nodes.c.node],
297
            self.nodes.c.parent == parent)
298
        where_clause = and_(self.versions.c.node.in_(c1),
299
                            self.versions.c.cluster == cluster)
300
        s = select([func.count(self.versions.c.serial),
301
                    func.sum(self.versions.c.size)])
302
        s = s.where(where_clause)
303
        if before != inf:
304
            s = s.where(self.versions.c.mtime <= before)
305
        r = self.conn.execute(s)
306
        row = r.fetchone()
307
        r.close()
308
        if not row:
309
            return (), 0
310
        nr, size = row[0], -row[1] if row[1] else 0
311
        mtime = time()
312
        self.statistics_update(parent, -nr, size, mtime, cluster)
313
        self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)
314
        
315
        s = select([self.versions.c.hash])
316
        s = s.where(where_clause)
317
        r = self.conn.execute(s)
318
        hashes = [row[0] for row in r.fetchall()]
319
        r.close()
320
        
321
        #delete versions
322
        s = self.versions.delete().where(where_clause)
323
        r = self.conn.execute(s)
324
        r.close()
325
        
326
        #delete nodes
327
        s = select([self.nodes.c.node],
328
            and_(self.nodes.c.parent == parent,
329
                 select([func.count(self.versions.c.serial)],
330
                    self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
331
        rp = self.conn.execute(s)
332
        nodes = [r[0] for r in rp.fetchall()]
333
        rp.close()
334
        s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
335
        self.conn.execute(s).close()
336
        
337
        return hashes, size
338
    
339
    def node_purge(self, node, before=inf, cluster=0):
340
        """Delete all versions with the specified
341
           node and cluster, and return
342
           the hashes and size of versions deleted.
343
           Clears out the node if it has no remaining versions.
344
        """
345
        
346
        #update statistics
347
        s = select([func.count(self.versions.c.serial),
348
                    func.sum(self.versions.c.size)])
349
        where_clause = and_(self.versions.c.node == node,
350
                         self.versions.c.cluster == cluster)
351
        s = s.where(where_clause)
352
        if before != inf:
353
            s = s.where(self.versions.c.mtime <= before)
354
        r = self.conn.execute(s)
355
        row = r.fetchone()
356
        nr, size = row[0], row[1]
357
        r.close()
358
        if not nr:
359
            return (), 0
360
        mtime = time()
361
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
362
        
363
        s = select([self.versions.c.hash])
364
        s = s.where(where_clause)
365
        r = self.conn.execute(s)
366
        hashes = [r[0] for r in r.fetchall()]
367
        r.close()
368
        
369
        #delete versions
370
        s = self.versions.delete().where(where_clause)
371
        r = self.conn.execute(s)
372
        r.close()
373
        
374
        #delete nodes
375
        s = select([self.nodes.c.node],
376
            and_(self.nodes.c.node == node,
377
                 select([func.count(self.versions.c.serial)],
378
                    self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
379
        r = self.conn.execute(s)
380
        nodes = r.fetchall()
381
        r.close()
382
        s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
383
        self.conn.execute(s).close()
384
        
385
        return hashes, size
386
    
387
    def node_remove(self, node):
388
        """Remove the node specified.
389
           Return false if the node has children or is not found.
390
        """
391
        
392
        if self.node_count_children(node):
393
            return False
394
        
395
        mtime = time()
396
        s = select([func.count(self.versions.c.serial),
397
                    func.sum(self.versions.c.size),
398
                    self.versions.c.cluster])
399
        s = s.where(self.versions.c.node == node)
400
        s = s.group_by(self.versions.c.cluster)
401
        r = self.conn.execute(s)
402
        for population, size, cluster in r.fetchall():
403
            self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
404
        r.close()
405
        
406
        s = self.nodes.delete().where(self.nodes.c.node == node)
407
        self.conn.execute(s).close()
408
        return True
409
    
410
    def policy_get(self, node):
411
        s = select([self.policies.c.key, self.policies.c.value],
412
            self.policies.c.node==node)
413
        r = self.conn.execute(s)
414
        d = dict(r.fetchall())
415
        r.close()
416
        return d
417
    
418
    def policy_set(self, node, policy):
419
        #insert or replace
420
        for k, v in policy.iteritems():
421
            s = self.policies.update().where(and_(self.policies.c.node == node,
422
                                                  self.policies.c.key == k))
423
            s = s.values(value = v)
424
            rp = self.conn.execute(s)
425
            rp.close()
426
            if rp.rowcount == 0:
427
                s = self.policies.insert()
428
                values = {'node':node, 'key':k, 'value':v}
429
                r = self.conn.execute(s, values)
430
                r.close()
431
    
432
    def statistics_get(self, node, cluster=0):
433
        """Return population, total size and last mtime
434
           for all versions under node that belong to the cluster.
435
        """
436
        
437
        s = select([self.statistics.c.population,
438
                    self.statistics.c.size,
439
                    self.statistics.c.mtime])
440
        s = s.where(and_(self.statistics.c.node == node,
441
                         self.statistics.c.cluster == cluster))
442
        r = self.conn.execute(s)
443
        row = r.fetchone()
444
        r.close()
445
        return row
446
    
447
    def statistics_update(self, node, population, size, mtime, cluster=0):
448
        """Update the statistics of the given node.
449
           Statistics keep track the population, total
450
           size of objects and mtime in the node's namespace.
451
           May be zero or positive or negative numbers.
452
        """
453
        s = select([self.statistics.c.population, self.statistics.c.size],
454
            and_(self.statistics.c.node == node,
455
                 self.statistics.c.cluster == cluster))
456
        rp = self.conn.execute(s)
457
        r = rp.fetchone()
458
        rp.close()
459
        if not r:
460
            prepopulation, presize = (0, 0)
461
        else:
462
            prepopulation, presize = r
463
        population += prepopulation
464
        size += presize
465
        
466
        #insert or replace
467
        #TODO better upsert
468
        u = self.statistics.update().where(and_(self.statistics.c.node==node,
469
                                           self.statistics.c.cluster==cluster))
470
        u = u.values(population=population, size=size, mtime=mtime)
471
        rp = self.conn.execute(u)
472
        rp.close()
473
        if rp.rowcount == 0:
474
            ins = self.statistics.insert()
475
            ins = ins.values(node=node, population=population, size=size,
476
                             mtime=mtime, cluster=cluster)
477
            self.conn.execute(ins).close()
478
    
479
    def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
480
        """Update the statistics of the given node's parent.
481
           Then recursively update all parents up to the root.
482
           Population is not recursive.
483
        """
484
        
485
        while True:
486
            if node == ROOTNODE:
487
                break
488
            props = self.node_get_properties(node)
489
            if props is None:
490
                break
491
            parent, path = props
492
            self.statistics_update(parent, population, size, mtime, cluster)
493
            node = parent
494
            population = 0 # Population isn't recursive
495
    
496
    def statistics_latest(self, node, before=inf, except_cluster=0):
497
        """Return population, total size and last mtime
498
           for all latest versions under node that
499
           do not belong to the cluster.
500
        """
501
        
502
        # The node.
503
        props = self.node_get_properties(node)
504
        if props is None:
505
            return None
506
        parent, path = props
507
        
508
        # The latest version.
509
        s = select([self.versions.c.serial,
510
                    self.versions.c.node,
511
                    self.versions.c.hash,
512
                    self.versions.c.size,
513
                    self.versions.c.type,
514
                    self.versions.c.source,
515
                    self.versions.c.mtime,
516
                    self.versions.c.muser,
517
                    self.versions.c.uuid,
518
                    self.versions.c.checksum,
519
                    self.versions.c.cluster])
520
        if before != inf:
521
            filtered = select([func.max(self.versions.c.serial)],
522
                            self.versions.c.node == node)
523
            filtered = filtered.where(self.versions.c.mtime < before)
524
        else:
525
            filtered = select([self.nodes.c.latest_version],
526
                            self.versions.c.node == node)
527
        s = s.where(and_(self.versions.c.cluster != except_cluster,
528
                         self.versions.c.serial == filtered))
529
        r = self.conn.execute(s)
530
        props = r.fetchone()
531
        r.close()
532
        if not props:
533
            return None
534
        mtime = props[MTIME]
535
        
536
        # First level, just under node (get population).
537
        v = self.versions.alias('v')
538
        s = select([func.count(v.c.serial),
539
                    func.sum(v.c.size),
540
                    func.max(v.c.mtime)])
541
        if before != inf:
542
            c1 = select([func.max(self.versions.c.serial)])
543
            c1 = c1.where(self.versions.c.mtime < before)
544
            c1.where(self.versions.c.node == v.c.node)
545
        else:
546
            c1 = select([self.nodes.c.latest_version])
547
            c1.where(self.nodes.c.node == v.c.node)
548
        c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
549
        s = s.where(and_(v.c.serial == c1,
550
                         v.c.cluster != except_cluster,
551
                         v.c.node.in_(c2)))
552
        rp = self.conn.execute(s)
553
        r = rp.fetchone()
554
        rp.close()
555
        if not r:
556
            return None
557
        count = r[0]
558
        mtime = max(mtime, r[2])
559
        if count == 0:
560
            return (0, 0, mtime)
561
        
562
        # All children (get size and mtime).
563
        # This is why the full path is stored.
564
        s = select([func.count(v.c.serial),
565
                    func.sum(v.c.size),
566
                    func.max(v.c.mtime)])
567
        if before != inf:
568
            c1 = select([func.max(self.versions.c.serial)],
569
                    self.versions.c.node == v.c.node)
570
            c1 = c1.where(self.versions.c.mtime < before)
571
        else:
572
            c1 = select([self.nodes.c.serial],
573
                    self.nodes.c.node == v.c.node)
574
        c2 = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path) + '%', escape='\\'))
575
        s = s.where(and_(v.c.serial == c1,
576
                         v.c.cluster != except_cluster,
577
                         v.c.node.in_(c2)))
578
        rp = self.conn.execute(s)
579
        r = rp.fetchone()
580
        rp.close()
581
        if not r:
582
            return None
583
        size = r[1] - props[SIZE]
584
        mtime = max(mtime, r[2])
585
        return (count, size, mtime)
586
    
587
    def nodes_set_latest_version(self, node, serial):
588
        s = self.nodes.update().where(self.nodes.c.node == node)
589
        s = s.values(latest_version = serial)
590
        self.conn.execute(s).close()
591
    
592
    def version_create(self, node, hash, size, type, source, muser, uuid, checksum, cluster=0):
593
        """Create a new version from the given properties.
594
           Return the (serial, mtime) of the new version.
595
        """
596
        
597
        mtime = time()
598
        s = self.versions.insert().values(node=node, hash=hash, size=size, type=type, source=source,
599
                                          mtime=mtime, muser=muser, uuid=uuid, checksum=checksum, cluster=cluster)
600
        serial = self.conn.execute(s).inserted_primary_key[0]
601
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)
602
        
603
        self.nodes_set_latest_version(node, serial)
604
        
605
        return serial, mtime
606
    
607
    def version_lookup(self, node, before=inf, cluster=0, all_props=True):
608
        """Lookup the current version of the given node.
609
           Return a list with its properties:
610
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster)
611
           or None if the current version is not found in the given cluster.
612
        """
613
        
614
        v = self.versions.alias('v')
615
        if not all_props:
616
            s = select([v.c.serial])
617
        else:
618
            s = select([v.c.serial, v.c.node, v.c.hash,
619
                        v.c.size, v.c.type, v.c.source,
620
                        v.c.mtime, v.c.muser, v.c.uuid,
621
                        v.c.checksum, v.c.cluster])
622
        if before != inf:
623
            c = select([func.max(self.versions.c.serial)],
624
                self.versions.c.node == node)
625
            c = c.where(self.versions.c.mtime < before)
626
        else:
627
            c = select([self.nodes.c.latest_version],
628
                self.nodes.c.node == node)
629
        s = s.where(and_(v.c.serial == c,
630
                         v.c.cluster == cluster))
631
        r = self.conn.execute(s)
632
        props = r.fetchone()
633
        r.close()
634
        if props:
635
            return props
636
        return None
637
    
638
    def version_lookup_bulk(self, nodes, before=inf, cluster=0, all_props=True):
639
        """Lookup the current versions of the given nodes.
640
           Return a list with their properties:
641
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
642
        """
643
        if not nodes:
644
            return ()
645
        v = self.versions.alias('v')
646
        if not all_props:
647
            s = select([v.c.serial])
648
        else:
649
            s = select([v.c.serial, v.c.node, v.c.hash,
650
                        v.c.size, v.c.type, v.c.source,
651
                        v.c.mtime, v.c.muser, v.c.uuid,
652
                        v.c.checksum, v.c.cluster])
653
        if before != inf:
654
            c = select([func.max(self.versions.c.serial)],
655
                self.versions.c.node.in_(nodes))
656
            c = c.where(self.versions.c.mtime < before)
657
            c = c.group_by(self.versions.c.node)
658
        else:
659
            c = select([self.nodes.c.latest_version],
660
                self.nodes.c.node.in_(nodes))
661
        s = s.where(and_(v.c.serial.in_(c),
662
                         v.c.cluster == cluster))
663
        s = s.order_by(v.c.node)
664
        r = self.conn.execute(s)
665
        rproxy = r.fetchall()
666
        r.close()
667
        return (tuple(row.values()) for row in rproxy)
668
        
669
    def version_get_properties(self, serial, keys=(), propnames=_propnames):
670
        """Return a sequence of values for the properties of
671
           the version specified by serial and the keys, in the order given.
672
           If keys is empty, return all properties in the order
673
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
674
        """
675
        
676
        v = self.versions.alias()
677
        s = select([v.c.serial, v.c.node, v.c.hash,
678
                    v.c.size, v.c.type, v.c.source,
679
                    v.c.mtime, v.c.muser, v.c.uuid,
680
                    v.c.checksum, v.c.cluster], v.c.serial == serial)
681
        rp = self.conn.execute(s)
682
        r = rp.fetchone()
683
        rp.close()
684
        if r is None:
685
            return r
686
        
687
        if not keys:
688
            return r
689
        return [r[propnames[k]] for k in keys if k in propnames]
690
    
691
    def version_put_property(self, serial, key, value):
692
        """Set value for the property of version specified by key."""
693
        
694
        if key not in _propnames:
695
            return
696
        s = self.versions.update()
697
        s = s.where(self.versions.c.serial == serial)
698
        s = s.values(**{key: value})
699
        self.conn.execute(s).close()
700
    
701
    def version_recluster(self, serial, cluster):
702
        """Move the version into another cluster."""
703
        
704
        props = self.version_get_properties(serial)
705
        if not props:
706
            return
707
        node = props[NODE]
708
        size = props[SIZE]
709
        oldcluster = props[CLUSTER]
710
        if cluster == oldcluster:
711
            return
712
        
713
        mtime = time()
714
        self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
715
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)
716
        
717
        s = self.versions.update()
718
        s = s.where(self.versions.c.serial == serial)
719
        s = s.values(cluster = cluster)
720
        self.conn.execute(s).close()
721
    
722
    def version_remove(self, serial):
723
        """Remove the serial specified."""
724
        
725
        props = self.version_get_properties(serial)
726
        if not props:
727
            return
728
        node = props[NODE]
729
        hash = props[HASH]
730
        size = props[SIZE]
731
        cluster = props[CLUSTER]
732
        
733
        mtime = time()
734
        self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
735
        
736
        s = self.versions.delete().where(self.versions.c.serial == serial)
737
        self.conn.execute(s).close()
738
        
739
        props = self.version_lookup(node, cluster=cluster, all_props=False)
740
        if props:
741
            self.nodes_set_latest_version(v.node, serial)
742
        
743
        return hash, size
744
    
745
    def attribute_get(self, serial, domain, keys=()):
746
        """Return a list of (key, value) pairs of the version specified by serial.
747
           If keys is empty, return all attributes.
748
           Othwerise, return only those specified.
749
        """
750
        
751
        if keys:
752
            attrs = self.attributes.alias()
753
            s = select([attrs.c.key, attrs.c.value])
754
            s = s.where(and_(attrs.c.key.in_(keys),
755
                             attrs.c.serial == serial,
756
                             attrs.c.domain == domain))
757
        else:
758
            attrs = self.attributes.alias()
759
            s = select([attrs.c.key, attrs.c.value])
760
            s = s.where(and_(attrs.c.serial == serial,
761
                             attrs.c.domain == domain))
762
        r = self.conn.execute(s)
763
        l = r.fetchall()
764
        r.close()
765
        return l
766
    
767
    def attribute_set(self, serial, domain, items):
768
        """Set the attributes of the version specified by serial.
769
           Receive attributes as an iterable of (key, value) pairs.
770
        """
771
        #insert or replace
772
        #TODO better upsert
773
        for k, v in items:
774
            s = self.attributes.update()
775
            s = s.where(and_(self.attributes.c.serial == serial,
776
                             self.attributes.c.domain == domain,
777
                             self.attributes.c.key == k))
778
            s = s.values(value = v)
779
            rp = self.conn.execute(s)
780
            rp.close()
781
            if rp.rowcount == 0:
782
                s = self.attributes.insert()
783
                s = s.values(serial=serial, domain=domain, key=k, value=v)
784
                self.conn.execute(s).close()
785
    
786
    def attribute_del(self, serial, domain, keys=()):
787
        """Delete attributes of the version specified by serial.
788
           If keys is empty, delete all attributes.
789
           Otherwise delete those specified.
790
        """
791
        
792
        if keys:
793
            #TODO more efficient way to do this?
794
            for key in keys:
795
                s = self.attributes.delete()
796
                s = s.where(and_(self.attributes.c.serial == serial,
797
                                 self.attributes.c.domain == domain,
798
                                 self.attributes.c.key == key))
799
                self.conn.execute(s).close()
800
        else:
801
            s = self.attributes.delete()
802
            s = s.where(and_(self.attributes.c.serial == serial,
803
                             self.attributes.c.domain == domain))
804
            self.conn.execute(s).close()
805
    
806
    def attribute_copy(self, source, dest):
807
        s = select([dest, self.attributes.c.domain, self.attributes.c.key, self.attributes.c.value],
808
            self.attributes.c.serial == source)
809
        rp = self.conn.execute(s)
810
        attributes = rp.fetchall()
811
        rp.close()
812
        for dest, domain, k, v in attributes:
813
            #insert or replace
814
            s = self.attributes.update().where(and_(
815
                self.attributes.c.serial == dest,
816
                self.attributes.c.domain == domain,
817
                self.attributes.c.key == k))
818
            rp = self.conn.execute(s, value=v)
819
            rp.close()
820
            if rp.rowcount == 0:
821
                s = self.attributes.insert()
822
                values = {'serial':dest, 'domain':domain, 'key':k, 'value':v}
823
                self.conn.execute(s, values).close()
824
    
825
    def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=[]):
826
        """Return a list with all keys pairs defined
827
           for all latest versions under parent that
828
           do not belong to the cluster.
829
        """
830
        
831
        # TODO: Use another table to store before=inf results.
832
        a = self.attributes.alias('a')
833
        v = self.versions.alias('v')
834
        n = self.nodes.alias('n')
835
        s = select([a.c.key]).distinct()
836
        if before != inf:
837
            filtered = select([func.max(self.versions.c.serial)])
838
            filtered = filtered.where(self.versions.c.mtime < before)
839
            filtered = filtered.where(self.versions.c.node == v.c.node)
840
        else:
841
            filtered = select([self.nodes.c.latest_version])
842
            filtered = filtered.where(self.nodes.c.node == v.c.node)
843
        s = s.where(v.c.serial == filtered)
844
        s = s.where(v.c.cluster != except_cluster)
845
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
846
            self.nodes.c.parent == parent)))
847
        s = s.where(a.c.serial == v.c.serial)
848
        s = s.where(a.c.domain == domain)
849
        s = s.where(n.c.node == v.c.node)
850
        conj = []
851
        for path, match in pathq:
852
            if match == MATCH_PREFIX:
853
                conj.append(n.c.path.like(self.escape_like(path) + '%', escape='\\'))
854
            elif match == MATCH_EXACT:
855
                conj.append(n.c.path == path)
856
        if conj:
857
            s = s.where(or_(*conj))
858
        rp = self.conn.execute(s)
859
        rows = rp.fetchall()
860
        rp.close()
861
        return [r[0] for r in rows]
862
    
863
    def latest_version_list(self, parent, prefix='', delimiter=None,
864
                            start='', limit=10000, before=inf,
865
                            except_cluster=0, pathq=[], domain=None,
866
                            filterq=[], sizeq=None, all_props=False):
867
        """Return a (list of (path, serial) tuples, list of common prefixes)
868
           for the current versions of the paths with the given parent,
869
           matching the following criteria.
870
           
871
           The property tuple for a version is returned if all
872
           of these conditions are true:
873
                
874
                a. parent matches
875
                
876
                b. path > start
877
                
878
                c. path starts with prefix (and paths in pathq)
879
                
880
                d. version is the max up to before
881
                
882
                e. version is not in cluster
883
                
884
                f. the path does not have the delimiter occuring
885
                   after the prefix, or ends with the delimiter
886
                
887
                g. serial matches the attribute filter query.
888
                   
889
                   A filter query is a comma-separated list of
890
                   terms in one of these three forms:
891
                   
892
                   key
893
                       an attribute with this key must exist
894
                   
895
                   !key
896
                       an attribute with this key must not exist
897
                   
898
                   key ?op value
899
                       the attribute with this key satisfies the value
900
                       where ?op is one of ==, != <=, >=, <, >.
901
                
902
                h. the size is in the range set by sizeq
903
           
904
           The list of common prefixes includes the prefixes
905
           matching up to the first delimiter after prefix,
906
           and are reported only once, as "virtual directories".
907
           The delimiter is included in the prefixes.
908
           
909
           If arguments are None, then the corresponding matching rule
910
           will always match.
911
           
912
           Limit applies to the first list of tuples returned.
913
           
914
           If all_props is True, return all properties after path, not just serial.
915
        """
916
        
917
        if not start or start < prefix:
918
            start = strprevling(prefix)
919
        nextling = strnextling(prefix)
920
        
921
        v = self.versions.alias('v')
922
        n = self.nodes.alias('n')
923
        if not all_props:
924
            s = select([n.c.path, v.c.serial]).distinct()
925
        else:
926
            s = select([n.c.path,
927
                        v.c.serial, v.c.node, v.c.hash,
928
                        v.c.size, v.c.type, v.c.source,
929
                        v.c.mtime, v.c.muser, v.c.uuid,
930
                        v.c.checksum, v.c.cluster]).distinct()
931
        if before != inf:
932
            filtered = select([func.max(self.versions.c.serial)])
933
            filtered = filtered.where(self.versions.c.mtime < before)
934
        else:
935
            filtered = select([self.nodes.c.latest_version])
936
        s = s.where(v.c.serial == filtered.where(self.nodes.c.node == v.c.node))
937
        s = s.where(v.c.cluster != except_cluster)
938
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
939
            self.nodes.c.parent == parent)))
940
        
941
        s = s.where(n.c.node == v.c.node)
942
        s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
943
        conj = []
944
        for path, match in pathq:
945
            if match == MATCH_PREFIX:
946
                conj.append(n.c.path.like(self.escape_like(path) + '%', escape='\\'))
947
            elif match == MATCH_EXACT:
948
                conj.append(n.c.path == path)
949
        if conj:
950
            s = s.where(or_(*conj))
951
        
952
        if sizeq and len(sizeq) == 2:
953
            if sizeq[0]:
954
                s = s.where(v.c.size >= sizeq[0])
955
            if sizeq[1]:
956
                s = s.where(v.c.size < sizeq[1])
957
        
958
        if domain and filterq:
959
            a = self.attributes.alias('a')
960
            included, excluded, opers = parse_filters(filterq)
961
            if included:
962
                subs = select([1])
963
                subs = subs.where(a.c.serial == v.c.serial).correlate(v)
964
                subs = subs.where(a.c.domain == domain)
965
                subs = subs.where(or_(*[a.c.key.op('=')(x) for x in included]))
966
                s = s.where(exists(subs))
967
            if excluded:
968
                subs = select([1])
969
                subs = subs.where(a.c.serial == v.c.serial).correlate(v)
970
                subs = subs.where(a.c.domain == domain)
971
                subs = subs.where(or_(*[a.c.key.op('=')(x) for x in excluded]))
972
                s = s.where(not_(exists(subs)))
973
            if opers:
974
                for k, o, val in opers:
975
                    subs = select([1])
976
                    subs = subs.where(a.c.serial == v.c.serial).correlate(v)
977
                    subs = subs.where(a.c.domain == domain)
978
                    subs = subs.where(and_(a.c.key.op('=')(k), a.c.value.op(o)(val)))
979
                    s = s.where(exists(subs))
980
        
981
        s = s.order_by(n.c.path)
982
        
983
        if not delimiter:
984
            s = s.limit(limit)
985
            rp = self.conn.execute(s, start=start)
986
            r = rp.fetchall()
987
            rp.close()
988
            return r, ()
989
        
990
        pfz = len(prefix)
991
        dz = len(delimiter)
992
        count = 0
993
        prefixes = []
994
        pappend = prefixes.append
995
        matches = []
996
        mappend = matches.append
997
        
998
        rp = self.conn.execute(s, start=start)
999
        while True:
1000
            props = rp.fetchone()
1001
            if props is None:
1002
                break
1003
            path = props[0]
1004
            serial = props[1]
1005
            idx = path.find(delimiter, pfz)
1006
            
1007
            if idx < 0:
1008
                mappend(props)
1009
                count += 1
1010
                if count >= limit:
1011
                    break
1012
                continue
1013
            
1014
            if idx + dz == len(path):
1015
                mappend(props)
1016
                count += 1
1017
                continue # Get one more, in case there is a path.
1018
            pf = path[:idx + dz]
1019
            pappend(pf)
1020
            if count >= limit: 
1021
                break
1022
            
1023
            rp = self.conn.execute(s, start=strnextling(pf)) # New start.
1024
        rp.close()
1025
        
1026
        return matches, prefixes
1027
    
1028
    def latest_uuid(self, uuid):
1029
        """Return a (path, serial) tuple, for the latest version of the given uuid."""
1030
        
1031
        v = self.versions.alias('v')
1032
        n = self.nodes.alias('n')
1033
        s = select([n.c.path, v.c.serial])
1034
        filtered = select([func.max(self.versions.c.serial)])
1035
        s = s.where(v.c.serial == filtered.where(self.versions.c.uuid == uuid))
1036
        s = s.where(n.c.node == v.c.node)
1037
        
1038
        r = self.conn.execute(s)
1039
        l = r.fetchone()
1040
        r.close()
1041
        return l