Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / lib / sqlalchemy / node.py @ ad9ada51

History | View | Annotate | Download (45.3 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from time import time
35
from operator import itemgetter
36
from itertools import groupby
37

    
38
from sqlalchemy import (Table, Integer, BigInteger, DECIMAL, Boolean,
39
                        Column, String, MetaData, ForeignKey)
40
from sqlalchemy.types import Text
41
from sqlalchemy.schema import Index, Sequence
42
from sqlalchemy.sql import func, and_, or_, not_, null, select, bindparam, text, exists
43
from sqlalchemy.ext.compiler import compiles
44
from sqlalchemy.engine.reflection import Inspector
45
from sqlalchemy.exc import NoSuchTableError
46

    
47
from dbworker import DBWorker, ESCAPE_CHAR
48

    
49
from pithos.backends.filter import parse_filters
50

    
51

    
52
# Identifier of the synthetic root node; all account nodes hang under it.
ROOTNODE = 0

# Positional indices into a version property row, matching the column
# order selected by the version_* and node_get_versions queries.
(SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM,
 CLUSTER) = range(11)

# Path matching modes.
(MATCH_PREFIX, MATCH_EXACT) = range(2)

# Sentinel meaning "no upper bound" for 'before' timestamp arguments.
inf = float('inf')
60

    
61

    
62
def strnextling(prefix):
    """Return the first unicode string that sorts strictly after
    the given prefix without starting with it.

    strnextling('hello') -> 'hellp'
    """
    if not prefix:
        # Every string starts with the empty string, so we can only
        # approximate strnextling('') with the last unicode character
        # supported by python: 0x10ffff on wide (32-bit unicode)
        # builds, 0x00ffff on narrow (16-bit) builds. We do not
        # autodetect; 0xffff is safe enough.
        return unichr(0xffff)
    last = ord(prefix[-1])
    if last >= 0xffff:
        raise RuntimeError
    # Bump the final character by one code point.
    return prefix[:-1] + unichr(last + 1)
81

    
82

    
83
def strprevling(prefix):
    """Return an approximation of the last unicode string that sorts
    strictly before the given prefix without starting with it.

    strprevling(u'hello') -> u'helln\\xffff'
    """
    if not prefix:
        # There is no prevling for the null string.
        return prefix
    last = ord(prefix[-1])
    if last > 0:
        # Decrement the final character and pad with a high code point.
        return prefix[:-1] + unichr(last - 1) + unichr(0xffff)
    # Final character is NUL: just drop it.
    return prefix[:-1]
96

    
97
# Map of property name -> positional index in a version row; must stay
# in sync with the (SERIAL, ..., CLUSTER) index constants above.
_propnames = {
    'serial': 0,
    'node': 1,
    'hash': 2,
    'size': 3,
    'type': 4,
    'source': 5,
    'mtime': 6,
    'muser': 7,
    'uuid': 8,
    'checksum': 9,
    'cluster': 10
}
110

    
111

    
112
def create_tables(engine):
    """Create the nodes/policy/statistics/versions/attributes tables
    (with their indexes) on the given engine and return them in
    dependency order.
    """
    metadata = MetaData()

    def cascading_fk(target):
        # All parent references follow renames/deletes of the target row.
        return ForeignKey(target, ondelete='CASCADE', onupdate='CASCADE')

    # nodes: the path hierarchy
    nodes = Table(
        'nodes', metadata,
        Column('node', Integer, primary_key=True),
        Column('parent', Integer, cascading_fk('nodes.node'),
               autoincrement=False),
        Column('latest_version', Integer),
        Column('path', String(2048), default='', nullable=False),
        mysql_engine='InnoDB')
    Index('idx_nodes_path', nodes.c.path, unique=True)
    Index('idx_nodes_parent', nodes.c.parent)

    # policy: per-node key/value settings
    policy = Table(
        'policy', metadata,
        Column('node', Integer, cascading_fk('nodes.node'),
               primary_key=True),
        Column('key', String(128), primary_key=True),
        Column('value', String(256)),
        mysql_engine='InnoDB')

    # statistics: per-node, per-cluster aggregate counters
    statistics = Table(
        'statistics', metadata,
        Column('node', Integer, cascading_fk('nodes.node'),
               primary_key=True),
        Column('population', Integer, nullable=False, default=0),
        Column('size', BigInteger, nullable=False, default=0),
        Column('mtime', DECIMAL(precision=16, scale=6)),
        Column('cluster', Integer, nullable=False, default=0,
               primary_key=True, autoincrement=False),
        mysql_engine='InnoDB')

    # versions: the object history
    versions = Table(
        'versions', metadata,
        Column('serial', Integer, primary_key=True),
        Column('node', Integer, cascading_fk('nodes.node')),
        Column('hash', String(256)),
        Column('size', BigInteger, nullable=False, default=0),
        Column('type', String(256), nullable=False, default=''),
        Column('source', Integer),
        Column('mtime', DECIMAL(precision=16, scale=6)),
        Column('muser', String(256), nullable=False, default=''),
        Column('uuid', String(64), nullable=False, default=''),
        Column('checksum', String(256), nullable=False, default=''),
        Column('cluster', Integer, nullable=False, default=0),
        mysql_engine='InnoDB')
    Index('idx_versions_node_mtime', versions.c.node, versions.c.mtime)
    Index('idx_versions_node_uuid', versions.c.uuid)

    # attributes: per-version, per-domain metadata
    attributes = Table(
        'attributes', metadata,
        Column('serial', Integer, cascading_fk('versions.serial'),
               primary_key=True),
        Column('domain', String(256), primary_key=True),
        Column('key', String(128), primary_key=True),
        Column('value', String(256)),
        Column('node', Integer, nullable=False, default=0),
        Column('is_latest', Boolean, nullable=False, default=True),
        mysql_engine='InnoDB')
    Index('idx_attributes_domain', attributes.c.domain)
    Index('idx_attributes_serial_node', attributes.c.serial, attributes.c.node)

    metadata.create_all(engine)
    return metadata.sorted_tables
192

    
193

    
194
class Node(DBWorker):
195
    """Nodes store path organization and have multiple versions.
196
       Versions store object history and have multiple attributes.
197
       Attributes store metadata.
198
    """
199

    
200
    # TODO: Provide an interface for included and excluded clusters.
201

    
202
    def __init__(self, **params):
        """Reflect the backend tables (creating them if missing) and
        make sure the root node exists.
        """
        DBWorker.__init__(self, **params)
        try:
            metadata = MetaData(self.engine)
            self.nodes = Table('nodes', metadata, autoload=True)
            self.policy = Table('policy', metadata, autoload=True)
            self.statistics = Table('statistics', metadata, autoload=True)
            self.versions = Table('versions', metadata, autoload=True)
            self.attributes = Table('attributes', metadata, autoload=True)
        except NoSuchTableError:
            tables = create_tables(self.engine)
            # Plain loop instead of map(): map() for side effects builds
            # a throwaway list and obscures the intent.
            for t in tables:
                setattr(self, t.name, t)

        # Ensure the root row (node == parent == ROOTNODE) is present.
        s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
                                           self.nodes.c.parent == ROOTNODE))
        wrapper = self.wrapper
        wrapper.execute()
        try:
            rp = self.conn.execute(s)
            r = rp.fetchone()
            rp.close()
            if not r:
                s = self.nodes.insert(
                ).values(node=ROOTNODE, parent=ROOTNODE, path='')
                self.conn.execute(s)
        finally:
            wrapper.commit()
