# Copyright 2011-2012 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from time import time
from sqlalchemy import Table, Integer, BigInteger, DECIMAL, Column, String, MetaData, ForeignKey
from sqlalchemy.types import Text
from sqlalchemy.schema import Index, Sequence
from sqlalchemy.sql import func, and_, or_, not_, null, select, bindparam, text, exists
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.exc import NoSuchTableError

from dbworker import DBWorker, ESCAPE_CHAR

from pithos.backends.filter import parse_filters


ROOTNODE = 0

(SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM,
 CLUSTER) = range(11)

(MATCH_PREFIX, MATCH_EXACT) = range(2)

inf = float('inf')


def strnextling(prefix):
    """Return the first unicode string
       greater than but not starting with the given prefix.
       strnextling('hello') -> 'hellp'
    """
    if not prefix:
        ## all strings start with the null string,
        ## therefore we have to approximate strnextling('')
        ## with the last unicode character supported by python
        ## 0x10ffff for wide (32-bit unicode) python builds
        ## 0x00ffff for narrow (16-bit unicode) python builds
        ## We will not autodetect. 0xffff is safe enough.
        return unichr(0xffff)
    s = prefix[:-1]
    c = ord(prefix[-1])
    if c >= 0xffff:
        raise RuntimeError
    s += unichr(c + 1)
    return s


def strprevling(prefix):
    """Return an approximation of the last unicode string
       less than but not starting with the given prefix.
       strprevling(u'hello') -> u'helln\\xffff'
    """
    if not prefix:
        ## There is no prevling for the null string
        return prefix
    s = prefix[:-1]
    c = ord(prefix[-1])
    if c > 0:
        s += unichr(c - 1) + unichr(0xffff)
    return s

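# Informal example (not part of the original file), assuming a narrow-Unicode
# Python 2 build:
#     strnextling(u'photos/')  -> u'photos0'            (first string past the prefix range)
#     strprevling(u'photos/')  -> u'photos.' + u'\uffff' (just before the prefix range)
# latest_version_list() below uses these bounds to turn "path starts with
# prefix" into a "start < path < nextling" range that the path index can serve.
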
_propnames = {
    'serial': 0,
    'node': 1,
    'hash': 2,
    'size': 3,
    'type': 4,
    'source': 5,
    'mtime': 6,
    'muser': 7,
    'uuid': 8,
    'checksum': 9,
    'cluster': 10
}

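# Informal note (added for clarity, not in the original source): the version
# property tuples returned by the methods below can be indexed either with
# the module-level constants or through this mapping; for a row `props`,
# props[SIZE] == props[_propnames['size']].
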
def create_tables(engine):
    metadata = MetaData()

    #create nodes table
    columns = []
    columns.append(Column('node', Integer, primary_key=True))
    columns.append(Column('parent', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          autoincrement=False))
    columns.append(Column('latest_version', Integer))
    columns.append(Column('path', String(2048), default='', nullable=False))
    nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
    Index('idx_nodes_path', nodes.c.path, unique=True)
    Index('idx_nodes_parent', nodes.c.parent)

    #create policy table
    columns = []
    columns.append(Column('node', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          primary_key=True))
    columns.append(Column('key', String(128), primary_key=True))
    columns.append(Column('value', String(256)))
    policy = Table('policy', metadata, *columns, mysql_engine='InnoDB')

    #create statistics table
    columns = []
    columns.append(Column('node', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          primary_key=True))
    columns.append(Column('population', Integer, nullable=False, default=0))
    columns.append(Column('size', BigInteger, nullable=False, default=0))
    columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
    columns.append(Column('cluster', Integer, nullable=False, default=0,
                          primary_key=True, autoincrement=False))
    statistics = Table('statistics', metadata, *columns, mysql_engine='InnoDB')

    #create versions table
    columns = []
    columns.append(Column('serial', Integer, primary_key=True))
    columns.append(Column('node', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE')))
    columns.append(Column('hash', String(256)))
    columns.append(Column('size', BigInteger, nullable=False, default=0))
    columns.append(Column('type', String(256), nullable=False, default=''))
    columns.append(Column('source', Integer))
    columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
    columns.append(Column('muser', String(256), nullable=False, default=''))
    columns.append(Column('uuid', String(64), nullable=False, default=''))
    columns.append(Column('checksum', String(256), nullable=False, default=''))
    columns.append(Column('cluster', Integer, nullable=False, default=0))
    versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
    Index('idx_versions_node_mtime', versions.c.node, versions.c.mtime)
    Index('idx_versions_node_uuid', versions.c.uuid)

    #create attributes table
    columns = []
    columns.append(Column('serial', Integer,
                          ForeignKey('versions.serial',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          primary_key=True))
    columns.append(Column('domain', String(256), primary_key=True))
    columns.append(Column('key', String(128), primary_key=True))
    columns.append(Column('value', String(256)))
    attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')

    metadata.create_all(engine)
    return metadata.sorted_tables

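# Usage sketch (assumption: any SQLAlchemy engine works; an in-memory SQLite
# URL is shown only as an illustration, it is not mandated by this module):
#
#     from sqlalchemy import create_engine
#     engine = create_engine('sqlite://')
#     tables = create_tables(engine)
#     # -> the five Table objects (nodes, policy, statistics, versions,
#     #    attributes), sorted by dependency via metadata.sorted_tables
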
class Node(DBWorker):
    """Nodes store path organization and have multiple versions.
       Versions store object history and have multiple attributes.
       Attributes store metadata.
    """

    # TODO: Provide an interface for included and excluded clusters.

    def __init__(self, **params):
        DBWorker.__init__(self, **params)
        try:
            metadata = MetaData(self.engine)
            self.nodes = Table('nodes', metadata, autoload=True)
            self.policy = Table('policy', metadata, autoload=True)
            self.statistics = Table('statistics', metadata, autoload=True)
            self.versions = Table('versions', metadata, autoload=True)
            self.attributes = Table('attributes', metadata, autoload=True)
        except NoSuchTableError:
            tables = create_tables(self.engine)
            map(lambda t: self.__setattr__(t.name, t), tables)

        s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
                                           self.nodes.c.parent == ROOTNODE))
        wrapper = self.wrapper
        wrapper.execute()
        try:
            rp = self.conn.execute(s)
            r = rp.fetchone()
            rp.close()
            if not r:
                s = self.nodes.insert(
                ).values(node=ROOTNODE, parent=ROOTNODE, path='')
                self.conn.execute(s)
        finally:
            wrapper.commit()

    def node_create(self, parent, path):
        """Create a new node from the given properties.
           Return the node identifier of the new node.
        """
        #TODO catch IntegrityError?
        s = self.nodes.insert().values(parent=parent, path=path)
        r = self.conn.execute(s)
        inserted_primary_key = r.inserted_primary_key[0]
        r.close()
        return inserted_primary_key

    def node_lookup(self, path):
        """Lookup the current node of the given path.
           Return None if the path is not found.
        """

        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
        s = select([self.nodes.c.node], self.nodes.c.path.like(
            self.escape_like(path), escape=ESCAPE_CHAR))
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        if row:
            return row[0]
        return None

    def node_lookup_bulk(self, paths):
        """Lookup the current nodes for the given paths.
           Return an empty sequence if no paths are given or none are found.
        """

        if not paths:
            return ()
        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
        s = select([self.nodes.c.node], self.nodes.c.path.in_(paths))
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        return [row[0] for row in rows]

    def node_get_properties(self, node):
        """Return the node's (parent, path).
           Return None if the node is not found.
        """

        s = select([self.nodes.c.parent, self.nodes.c.path])
        s = s.where(self.nodes.c.node == node)
        r = self.conn.execute(s)
        l = r.fetchone()
        r.close()
        return l

    def node_get_versions(self, node, keys=(), propnames=_propnames):
        """Return the properties of all versions at node.
           If keys is empty, return all properties in the order
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
        """

        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.hash,
                    self.versions.c.size,
                    self.versions.c.type,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.uuid,
                    self.versions.c.checksum,
                    self.versions.c.cluster], self.versions.c.node == node)
        s = s.order_by(self.versions.c.serial)
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        if not rows:
            return rows

        if not keys:
            return rows

        return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]

    def node_count_children(self, node):
        """Return node's child count."""

        s = select([func.count(self.nodes.c.node)])
        s = s.where(and_(self.nodes.c.parent == node,
                         self.nodes.c.node != ROOTNODE))
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        return row[0]

    def node_purge_children(self, parent, before=inf, cluster=0):
        """Delete all versions with the specified
           parent and cluster, and return
           the hashes, the total size and the serials of versions deleted.
           Clears out nodes with no remaining versions.
        """
        #update statistics
        c1 = select([self.nodes.c.node],
                    self.nodes.c.parent == parent)
        where_clause = and_(self.versions.c.node.in_(c1),
                            self.versions.c.cluster == cluster)
        if before != inf:
            where_clause = and_(where_clause,
                                self.versions.c.mtime <= before)
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        if not row:
            return (), 0, ()
        nr, size = row[0], row[1] if row[1] else 0
        mtime = time()
        self.statistics_update(parent, -nr, -size, mtime, cluster)
        self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster)

        s = select([self.versions.c.hash, self.versions.c.serial])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        hashes = []
        serials = []
        for row in r.fetchall():
            hashes += [row[0]]
            serials += [row[1]]
        r.close()

        #delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)
        r.close()

        #delete nodes
        s = select([self.nodes.c.node],
                   and_(self.nodes.c.parent == parent,
                        select([func.count(self.versions.c.serial)],
                               self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
        rp = self.conn.execute(s)
        nodes = [r[0] for r in rp.fetchall()]
        rp.close()
        if nodes:
            s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
            self.conn.execute(s).close()

        return hashes, size, serials

    def node_purge(self, node, before=inf, cluster=0):
        """Delete all versions with the specified
           node and cluster, and return
           the hashes and size of versions deleted.
           Clears out the node if it has no remaining versions.
        """

        #update statistics
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        where_clause = and_(self.versions.c.node == node,
                            self.versions.c.cluster == cluster)
        if before != inf:
            where_clause = and_(where_clause,
                                self.versions.c.mtime <= before)
        s = s.where(where_clause)
        r = self.conn.execute(s)
        row = r.fetchone()
        nr, size = row[0], row[1]
        r.close()
        if not nr:
            return (), 0, ()
        mtime = time()
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)

        s = select([self.versions.c.hash, self.versions.c.serial])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        hashes = []
        serials = []
        for row in r.fetchall():
            hashes += [row[0]]
            serials += [row[1]]
        r.close()

        #delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)
        r.close()

        #delete nodes
        s = select([self.nodes.c.node],
                   and_(self.nodes.c.node == node,
                        select([func.count(self.versions.c.serial)],
                               self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
        rp = self.conn.execute(s)
        nodes = [r[0] for r in rp.fetchall()]
        rp.close()
        if nodes:
            s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
            self.conn.execute(s).close()

        return hashes, size, serials

    def node_remove(self, node):
        """Remove the node specified.
           Return false if the node has children or is not found.
        """

        if self.node_count_children(node):
            return False

        mtime = time()
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size),
                    self.versions.c.cluster])
        s = s.where(self.versions.c.node == node)
        s = s.group_by(self.versions.c.cluster)
        r = self.conn.execute(s)
        for population, size, cluster in r.fetchall():
            self.statistics_update_ancestors(
                node, -population, -size, mtime, cluster)
        r.close()

        s = self.nodes.delete().where(self.nodes.c.node == node)
        self.conn.execute(s).close()
        return True

    def node_accounts(self):
        s = select([self.nodes.c.path])
        s = s.where(and_(self.nodes.c.node != 0, self.nodes.c.parent == 0))
        account_nodes = self.conn.execute(s).fetchall()
        return sorted(i[0] for i in account_nodes)

    def policy_get(self, node):
        s = select([self.policy.c.key, self.policy.c.value],
                   self.policy.c.node == node)
        r = self.conn.execute(s)
        d = dict(r.fetchall())
        r.close()
        return d

    def policy_set(self, node, policy):
        #insert or replace
        for k, v in policy.iteritems():
            s = self.policy.update().where(and_(self.policy.c.node == node,
                                                self.policy.c.key == k))
            s = s.values(value=v)
            rp = self.conn.execute(s)
            rp.close()
            if rp.rowcount == 0:
                s = self.policy.insert()
                values = {'node': node, 'key': k, 'value': v}
                r = self.conn.execute(s, values)
                r.close()

    def statistics_get(self, node, cluster=0):
        """Return population, total size and last mtime
           for all versions under node that belong to the cluster.
        """

        s = select([self.statistics.c.population,
                    self.statistics.c.size,
                    self.statistics.c.mtime])
        s = s.where(and_(self.statistics.c.node == node,
                         self.statistics.c.cluster == cluster))
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        return row

    def statistics_update(self, node, population, size, mtime, cluster=0):
        """Update the statistics of the given node.
           Statistics keep track of the population, total
           size of objects and mtime in the node's namespace.
           The deltas may be zero, positive or negative numbers.
        """
        s = select([self.statistics.c.population, self.statistics.c.size],
                   and_(self.statistics.c.node == node,
                        self.statistics.c.cluster == cluster))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            prepopulation, presize = (0, 0)
        else:
            prepopulation, presize = r
        population += prepopulation
        population = max(population, 0)
        size += presize

        #insert or replace
        #TODO better upsert
        u = self.statistics.update().where(and_(self.statistics.c.node == node,
                                           self.statistics.c.cluster == cluster))
        u = u.values(population=population, size=size, mtime=mtime)
        rp = self.conn.execute(u)
        rp.close()
        if rp.rowcount == 0:
            ins = self.statistics.insert()
            ins = ins.values(node=node, population=population, size=size,
                             mtime=mtime, cluster=cluster)
            self.conn.execute(ins).close()

    def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
        """Update the statistics of the given node's parent.
           Then recursively update all parents up to the root.
           Population is not recursive.
        """

        while True:
            if node == ROOTNODE:
                break
            props = self.node_get_properties(node)
            if props is None:
                break
            parent, path = props
            self.statistics_update(parent, population, size, mtime, cluster)
            node = parent
            population = 0  # Population isn't recursive

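    # Informal example (not part of the original source): if a version of
    # size 100 is created under node C in a hierarchy A / B / C (with A a
    # child of ROOTNODE), statistics_update_ancestors(C, 1, 100, mtime)
    # adds (population=1, size=100) to B's statistics row, then
    # (population=0, size=100) to A's and to the root node's, and stops;
    # only the immediate parent's population changes.
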
    def statistics_latest(self, node, before=inf, except_cluster=0):
        """Return population, total size and last mtime
           for all latest versions under node that
           do not belong to the cluster.
        """

        # The node.
        props = self.node_get_properties(node)
        if props is None:
            return None
        parent, path = props

        # The latest version.
        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.hash,
                    self.versions.c.size,
                    self.versions.c.type,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.uuid,
                    self.versions.c.checksum,
                    self.versions.c.cluster])
        if before != inf:
            filtered = select([func.max(self.versions.c.serial)],
                              self.versions.c.node == node)
            filtered = filtered.where(self.versions.c.mtime < before)
        else:
            filtered = select([self.nodes.c.latest_version],
                              self.versions.c.node == node)
        s = s.where(and_(self.versions.c.cluster != except_cluster,
                         self.versions.c.serial == filtered))
        r = self.conn.execute(s)
        props = r.fetchone()
        r.close()
        if not props:
            return None
        mtime = props[MTIME]

        # First level, just under node (get population).
        v = self.versions.alias('v')
        s = select([func.count(v.c.serial),
                    func.sum(v.c.size),
                    func.max(v.c.mtime)])
        if before != inf:
            c1 = select([func.max(self.versions.c.serial)])
            c1 = c1.where(self.versions.c.mtime < before)
            c1 = c1.where(self.versions.c.node == v.c.node)
        else:
            c1 = select([self.nodes.c.latest_version])
            c1 = c1.where(self.nodes.c.node == v.c.node)
        c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
        s = s.where(and_(v.c.serial == c1,
                         v.c.cluster != except_cluster,
                         v.c.node.in_(c2)))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        count = r[0]
        mtime = max(mtime, r[2])
        if count == 0:
            return (0, 0, mtime)

        # All children (get size and mtime).
        # This is why the full path is stored.
        s = select([func.count(v.c.serial),
                    func.sum(v.c.size),
                    func.max(v.c.mtime)])
        if before != inf:
            c1 = select([func.max(self.versions.c.serial)],
                        self.versions.c.node == v.c.node)
            c1 = c1.where(self.versions.c.mtime < before)
        else:
            c1 = select([self.nodes.c.latest_version],
                        self.nodes.c.node == v.c.node)
        c2 = select([self.nodes.c.node], self.nodes.c.path.like(
            self.escape_like(path) + '%', escape=ESCAPE_CHAR))
        s = s.where(and_(v.c.serial == c1,
                         v.c.cluster != except_cluster,
                         v.c.node.in_(c2)))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        size = r[1] - props[SIZE]
        mtime = max(mtime, r[2])
        return (count, size, mtime)

    def nodes_set_latest_version(self, node, serial):
        s = self.nodes.update().where(self.nodes.c.node == node)
        s = s.values(latest_version=serial)
        self.conn.execute(s).close()

    def version_create(self, node, hash, size, type, source, muser, uuid, checksum, cluster=0):
        """Create a new version from the given properties.
           Return the (serial, mtime) of the new version.
        """

        mtime = time()
        s = self.versions.insert(
        ).values(node=node, hash=hash, size=size, type=type, source=source,
                 mtime=mtime, muser=muser, uuid=uuid, checksum=checksum, cluster=cluster)
        serial = self.conn.execute(s).inserted_primary_key[0]
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)

        self.nodes_set_latest_version(node, serial)

        return serial, mtime

    def version_lookup(self, node, before=inf, cluster=0, all_props=True):
        """Lookup the current version of the given node.
           Return a list with its properties:
           (serial, node, hash, size, type, source, mtime,
            muser, uuid, checksum, cluster)
           or None if the current version is not found in the given cluster.
        """

        v = self.versions.alias('v')
        if not all_props:
            s = select([v.c.serial])
        else:
            s = select([v.c.serial, v.c.node, v.c.hash,
                        v.c.size, v.c.type, v.c.source,
                        v.c.mtime, v.c.muser, v.c.uuid,
                        v.c.checksum, v.c.cluster])
        if before != inf:
            c = select([func.max(self.versions.c.serial)],
                       self.versions.c.node == node)
            c = c.where(self.versions.c.mtime < before)
        else:
            c = select([self.nodes.c.latest_version],
                       self.nodes.c.node == node)
        s = s.where(and_(v.c.serial == c,
                         v.c.cluster == cluster))
        r = self.conn.execute(s)
        props = r.fetchone()
        r.close()
        if props:
            return props
        return None

    def version_lookup_bulk(self, nodes, before=inf, cluster=0, all_props=True):
        """Lookup the current versions of the given nodes.
           Return a list with their properties:
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
        """
        if not nodes:
            return ()
        v = self.versions.alias('v')
        if not all_props:
            s = select([v.c.serial])
        else:
            s = select([v.c.serial, v.c.node, v.c.hash,
                        v.c.size, v.c.type, v.c.source,
                        v.c.mtime, v.c.muser, v.c.uuid,
                        v.c.checksum, v.c.cluster])
        if before != inf:
            c = select([func.max(self.versions.c.serial)],
                       self.versions.c.node.in_(nodes))
            c = c.where(self.versions.c.mtime < before)
            c = c.group_by(self.versions.c.node)
        else:
            c = select([self.nodes.c.latest_version],
                       self.nodes.c.node.in_(nodes))
        s = s.where(and_(v.c.serial.in_(c),
                         v.c.cluster == cluster))
        s = s.order_by(v.c.node)
        r = self.conn.execute(s)
        rproxy = r.fetchall()
        r.close()
        return (tuple(row.values()) for row in rproxy)

    def version_get_properties(self, serial, keys=(), propnames=_propnames):
        """Return a sequence of values for the properties of
           the version specified by serial and the keys, in the order given.
           If keys is empty, return all properties in the order
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
        """

        v = self.versions.alias()
        s = select([v.c.serial, v.c.node, v.c.hash,
                    v.c.size, v.c.type, v.c.source,
                    v.c.mtime, v.c.muser, v.c.uuid,
                    v.c.checksum, v.c.cluster], v.c.serial == serial)
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if r is None:
            return r

        if not keys:
            return r
        return [r[propnames[k]] for k in keys if k in propnames]

    def version_put_property(self, serial, key, value):
        """Set value for the property of version specified by key."""

        if key not in _propnames:
            return
        s = self.versions.update()
        s = s.where(self.versions.c.serial == serial)
        s = s.values(**{key: value})
        self.conn.execute(s).close()

    def version_recluster(self, serial, cluster):
        """Move the version into another cluster."""

        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        size = props[SIZE]
        oldcluster = props[CLUSTER]
        if cluster == oldcluster:
            return

        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)

        s = self.versions.update()
        s = s.where(self.versions.c.serial == serial)
        s = s.values(cluster=cluster)
        self.conn.execute(s).close()

    def version_remove(self, serial):
        """Remove the serial specified."""

        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        hash = props[HASH]
        size = props[SIZE]
        cluster = props[CLUSTER]

        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, cluster)

        s = self.versions.delete().where(self.versions.c.serial == serial)
        self.conn.execute(s).close()

        props = self.version_lookup(node, cluster=cluster, all_props=False)
        if props:
            self.nodes_set_latest_version(node, props[0])

        return hash, size

    def attribute_get(self, serial, domain, keys=()):
        """Return a list of (key, value) pairs of the version specified by serial.
           If keys is empty, return all attributes.
           Otherwise, return only those specified.
        """

        if keys:
            attrs = self.attributes.alias()
            s = select([attrs.c.key, attrs.c.value])
            s = s.where(and_(attrs.c.key.in_(keys),
                             attrs.c.serial == serial,
                             attrs.c.domain == domain))
        else:
            attrs = self.attributes.alias()
            s = select([attrs.c.key, attrs.c.value])
            s = s.where(and_(attrs.c.serial == serial,
                             attrs.c.domain == domain))
        r = self.conn.execute(s)
        l = r.fetchall()
        r.close()
        return l

    def attribute_set(self, serial, domain, items):
        """Set the attributes of the version specified by serial.
           Receive attributes as an iterable of (key, value) pairs.
        """
        #insert or replace
        #TODO better upsert
        for k, v in items:
            s = self.attributes.update()
            s = s.where(and_(self.attributes.c.serial == serial,
                             self.attributes.c.domain == domain,
                             self.attributes.c.key == k))
            s = s.values(value=v)
            rp = self.conn.execute(s)
            rp.close()
            if rp.rowcount == 0:
                s = self.attributes.insert()
                s = s.values(serial=serial, domain=domain, key=k, value=v)
                self.conn.execute(s).close()

    def attribute_del(self, serial, domain, keys=()):
        """Delete attributes of the version specified by serial.
           If keys is empty, delete all attributes.
           Otherwise delete those specified.
        """

        if keys:
            #TODO more efficient way to do this?
            for key in keys:
                s = self.attributes.delete()
                s = s.where(and_(self.attributes.c.serial == serial,
                                 self.attributes.c.domain == domain,
                                 self.attributes.c.key == key))
                self.conn.execute(s).close()
        else:
            s = self.attributes.delete()
            s = s.where(and_(self.attributes.c.serial == serial,
                             self.attributes.c.domain == domain))
            self.conn.execute(s).close()

    def attribute_copy(self, source, dest):
        s = select(
            [dest, self.attributes.c.domain,
                self.attributes.c.key, self.attributes.c.value],
            self.attributes.c.serial == source)
        rp = self.conn.execute(s)
        attributes = rp.fetchall()
        rp.close()
        for dest, domain, k, v in attributes:
            #insert or replace
            s = self.attributes.update().where(and_(
                self.attributes.c.serial == dest,
                self.attributes.c.domain == domain,
                self.attributes.c.key == k))
            rp = self.conn.execute(s, value=v)
            rp.close()
            if rp.rowcount == 0:
                s = self.attributes.insert()
                values = {'serial': dest, 'domain': domain,
                          'key': k, 'value': v}
                self.conn.execute(s, values).close()

    def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=None):
        """Return a list with all keys defined
           for all latest versions under parent that
           do not belong to the cluster.
        """

        pathq = pathq or []

        # TODO: Use another table to store before=inf results.
        a = self.attributes.alias('a')
        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        s = select([a.c.key]).distinct()
        if before != inf:
            filtered = select([func.max(self.versions.c.serial)])
            filtered = filtered.where(self.versions.c.mtime < before)
            filtered = filtered.where(self.versions.c.node == v.c.node)
        else:
            filtered = select([self.nodes.c.latest_version])
            filtered = filtered.where(self.nodes.c.node == v.c.node)
        s = s.where(v.c.serial == filtered)
        s = s.where(v.c.cluster != except_cluster)
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
                                        self.nodes.c.parent == parent)))
        s = s.where(a.c.serial == v.c.serial)
        s = s.where(a.c.domain == domain)
        s = s.where(n.c.node == v.c.node)
        conj = []
        for path, match in pathq:
            if match == MATCH_PREFIX:
                conj.append(
                    n.c.path.like(
                        self.escape_like(path) + '%',
                        escape=ESCAPE_CHAR
                    )
                )
            elif match == MATCH_EXACT:
                conj.append(n.c.path == path)
        if conj:
            s = s.where(or_(*conj))
        rp = self.conn.execute(s)
        rows = rp.fetchall()
        rp.close()
        return [r[0] for r in rows]

    def latest_version_list(self, parent, prefix='', delimiter=None,
                            start='', limit=10000, before=inf,
                            except_cluster=0, pathq=[], domain=None,
                            filterq=[], sizeq=None, all_props=False):
        """Return a (list of (path, serial) tuples, list of common prefixes)
           for the current versions of the paths with the given parent,
           matching the following criteria.

           The property tuple for a version is returned if all
           of these conditions are true:

                a. parent matches

                b. path > start

                c. path starts with prefix (and paths in pathq)

                d. version is the max up to before

                e. version is not in cluster

                f. the path does not have the delimiter occurring
                   after the prefix, or ends with the delimiter

                g. serial matches the attribute filter query.

                   A filter query is a comma-separated list of
                   terms in one of these three forms:

                   key
                       an attribute with this key must exist

                   !key
                       an attribute with this key must not exist

                   key ?op value
                       the attribute with this key satisfies the value
                       where ?op is one of ==, !=, <=, >=, <, >.

                h. the size is in the range set by sizeq

           The list of common prefixes includes the prefixes
           matching up to the first delimiter after prefix,
           which are reported only once, as "virtual directories".
           The delimiter is included in the prefixes.

           If arguments are None, then the corresponding matching rule
           will always match.

           Limit applies to the first list of tuples returned.

           If all_props is True, return all properties after path, not just serial.
        """

        if not start or start < prefix:
            start = strprevling(prefix)
        nextling = strnextling(prefix)

        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        if not all_props:
            s = select([n.c.path, v.c.serial]).distinct()
        else:
            s = select([n.c.path,
                        v.c.serial, v.c.node, v.c.hash,
                        v.c.size, v.c.type, v.c.source,
                        v.c.mtime, v.c.muser, v.c.uuid,
                        v.c.checksum, v.c.cluster]).distinct()
        if before != inf:
            filtered = select([func.max(self.versions.c.serial)])
            filtered = filtered.where(self.versions.c.mtime < before)
        else:
            filtered = select([self.nodes.c.latest_version])
        s = s.where(
            v.c.serial == filtered.where(self.nodes.c.node == v.c.node))
        s = s.where(v.c.cluster != except_cluster)
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
                                        self.nodes.c.parent == parent)))

        s = s.where(n.c.node == v.c.node)
        s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
        conj = []
        for path, match in pathq:
            if match == MATCH_PREFIX:
                conj.append(
                    n.c.path.like(
                        self.escape_like(path) + '%',
                        escape=ESCAPE_CHAR
                    )
                )
            elif match == MATCH_EXACT:
                conj.append(n.c.path == path)
        if conj:
            s = s.where(or_(*conj))

        if sizeq and len(sizeq) == 2:
            if sizeq[0]:
                s = s.where(v.c.size >= sizeq[0])
            if sizeq[1]:
                s = s.where(v.c.size < sizeq[1])

        if domain and filterq:
            a = self.attributes.alias('a')
            included, excluded, opers = parse_filters(filterq)
            if included:
                subs = select([1])
                subs = subs.where(a.c.serial == v.c.serial).correlate(v)
                subs = subs.where(a.c.domain == domain)
                subs = subs.where(or_(*[a.c.key.op('=')(x) for x in included]))
                s = s.where(exists(subs))
            if excluded:
                subs = select([1])
                subs = subs.where(a.c.serial == v.c.serial).correlate(v)
                subs = subs.where(a.c.domain == domain)
                subs = subs.where(or_(*[a.c.key.op('=')(x) for x in excluded]))
                s = s.where(not_(exists(subs)))
            if opers:
                for k, o, val in opers:
                    subs = select([1])
                    subs = subs.where(a.c.serial == v.c.serial).correlate(v)
                    subs = subs.where(a.c.domain == domain)
                    subs = subs.where(
                        and_(a.c.key.op('=')(k), a.c.value.op(o)(val)))
                    s = s.where(exists(subs))

        s = s.order_by(n.c.path)

        if not delimiter:
            s = s.limit(limit)
            rp = self.conn.execute(s, start=start)
            r = rp.fetchall()
            rp.close()
            return r, ()

        pfz = len(prefix)
        dz = len(delimiter)
        count = 0
        prefixes = []
        pappend = prefixes.append
        matches = []
        mappend = matches.append

        rp = self.conn.execute(s, start=start)
        while True:
            props = rp.fetchone()
            if props is None:
                break
            path = props[0]
            serial = props[1]
            idx = path.find(delimiter, pfz)

            if idx < 0:
                mappend(props)
                count += 1
                if count >= limit:
                    break
                continue

            if idx + dz == len(path):
                mappend(props)
                count += 1
                continue  # Get one more, in case there is a path.
            pf = path[:idx + dz]
            pappend(pf)
            if count >= limit:
                break

            rp = self.conn.execute(s, start=strnextling(pf))  # New start.
        rp.close()

        return matches, prefixes

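    # Informal illustration (hypothetical attribute names, not from the
    # original source) of the filter terms described in the docstring of
    # latest_version_list() above: a query such as
    #     colour==blue,!draft,rating
    # keeps versions whose 'colour' attribute equals 'blue', that have no
    # 'draft' attribute, and that have some 'rating' attribute; splitting
    # into (included, excluded, opers) is delegated to
    # pithos.backends.filter.parse_filters.
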
    def latest_uuid(self, uuid, cluster):
        """Return the latest version of the given uuid and cluster.

        Return a (path, serial) tuple.
        If cluster is None, all clusters are considered.

        """

        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        s = select([n.c.path, v.c.serial])
        filtered = select([func.max(self.versions.c.serial)])
        filtered = filtered.where(self.versions.c.uuid == uuid)
        if cluster is not None:
            filtered = filtered.where(self.versions.c.cluster == cluster)
        s = s.where(v.c.serial == filtered)
        s = s.where(n.c.node == v.c.node)

        r = self.conn.execute(s)
        l = r.fetchone()
        r.close()
        return l
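
# End-to-end sketch (hypothetical values; the constructor arguments depend on
# the DBWorker configuration and are not shown here):
#
#     node = Node(**params)
#     account = node.node_create(ROOTNODE, 'user@example.org')
#     container = node.node_create(account, 'user@example.org/pithos')
#     obj = node.node_create(container, 'user@example.org/pithos/file.txt')
#     serial, mtime = node.version_create(obj, hash='ab12...', size=1024,
#                                         type='text/plain', source=None,
#                                         muser='user@example.org',
#                                         uuid='...', checksum='', cluster=0)
#     props = node.version_lookup(obj)   # latest version in cluster 0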