229

    
230
    def node_create(self, parent, path):
        """Create a new node from the given properties.
           Return the node identifier of the new node.
        """
        #TODO catch IntegrityError?
        r = self.conn.execute(
            self.nodes.insert().values(parent=parent, path=path))
        node = r.inserted_primary_key[0]
        r.close()
        return node
240

    
241
    def node_lookup(self, path, for_update=False):
        """Lookup the current node of the given path.
           Return None if the path is not found.
        """
        # LIKE (with wildcard characters escaped) sidesteps MySQL's
        # trailing-space quirks on '=' string comparison.
        q = select([self.nodes.c.node],
                   self.nodes.c.path.like(self.escape_like(path),
                                          escape=ESCAPE_CHAR),
                   for_update=for_update)
        r = self.conn.execute(q)
        row = r.fetchone()
        r.close()
        return row[0] if row else None
255

    
256
    def node_lookup_bulk(self, paths):
        """Lookup the current nodes for the given paths.
           Return () if the path is not found.
        """
        if not paths:
            return ()
        # Exact IN() matching here; see node_lookup for the LIKE caveat.
        q = select([self.nodes.c.node], self.nodes.c.path.in_(paths))
        r = self.conn.execute(q)
        nodes = [row[0] for row in r.fetchall()]
        r.close()
        return nodes
269

    
270
    def node_get_properties(self, node):
        """Return the node's (parent, path).
           Return None if the node is not found.
        """
        q = select([self.nodes.c.parent, self.nodes.c.path]).where(
            self.nodes.c.node == node)
        r = self.conn.execute(q)
        props = r.fetchone()
        r.close()
        return props
281

    
282
    def node_get_versions(self, node, keys=(), propnames=_propnames):
        """Return the properties of all versions at node.
           If keys is empty, return all properties in the order
           (serial, node, hash, size, type, source, mtime, muser,
            uuid, checksum, cluster).
        """
        v = self.versions.c
        q = select([v.serial, v.node, v.hash, v.size, v.type, v.source,
                    v.mtime, v.muser, v.uuid, v.checksum, v.cluster],
                   v.node == node).order_by(v.serial)
        r = self.conn.execute(q)
        rows = r.fetchall()
        r.close()
        if not rows or not keys:
            return rows
        # Project each row down to the requested (known) keys, in order.
        return [[row[propnames[k]] for k in keys if k in propnames]
                for row in rows]
310

    
311
    def node_count_children(self, node):
        """Return node's child count."""
        q = select([func.count(self.nodes.c.node)]).where(
            and_(self.nodes.c.parent == node,
                 self.nodes.c.node != ROOTNODE))
        r = self.conn.execute(q)
        count = r.fetchone()[0]
        r.close()
        return count
321

    
322
    def node_purge_children(self, parent, before=inf, cluster=0,
                            update_statistics_ancestors_depth=None):
        """Delete all versions with the specified
           parent and cluster, and return
           the hashes, the total size and the serials of versions deleted.
           Clears out nodes with no remaining versions.

           before  -- only versions with mtime <= before (inf = all)
           update_statistics_ancestors_depth -- how far up the tree the
             statistics decrement is propagated (None = to the root)
        """
        #update statistics
        # Doomed set: every version whose node is a direct child of
        # 'parent', in 'cluster' (and old enough, when 'before' is set).
        c1 = select([self.nodes.c.node],
                    self.nodes.c.parent == parent)
        where_clause = and_(self.versions.c.node.in_(c1),
                            self.versions.c.cluster == cluster)
        if before != inf:
            where_clause = and_(where_clause,
                                self.versions.c.mtime <= before)
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        if not row:
            return (), 0, ()
        # SUM() is NULL when no rows matched; treat that as size 0.
        nr, size = row[0], row[1] if row[1] else 0
        mtime = time()
        # Subtract the purged population/size from this node's and its
        # ancestors' aggregate statistics.
        self.statistics_update(parent, -nr, -size, mtime, cluster)
        self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster,
                                         update_statistics_ancestors_depth)

        # Snapshot the hashes/serials to report back before deleting.
        s = select([self.versions.c.hash, self.versions.c.serial])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        hashes = []
        serials = []
        for row in r.fetchall():
            hashes += [row[0]]
            serials += [row[1]]
        r.close()

        #delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)
        r.close()

        #delete nodes
        # Drop child nodes left without any version (in any cluster).
        s = select([self.nodes.c.node],
                   and_(self.nodes.c.parent == parent,
                        select([func.count(self.versions.c.serial)],
                               self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
        rp = self.conn.execute(s)
        nodes = [r[0] for r in rp.fetchall()]
        rp.close()
        if nodes:
            s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
            self.conn.execute(s).close()

        return hashes, size, serials
379

    
380
    def node_purge(self, node, before=inf, cluster=0,
                   update_statistics_ancestors_depth=None):
        """Delete all versions with the specified
           node and cluster, and return
           the hashes and size of versions deleted.
           Clears out the node if it has no remaining versions.

           before  -- only versions with mtime <= before (inf = all)
           update_statistics_ancestors_depth -- how far up the tree the
             statistics decrement is propagated (None = to the root)
        """

        #update statistics
        # Count and total size of the versions about to be deleted.
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        where_clause = and_(self.versions.c.node == node,
                            self.versions.c.cluster == cluster)
        if before != inf:
            where_clause = and_(where_clause,
                                self.versions.c.mtime <= before)
        s = s.where(where_clause)
        r = self.conn.execute(s)
        row = r.fetchone()
        # NOTE(review): SUM() yields NULL when nr == 0, but the early
        # return below guarantees 'size' is only used when nr > 0.
        nr, size = row[0], row[1]
        r.close()
        if not nr:
            return (), 0, ()
        mtime = time()
        # Propagate the decrement up the node's ancestry.
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster,
                                         update_statistics_ancestors_depth)

        # Snapshot the hashes/serials to report back before deleting.
        s = select([self.versions.c.hash, self.versions.c.serial])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        hashes = []
        serials = []
        for row in r.fetchall():
            hashes += [row[0]]
            serials += [row[1]]
        r.close()

        #delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)
        r.close()

        #delete nodes
        # Drop the node itself if no versions remain in any cluster.
        s = select([self.nodes.c.node],
                   and_(self.nodes.c.node == node,
                        select([func.count(self.versions.c.serial)],
                               self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
        rp= self.conn.execute(s)
        nodes = [r[0] for r in rp.fetchall()]
        rp.close()
        if nodes:
            s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
            self.conn.execute(s).close()

        return hashes, size, serials
435

    
436
    def node_remove(self, node, update_statistics_ancestors_depth=None):
        """Remove the node specified.
           Return false if the node has children or is not found.
        """
        if self.node_count_children(node):
            return False

        mtime = time()
        # Roll the node's per-cluster version totals out of the
        # ancestors' statistics before dropping the node itself.
        q = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size),
                    self.versions.c.cluster])
        q = q.where(self.versions.c.node == node)
        q = q.group_by(self.versions.c.cluster)
        r = self.conn.execute(q)
        for population, size, cluster in r.fetchall():
            self.statistics_update_ancestors(
                node, -population, -size, mtime, cluster,
                update_statistics_ancestors_depth)
        r.close()

        self.conn.execute(
            self.nodes.delete().where(self.nodes.c.node == node)).close()
        return True
460

    
461
    def node_accounts(self, accounts=()):
        """Return (path, node) rows for account-level nodes (direct
        children of the root), optionally restricted to the given
        account paths.
        """
        q = select([self.nodes.c.path, self.nodes.c.node]).where(
            and_(self.nodes.c.node != 0, self.nodes.c.parent == 0))
        if accounts:
            q = q.where(self.nodes.c.path.in_(accounts))
        r = self.conn.execute(q)
        rows = r.fetchall()
        r.close()
        return rows
471

    
472
    def node_account_quotas(self):
        """Return a dict mapping account path -> 'quota' policy value."""
        q = select([self.nodes.c.path, self.policy.c.value])
        q = q.where(and_(self.nodes.c.node != 0,
                         self.nodes.c.parent == 0))
        q = q.where(self.nodes.c.node == self.policy.c.node)
        q = q.where(self.policy.c.key == 'quota')
        r = self.conn.execute(q)
        quotas = dict(r.fetchall())
        r.close()
        return quotas
482

    
483
    def node_account_usage(self, account_node, cluster):
        """Return the summed version size in the given cluster for the
        account node's descendants (its containers and their children).
        """
        children = select([self.nodes.c.node]).where(
            self.nodes.c.parent == account_node)
        descendants = select([self.nodes.c.node]).where(
            or_(self.nodes.c.parent.in_(children),
                self.nodes.c.node.in_(children)))
        q = select([func.sum(self.versions.c.size)]).where(
            and_(self.nodes.c.node == self.versions.c.node,
                 self.nodes.c.node.in_(descendants),
                 self.versions.c.cluster == cluster))
        r = self.conn.execute(q)
        usage = r.fetchone()[0]
        r.close()
        return usage
497

    
498
    def policy_get(self, node):
        """Return the node's policy entries as a {key: value} dict."""
        q = select([self.policy.c.key, self.policy.c.value],
                   self.policy.c.node == node)
        r = self.conn.execute(q)
        policy = dict(r.fetchall())
        r.close()
        return policy
505

    
506
    def policy_set(self, node, policy):
        """Insert or update the given {key: value} policy entries for
        the node (upsert emulated as UPDATE-then-INSERT).
        """
        for k, v in policy.iteritems():
            u = self.policy.update().where(and_(self.policy.c.node == node,
                                                self.policy.c.key == k))
            u = u.values(value=v)
            rp = self.conn.execute(u)
            # Read rowcount BEFORE closing: DBAPI cursors are not
            # required to report it reliably after close().
            updated = rp.rowcount
            rp.close()
            if updated == 0:
                s = self.policy.insert()
                values = {'node': node, 'key': k, 'value': v}
                r = self.conn.execute(s, values)
                r.close()
519

    
520
    def statistics_get(self, node, cluster=0):
        """Return population, total size and last mtime
           for all versions under node that belong to the cluster.
        """
        q = select([self.statistics.c.population,
                    self.statistics.c.size,
                    self.statistics.c.mtime]).where(
            and_(self.statistics.c.node == node,
                 self.statistics.c.cluster == cluster))
        r = self.conn.execute(q)
        row = r.fetchone()
        r.close()
        return row
534

    
535
    def statistics_update(self, node, population, size, mtime, cluster=0):
        """Update the statistics of the given node.
           Statistics keep track of the population, total size of
           objects and mtime in the node's namespace.
           The population/size arguments are deltas and may be zero,
           positive or negative.
        """
        s = select([self.statistics.c.population, self.statistics.c.size],
                   and_(self.statistics.c.node == node,
                        self.statistics.c.cluster == cluster))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            prepopulation, presize = (0, 0)
        else:
            prepopulation, presize = r
        population += prepopulation
        population = max(population, 0)  # never store a negative count
        size += presize

        #insert or replace
        #TODO better upsert
        u = self.statistics.update().where(and_(
            self.statistics.c.node == node,
            self.statistics.c.cluster == cluster))
        u = u.values(population=population, size=size, mtime=mtime)
        rp = self.conn.execute(u)
        # Read rowcount BEFORE closing: DBAPI cursors are not required
        # to report it reliably after close().
        updated = rp.rowcount
        rp.close()
        if updated == 0:
            ins = self.statistics.insert()
            ins = ins.values(node=node, population=population, size=size,
                             mtime=mtime, cluster=cluster)
            self.conn.execute(ins).close()
567

    
568
    def statistics_update_ancestors(self, node, population, size, mtime,
                                    cluster=0, recursion_depth=None):
        """Update the statistics of the given node's parent.
           Then recursively update all parents up to the root
           or up to the ``recursion_depth`` (if not None).
           Population is not recursive.
        """
        level = 0
        while node != ROOTNODE:
            if recursion_depth and recursion_depth <= level:
                break
            props = self.node_get_properties(node)
            if props is None:
                break
            parent, _path = props
            self.statistics_update(parent, population, size, mtime, cluster)
            node = parent
            population = 0  # population counts direct children only
            level += 1
590

    
591
    def statistics_latest(self, node, before=inf, except_cluster=0):
        """Return (population, total size, last mtime)
           for all latest versions under node that
           do not belong to except_cluster, or None if the node or its
           latest version cannot be found.
        """

        # The node.
        props = self.node_get_properties(node)
        if props is None:
            return None
        parent, path = props

        # The latest version of the node itself.
        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.hash,
                    self.versions.c.size,
                    self.versions.c.type,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.uuid,
                    self.versions.c.checksum,
                    self.versions.c.cluster])
        if before != inf:
            filtered = select([func.max(self.versions.c.serial)],
                              self.versions.c.node == node)
            filtered = filtered.where(self.versions.c.mtime < before)
        else:
            filtered = select([self.nodes.c.latest_version],
                              self.nodes.c.node == node)
        s = s.where(and_(self.versions.c.cluster != except_cluster,
                         self.versions.c.serial == filtered))
        r = self.conn.execute(s)
        props = r.fetchone()
        r.close()
        if not props:
            return None
        mtime = props[MTIME]

        # First level, just under node (get population).
        v = self.versions.alias('v')
        s = select([func.count(v.c.serial),
                    func.sum(v.c.size),
                    func.max(v.c.mtime)])
        if before != inf:
            c1 = select([func.max(self.versions.c.serial)])
            c1 = c1.where(self.versions.c.mtime < before)
            # BUGFIX: this correlation clause was previously discarded
            # (c1.where(...) without reassigning c1), so the subquery
            # picked the global maximum serial instead of each node's
            # latest one. The parallel query below already applied it
            # correctly.
            c1 = c1.where(self.versions.c.node == v.c.node)
        else:
            c1 = select([self.nodes.c.latest_version])
            c1 = c1.where(self.nodes.c.node == v.c.node)
        c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
        s = s.where(and_(v.c.serial == c1,
                         v.c.cluster != except_cluster,
                         v.c.node.in_(c2)))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        count = r[0]
        mtime = max(mtime, r[2])
        if count == 0:
            return (0, 0, mtime)

        # All children (get size and mtime).
        # This is why the full path is stored.
        s = select([func.count(v.c.serial),
                    func.sum(v.c.size),
                    func.max(v.c.mtime)])
        if before != inf:
            c1 = select([func.max(self.versions.c.serial)],
                        self.versions.c.node == v.c.node)
            c1 = c1.where(self.versions.c.mtime < before)
        else:
            c1 = select([self.nodes.c.latest_version],
                        self.nodes.c.node == v.c.node)
        c2 = select([self.nodes.c.node], self.nodes.c.path.like(
            self.escape_like(path) + '%', escape=ESCAPE_CHAR))
        s = s.where(and_(v.c.serial == c1,
                         v.c.cluster != except_cluster,
                         v.c.node.in_(c2)))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        # Exclude the node's own latest version from the subtree size.
        size = r[1] - props[SIZE]
        mtime = max(mtime, r[2])
        return (count, size, mtime)
682

    
683
    def nodes_set_latest_version(self, node, serial):
        """Record serial as the node's current latest version."""
        u = self.nodes.update().where(self.nodes.c.node == node).values(
            latest_version=serial)
        self.conn.execute(u).close()
687

    
688
    def version_create(self, node, hash, size, type, source, muser, uuid,
                       checksum, cluster=0,
                       update_statistics_ancestors_depth=None):
        """Create a new version from the given properties.
           Return the (serial, mtime) of the new version.
        """
        mtime = time()
        ins = self.versions.insert().values(
            node=node, hash=hash, size=size, type=type, source=source,
            mtime=mtime, muser=muser, uuid=uuid, checksum=checksum,
            cluster=cluster)
        serial = self.conn.execute(ins).inserted_primary_key[0]
        # Account for the new version in the ancestors' statistics and
        # mark it as the node's latest.
        self.statistics_update_ancestors(node, 1, size, mtime, cluster,
                                         update_statistics_ancestors_depth)
        self.nodes_set_latest_version(node, serial)
        return serial, mtime
706

    
707
    def version_lookup(self, node, before=inf, cluster=0, all_props=True):
        """Lookup the current version of the given node.
           Return a list with its properties:
           (serial, node, hash, size, type, source, mtime,
            muser, uuid, checksum, cluster)
           or None if the current version is not found in the given cluster.
        """
        v = self.versions.alias('v')
        if all_props:
            s = select([v.c.serial, v.c.node, v.c.hash,
                        v.c.size, v.c.type, v.c.source,
                        v.c.mtime, v.c.muser, v.c.uuid,
                        v.c.checksum, v.c.cluster])
        else:
            s = select([v.c.serial])
        # With an upper time bound the latest version must be resolved
        # by mtime; otherwise the denormalized nodes.latest_version
        # column provides it directly.
        if before != inf:
            c = select([func.max(self.versions.c.serial)],
                       self.versions.c.node == node)
            c = c.where(self.versions.c.mtime < before)
        else:
            c = select([self.nodes.c.latest_version],
                       self.nodes.c.node == node)
        s = s.where(and_(v.c.serial == c,
                         v.c.cluster == cluster))
        r = self.conn.execute(s)
        props = r.fetchone()
        r.close()
        return props if props else None
738

    
739
    def version_lookup_bulk(self, nodes, before=inf, cluster=0,
                            all_props=True):
        """Lookup the current versions of the given nodes.
           Return their properties:
           (serial, node, hash, size, type, source, mtime, muser,
            uuid, checksum, cluster).
        """
        if not nodes:
            return ()
        v = self.versions.alias('v')
        if all_props:
            s = select([v.c.serial, v.c.node, v.c.hash,
                        v.c.size, v.c.type, v.c.source,
                        v.c.mtime, v.c.muser, v.c.uuid,
                        v.c.checksum, v.c.cluster])
        else:
            s = select([v.c.serial])
        if before != inf:
            c = select([func.max(self.versions.c.serial)],
                       self.versions.c.node.in_(nodes))
            c = c.where(self.versions.c.mtime < before)
            c = c.group_by(self.versions.c.node)
        else:
            c = select([self.nodes.c.latest_version],
                       self.nodes.c.node.in_(nodes))
        s = s.where(and_(v.c.serial.in_(c),
                         v.c.cluster == cluster))
        s = s.order_by(v.c.node)
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        # NOTE(review): this yields lazily (a generator of tuples),
        # not a list as version_lookup's docstring family suggests;
        # callers appear to iterate it once — confirm before changing.
        return (tuple(row.values()) for row in rows)
769

    
770
    def version_get_properties(self, serial, keys=(), propnames=_propnames):
        """Return a sequence of values for the properties of
        the version specified by serial and the keys, in the order given.

        If keys is empty, return all properties in the order
        (serial, node, hash, size, type, source, mtime, muser, uuid,
         checksum, cluster); return None when the serial does not exist.
        """
        v = self.versions.alias()
        s = select([v.c.serial, v.c.node, v.c.hash,
                    v.c.size, v.c.type, v.c.source,
                    v.c.mtime, v.c.muser, v.c.uuid,
                    v.c.checksum, v.c.cluster], v.c.serial == serial)
        rp = self.conn.execute(s)
        row = rp.fetchone()
        rp.close()
        if row is None or not keys:
            # Missing version, or the full property row was requested.
            return row
        # Unknown key names are silently skipped.
        return [row[propnames[key]] for key in keys if key in propnames]
    def version_put_property(self, serial, key, value):
        """Set value for the property of version specified by key."""
        if key not in _propnames:
            # Silently ignore unknown property names.
            return
        u = (self.versions.update()
             .where(self.versions.c.serial == serial)
             .values(**{key: value}))
        self.conn.execute(u).close()
    def version_recluster(self, serial, cluster,
                          update_statistics_ancestors_depth=None):
        """Move the version into another cluster."""
        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        size = props[SIZE]
        oldcluster = props[CLUSTER]
        if oldcluster == cluster:
            # Already there; nothing to do.
            return

        # Move the accounted count/size from the old cluster to the new one.
        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster,
                                         update_statistics_ancestors_depth)
        self.statistics_update_ancestors(node, 1, size, mtime, cluster,
                                         update_statistics_ancestors_depth)

        u = (self.versions.update()
             .where(self.versions.c.serial == serial)
             .values(cluster=cluster))
        self.conn.execute(u).close()
    def version_remove(self, serial, update_statistics_ancestors_depth=None):
        """Remove the serial specified.

        Return the (hash, size) of the removed version, or None
        (implicitly) if the serial does not exist.
        """
        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        hash = props[HASH]
        size = props[SIZE]
        cluster = props[CLUSTER]

        # Account for the removed version before deleting it.
        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, cluster,
                                         update_statistics_ancestors_depth)

        s = self.versions.delete().where(self.versions.c.serial == serial)
        self.conn.execute(s).close()

        # The removed version may have been the node's latest one:
        # repoint latest_version at the version that is now current.
        # BUG FIX: this previously passed the just-deleted 'serial'
        # instead of the serial returned by the lookup.
        props = self.version_lookup(node, cluster=cluster, all_props=False)
        if props:
            self.nodes_set_latest_version(node, props[0])

        return hash, size
    def attribute_get(self, serial, domain, keys=()):
        """Return a list of (key, value) pairs of the version specified by serial.

        If keys is empty, return all attributes of the domain;
        otherwise, return only those specified.
        """
        attrs = self.attributes.alias()
        s = select([attrs.c.key, attrs.c.value])
        s = s.where(and_(attrs.c.serial == serial,
                         attrs.c.domain == domain))
        if keys:
            # Restrict to the requested keys only.
            s = s.where(attrs.c.key.in_(keys))
        rp = self.conn.execute(s)
        pairs = rp.fetchall()
        rp.close()
        return pairs
    def attribute_set(self, serial, domain, node, items, is_latest=True):
        """Set the attributes of the version specified by serial.

        Receive attributes as an iterable of (key, value) pairs.
        """
        # Emulate an upsert: try UPDATE first, INSERT on a miss.
        # TODO: switch to a real upsert when the backends support one.
        for key, value in items:
            u = self.attributes.update()
            u = u.where(and_(self.attributes.c.serial == serial,
                             self.attributes.c.domain == domain,
                             self.attributes.c.key == key))
            u = u.values(value=value)
            rp = self.conn.execute(u)
            rp.close()
            if rp.rowcount == 0:
                # No existing row matched; insert a new attribute.
                ins = self.attributes.insert().values(
                    serial=serial, domain=domain, node=node,
                    is_latest=is_latest, key=key, value=value)
                self.conn.execute(ins).close()
    def attribute_del(self, serial, domain, keys=()):
        """Delete attributes of the version specified by serial.

        If keys is empty, delete all attributes of the domain;
        otherwise delete only those specified.
        """
        if not keys:
            d = self.attributes.delete()
            d = d.where(and_(self.attributes.c.serial == serial,
                             self.attributes.c.domain == domain))
            self.conn.execute(d).close()
            return
        # TODO: a single DELETE ... WHERE key IN (...) would be more efficient.
        for key in keys:
            d = self.attributes.delete()
            d = d.where(and_(self.attributes.c.serial == serial,
                             self.attributes.c.domain == domain,
                             self.attributes.c.key == key))
            self.conn.execute(d).close()
    def attribute_copy(self, source, dest):
        """Copy all attributes of version 'source' onto version 'dest'.

        Existing attributes of 'dest' with the same (domain, key) are
        overwritten; missing ones are inserted with is_latest=True.
        """
        s = select(
            [dest, self.attributes.c.domain, self.attributes.c.node,
             self.attributes.c.key, self.attributes.c.value],
            self.attributes.c.serial == source)
        rp = self.conn.execute(s)
        attributes = rp.fetchall()
        rp.close()
        # NOTE: the loop variable was previously named 'dest', shadowing
        # the parameter; renamed for clarity (the selected value is the
        # destination serial anyway).
        for dest_serial, domain, node, k, v in attributes:
            # insert or replace: try UPDATE first, INSERT on a miss.
            u = self.attributes.update().where(and_(
                self.attributes.c.serial == dest_serial,
                self.attributes.c.domain == domain,
                self.attributes.c.key == k))
            u = u.values(value=v)
            rp = self.conn.execute(u)
            rp.close()
            if rp.rowcount == 0:
                # BUG FIX: the execute used to sit outside this branch,
                # re-running the UPDATE a second time whenever it had
                # already matched a row.
                ins = self.attributes.insert().values(
                    serial=dest_serial, domain=domain, node=node,
                    is_latest=True, key=k, value=v)
                self.conn.execute(ins).close()
    def attribute_unset_is_latest(self, node, exclude):
        """Clear the is_latest flag on all attributes of the node,
        except those belonging to the 'exclude' serial.
        """
        u = self.attributes.update()
        u = u.where(and_(self.attributes.c.node == node,
                         self.attributes.c.serial != exclude))
        u = u.values({'is_latest': False})
        self.conn.execute(u)
    def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=None):
        """Return a list with all keys pairs defined
           for all latest versions under parent that
           do not belong to the cluster.
        """

        pathq = pathq or []

        # TODO: Use another table to store before=inf results.
        a = self.attributes.alias('a')
        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        # Distinct attribute keys across all matching versions.
        s = select([a.c.key]).distinct()
        # Correlated subquery yielding the serial that is "current" for
        # each node: the newest one before the time bound, or the node's
        # cached latest_version when no bound is given.
        if before != inf:
            filtered = select([func.max(self.versions.c.serial)])
            filtered = filtered.where(self.versions.c.mtime < before)
            filtered = filtered.where(self.versions.c.node == v.c.node)
        else:
            filtered = select([self.nodes.c.latest_version])
            filtered = filtered.where(self.nodes.c.node == v.c.node)
        s = s.where(v.c.serial == filtered)
        # Exclude versions living in the excluded cluster.
        s = s.where(v.c.cluster != except_cluster)
        # Only direct children of 'parent'.
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
                                        self.nodes.c.parent == parent)))
        # Join attributes (restricted to the domain) and nodes (for paths).
        s = s.where(a.c.serial == v.c.serial)
        s = s.where(a.c.domain == domain)
        s = s.where(n.c.node == v.c.node)
        # Optional path filters: prefix matches use an escaped LIKE so
        # wildcard characters in the path are treated literally.
        conj = []
        for path, match in pathq:
            if match == MATCH_PREFIX:
                conj.append(
                    n.c.path.like(
                        self.escape_like(path) + '%',
                        escape=ESCAPE_CHAR
                    )
                )
            elif match == MATCH_EXACT:
                conj.append(n.c.path == path)
        if conj:
            s = s.where(or_(*conj))
        rp = self.conn.execute(s)
        rows = rp.fetchall()
        rp.close()
        # Flatten the single-column result into a plain list of keys.
        return [r[0] for r in rows]
    def latest_version_list(self, parent, prefix='', delimiter=None,
                            start='', limit=10000, before=inf,
                            except_cluster=0, pathq=[], domain=None,
                            filterq=[], sizeq=None, all_props=False):
        """Return a (list of (path, serial) tuples, list of common prefixes)
           for the current versions of the paths with the given parent,
           matching the following criteria.

           The property tuple for a version is returned if all
           of these conditions are true:

                a. parent matches

                b. path > start

                c. path starts with prefix (and paths in pathq)

                d. version is the max up to before

                e. version is not in cluster

                f. the path does not have the delimiter occurring
                   after the prefix, or ends with the delimiter

                g. serial matches the attribute filter query.

                   A filter query is a comma-separated list of
                   terms in one of these three forms:

                   key
                       an attribute with this key must exist

                   !key
                       an attribute with this key must not exist

                   key ?op value
                       the attribute with this key satisfies the value
                       where ?op is one of ==, != <=, >=, <, >.

                h. the size is in the range set by sizeq

           The list of common prefixes includes the prefixes
           matching up to the first delimiter after prefix,
           and are reported only once, as "virtual directories".
           The delimiter is included in the prefixes.

           If arguments are None, then the corresponding matching rule
           will always match.

           Limit applies to the first list of tuples returned.

           If all_props is True, return all properties after path, not just serial.
        """

        # NOTE: pathq=[] and filterq=[] are mutable defaults; both are
        # only read here, so the shared-default pitfall does not bite.
        if not start or start < prefix:
            start = strprevling(prefix)
        nextling = strnextling(prefix)

        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        if not all_props:
            s = select([n.c.path, v.c.serial]).distinct()
        else:
            s = select([n.c.path,
                        v.c.serial, v.c.node, v.c.hash,
                        v.c.size, v.c.type, v.c.source,
                        v.c.mtime, v.c.muser, v.c.uuid,
                        v.c.checksum, v.c.cluster]).distinct()
        # Correlated subquery picking each node's current serial: newest
        # before the time bound, or the cached latest_version.
        if before != inf:
            filtered = select([func.max(self.versions.c.serial)])
            filtered = filtered.where(self.versions.c.mtime < before)
        else:
            filtered = select([self.nodes.c.latest_version])
        s = s.where(
            v.c.serial == filtered.where(self.nodes.c.node == v.c.node))
        s = s.where(v.c.cluster != except_cluster)
        # Only direct children of 'parent'.
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
                                        self.nodes.c.parent == parent)))

        s = s.where(n.c.node == v.c.node)
        # 'start' is a bind parameter so the same statement can be
        # re-executed with a new start during delimiter pagination below.
        s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
        conj = []
        for path, match in pathq:
            if match == MATCH_PREFIX:
                conj.append(
                    n.c.path.like(
                        self.escape_like(path) + '%',
                        escape=ESCAPE_CHAR
                    )
                )
            elif match == MATCH_EXACT:
                conj.append(n.c.path == path)
        if conj:
            s = s.where(or_(*conj))

        # Size window: [sizeq[0], sizeq[1]). Falsy bounds (None/0) are open.
        if sizeq and len(sizeq) == 2:
            if sizeq[0]:
                s = s.where(v.c.size >= sizeq[0])
            if sizeq[1]:
                s = s.where(v.c.size < sizeq[1])

        # Attribute filter query (rule g): EXISTS subqueries per term.
        if domain and filterq:
            a = self.attributes.alias('a')
            included, excluded, opers = parse_filters(filterq)
            if included:
                subs = select([1])
                subs = subs.where(a.c.serial == v.c.serial).correlate(v)
                subs = subs.where(a.c.domain == domain)
                subs = subs.where(or_(*[a.c.key.op('=')(x) for x in included]))
                s = s.where(exists(subs))
            if excluded:
                subs = select([1])
                subs = subs.where(a.c.serial == v.c.serial).correlate(v)
                subs = subs.where(a.c.domain == domain)
                subs = subs.where(or_(*[a.c.key.op('=')(x) for x in excluded]))
                s = s.where(not_(exists(subs)))
            if opers:
                for k, o, val in opers:
                    subs = select([1])
                    subs = subs.where(a.c.serial == v.c.serial).correlate(v)
                    subs = subs.where(a.c.domain == domain)
                    subs = subs.where(
                        and_(a.c.key.op('=')(k), a.c.value.op(o)(val)))
                    s = s.where(exists(subs))

        s = s.order_by(n.c.path)

        # Without a delimiter there is no virtual-directory folding:
        # apply the limit in SQL and return the rows directly.
        if not delimiter:
            s = s.limit(limit)
            rp = self.conn.execute(s, start=start)
            r = rp.fetchall()
            rp.close()
            return r, ()

        pfz = len(prefix)
        dz = len(delimiter)
        count = 0
        prefixes = []
        pappend = prefixes.append
        matches = []
        mappend = matches.append

        # Delimiter pagination: walk the ordered results; whenever a
        # common prefix is found, skip past everything under it by
        # re-executing the query with a new 'start'.
        rp = self.conn.execute(s, start=start)
        while True:
            props = rp.fetchone()
            if props is None:
                break
            path = props[0]
            serial = props[1]  # NOTE(review): unused; kept for clarity/history.
            idx = path.find(delimiter, pfz)

            if idx < 0:
                # No delimiter after the prefix: a plain match.
                mappend(props)
                count += 1
                if count >= limit:
                    break
                continue

            if idx + dz == len(path):
                # Path ends with the delimiter: still a match (rule f).
                mappend(props)
                count += 1
                continue  # Get one more, in case there is a path.
            # Otherwise fold everything under this prefix into one entry.
            pf = path[:idx + dz]
            pappend(pf)
            if count >= limit:
                break

            # NOTE(review): the previous ResultProxy is replaced without
            # being closed here -- confirm whether this leaks cursors on
            # the deployed SQLAlchemy version.
            rp = self.conn.execute(s, start=strnextling(pf))  # New start.
        rp.close()

        return matches, prefixes
    def latest_uuid(self, uuid, cluster):
        """Return the latest version of the given uuid and cluster.

        Return a (path, serial) tuple.
        If cluster is None, all clusters are considered.
        """
        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        # Highest serial recorded for this uuid (optionally per cluster).
        latest = select([func.max(self.versions.c.serial)])
        latest = latest.where(self.versions.c.uuid == uuid)
        if cluster is not None:
            latest = latest.where(self.versions.c.cluster == cluster)
        s = select([n.c.path, v.c.serial])
        s = s.where(and_(v.c.serial == latest,
                         n.c.node == v.c.node))
        rp = self.conn.execute(s)
        row = rp.fetchone()
        rp.close()
        return row
    def domain_object_list(self, domain, paths, cluster=None):
        """Return a list of (path, property list, attribute dictionary)
        for the objects in the specific domain and cluster.
        """
        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        a = self.attributes.alias('a')

        s = select([n.c.path, v.c.serial, v.c.node, v.c.hash, v.c.size,
                    v.c.type, v.c.source, v.c.mtime, v.c.muser, v.c.uuid,
                    v.c.checksum, v.c.cluster, a.c.key, a.c.value])
        s = s.where(v.c.serial == a.c.serial)
        s = s.where(a.c.domain == domain)
        s = s.where(a.c.node == n.c.node)
        s = s.where(a.c.is_latest == True)
        if cluster:
            s = s.where(v.c.cluster == cluster)
        if paths:
            s = s.where(n.c.path.in_(paths))

        rp = self.conn.execute(s)
        rows = rp.fetchall()
        rp.close()

        # Each row is (path, 11 version properties, key, value); collapse
        # the attribute rows of every version into one dictionary.
        props_of = itemgetter(slice(12))
        rows.sort(key=props_of)
        return [(props[0], props[1:], dict(row[12:] for row in group))
                for props, group in groupby(rows, props_of)]