snf-pithos-backend / pithos / backends / lib / sqlalchemy / node.py @ 63092950
1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from time import time
35
from operator import itemgetter
36
from itertools import groupby
37

    
38
from sqlalchemy import (Table, Integer, BigInteger, DECIMAL, Boolean,
39
                        Column, String, MetaData, ForeignKey)
40
from sqlalchemy.schema import Index
41
from sqlalchemy.sql import func, and_, or_, not_, select, bindparam, exists
42
from sqlalchemy.sql.expression import true
43
from sqlalchemy.exc import NoSuchTableError
44

    
45
from dbworker import DBWorker, ESCAPE_CHAR
46

    
47
from pithos.backends.filter import parse_filters
48

    
49

    
50
ROOTNODE = 0
51

    
52
(SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM,
53
 CLUSTER) = range(11)
54

    
55
(MATCH_PREFIX, MATCH_EXACT) = range(2)
56

    
57
inf = float('inf')
58

    
59

    
60
def strnextling(prefix):
61
    """Return the first unicode string
62
       greater than but not starting with the given prefix.
63
       strnextling('hello') -> 'hellp'
64
    """
65
    if not prefix:
66
        ## all strings start with the null string,
67
        ## therefore we have to approximate strnextling('')
68
        ## with the last unicode character supported by python
69
        ## 0x10ffff for wide (32-bit unicode) python builds
70
        ## 0x00ffff for narrow (16-bit unicode) python builds
71
        ## We will not autodetect. 0xffff is safe enough.
72
        return unichr(0xffff)
73
    s = prefix[:-1]
74
    c = ord(prefix[-1])
75
    if c >= 0xffff:
76
        raise RuntimeError
77
    s += unichr(c + 1)
78
    return s
79

    
80

    
81
def strprevling(prefix):
82
    """Return an approximation of the last unicode string
83
       less than but not starting with the given prefix.
84
       strprevling(u'hello') -> u'helln\\xffff'
85
    """
86
    if not prefix:
87
        ## There is no prevling for the null string
88
        return prefix
89
    s = prefix[:-1]
90
    c = ord(prefix[-1])
91
    if c > 0:
92
        s += unichr(c - 1) + unichr(0xffff)
93
    return s
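# Illustrative note (not in the original module): these helpers turn a path
# prefix into an exclusive (start, end) pair for range scans over sorted
# paths, as used by latest_version_list below. A minimal doctest-style sketch:
#
#     >>> strnextling(u'photos/')
#     u'photos0'
#     >>> strprevling(u'photos/')
#     u'photos.\uffff'
#
# Every path that starts with u'photos/' sorts strictly after
# strprevling(u'photos/') and strictly before strnextling(u'photos/').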
94

    
95
_propnames = {
96
    'serial': 0,
97
    'node': 1,
98
    'hash': 2,
99
    'size': 3,
100
    'type': 4,
101
    'source': 5,
102
    'mtime': 6,
103
    'muser': 7,
104
    'uuid': 8,
105
    'checksum': 9,
106
    'cluster': 10,
107
}
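# Illustrative note (not in the original module): _propnames mirrors the
# SERIAL..CLUSTER positional constants above, so version rows can be indexed
# by name. A minimal sketch, assuming ``props`` is a row returned by
# version_get_properties() or version_lookup():
#
#     size = props[_propnames['size']]        # equivalent to props[SIZE]
#     cluster = props[_propnames['cluster']]  # equivalent to props[CLUSTER]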
108

    
109

    
110
def create_tables(engine):
111
    metadata = MetaData()
112

    
113
    #create nodes table
114
    columns = []
115
    columns.append(Column('node', Integer, primary_key=True))
116
    columns.append(Column('parent', Integer,
117
                          ForeignKey('nodes.node',
118
                                     ondelete='CASCADE',
119
                                     onupdate='CASCADE'),
120
                          autoincrement=False))
121
    columns.append(Column('latest_version', Integer))
122
    columns.append(Column('path', String(2048), default='', nullable=False))
123
    nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
124
    Index('idx_nodes_path', nodes.c.path, unique=True)
125
    Index('idx_nodes_parent', nodes.c.parent)
126

    
127
    #create policy table
128
    columns = []
129
    columns.append(Column('node', Integer,
130
                          ForeignKey('nodes.node',
131
                                     ondelete='CASCADE',
132
                                     onupdate='CASCADE'),
133
                          primary_key=True))
134
    columns.append(Column('key', String(128), primary_key=True))
135
    columns.append(Column('value', String(256)))
136
    Table('policy', metadata, *columns, mysql_engine='InnoDB')
137

    
138
    #create statistics table
139
    columns = []
140
    columns.append(Column('node', Integer,
141
                          ForeignKey('nodes.node',
142
                                     ondelete='CASCADE',
143
                                     onupdate='CASCADE'),
144
                          primary_key=True))
145
    columns.append(Column('population', Integer, nullable=False, default=0))
146
    columns.append(Column('size', BigInteger, nullable=False, default=0))
147
    columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
148
    columns.append(Column('cluster', Integer, nullable=False, default=0,
149
                          primary_key=True, autoincrement=False))
150
    Table('statistics', metadata, *columns, mysql_engine='InnoDB')
151

    
152
    #create versions table
153
    columns = []
154
    columns.append(Column('serial', Integer, primary_key=True))
155
    columns.append(Column('node', Integer,
156
                          ForeignKey('nodes.node',
157
                                     ondelete='CASCADE',
158
                                     onupdate='CASCADE')))
159
    columns.append(Column('hash', String(256)))
160
    columns.append(Column('size', BigInteger, nullable=False, default=0))
161
    columns.append(Column('type', String(256), nullable=False, default=''))
162
    columns.append(Column('source', Integer))
163
    columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
164
    columns.append(Column('muser', String(256), nullable=False, default=''))
165
    columns.append(Column('uuid', String(64), nullable=False, default=''))
166
    columns.append(Column('checksum', String(256), nullable=False, default=''))
167
    columns.append(Column('cluster', Integer, nullable=False, default=0))
168
    versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
169
    Index('idx_versions_node_mtime', versions.c.node, versions.c.mtime)
170
    Index('idx_versions_node_uuid', versions.c.uuid)
171

    
172
    #create attributes table
173
    columns = []
174
    columns.append(Column('serial', Integer,
175
                          ForeignKey('versions.serial',
176
                                     ondelete='CASCADE',
177
                                     onupdate='CASCADE'),
178
                          primary_key=True))
179
    columns.append(Column('domain', String(256), primary_key=True))
180
    columns.append(Column('key', String(128), primary_key=True))
181
    columns.append(Column('value', String(256)))
182
    columns.append(Column('node', Integer, nullable=False, default=0))
183
    columns.append(Column('is_latest', Boolean, nullable=False, default=True))
184
    attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')
185
    Index('idx_attributes_domain', attributes.c.domain)
186
    Index('idx_attributes_serial_node', attributes.c.serial, attributes.c.node)
187

    
188
    metadata.create_all(engine)
189
    return metadata.sorted_tables
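# Illustrative example (not in the original module): building the schema on a
# throwaway in-memory SQLite engine. The engine URL here is an assumption for
# demonstration only; deployments pass in their own configured engine.
#
#     from sqlalchemy import create_engine
#     engine = create_engine('sqlite://')
#     tables = create_tables(engine)
#     print [t.name for t in tables]  # dependency order, 'nodes' first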
190

    
191

    
192
class Node(DBWorker):
193
    """Nodes store path organization and have multiple versions.
194
       Versions store object history and have multiple attributes.
195
       Attributes store metadata.
196
    """
197

    
198
    # TODO: Provide an interface for included and excluded clusters.
199

    
200
    def __init__(self, **params):
201
        DBWorker.__init__(self, **params)
202
        try:
203
            metadata = MetaData(self.engine)
204
            self.nodes = Table('nodes', metadata, autoload=True)
205
            self.policy = Table('policy', metadata, autoload=True)
206
            self.statistics = Table('statistics', metadata, autoload=True)
207
            self.versions = Table('versions', metadata, autoload=True)
208
            self.attributes = Table('attributes', metadata, autoload=True)
209
        except NoSuchTableError:
210
            tables = create_tables(self.engine)
211
            map(lambda t: self.__setattr__(t.name, t), tables)
212

    
213
        s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
214
                                           self.nodes.c.parent == ROOTNODE))
215
        wrapper = self.wrapper
216
        wrapper.execute()
217
        try:
218
            rp = self.conn.execute(s)
219
            r = rp.fetchone()
220
            rp.close()
221
            if not r:
222
                s = self.nodes.insert(
223
                ).values(node=ROOTNODE, parent=ROOTNODE, path='')
224
                self.conn.execute(s)
225
        finally:
226
            wrapper.commit()
227

    
228
    def node_create(self, parent, path):
229
        """Create a new node from the given properties.
230
           Return the node identifier of the new node.
231
        """
232
        #TODO catch IntegrityError?
233
        s = self.nodes.insert().values(parent=parent, path=path)
234
        r = self.conn.execute(s)
235
        inserted_primary_key = r.inserted_primary_key[0]
236
        r.close()
237
        return inserted_primary_key
238

    
239
    def node_lookup(self, path, for_update=False):
240
        """Lookup the current node of the given path.
241
           Return None if the path is not found.
242
        """
243

    
244
        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
245
        s = select([self.nodes.c.node],
246
                   self.nodes.c.path.like(self.escape_like(path),
247
                                          escape=ESCAPE_CHAR),
248
                   for_update=for_update)
249
        r = self.conn.execute(s)
250
        row = r.fetchone()
251
        r.close()
252
        if row:
253
            return row[0]
254
        return None
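    # Illustrative example (not in the original module): a create/lookup round
    # trip, assuming ``backend`` is an already initialised Node instance (the
    # name is hypothetical).
    #
    #     account = backend.node_create(ROOTNODE, u'account')
    #     assert backend.node_lookup(u'account') == account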
255

    
256
    def node_lookup_bulk(self, paths):
257
        """Lookup the current nodes for the given paths.
258
           Return a dict mapping node to path ({} if no path is found).
259
        """
260

    
261
        if not paths:
262
            return {}
263
        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
264
        s = select([self.nodes.c.path, self.nodes.c.node],
265
                   self.nodes.c.path.in_(paths))
266
        r = self.conn.execute(s)
267
        rows = r.fetchall()
268
        r.close()
269
        return dict([(row.node, row.path) for row in rows])
270

    
271
    def node_get_properties(self, node):
272
        """Return the node's (parent, path).
273
           Return None if the node is not found.
274
        """
275

    
276
        s = select([self.nodes.c.parent, self.nodes.c.path])
277
        s = s.where(self.nodes.c.node == node)
278
        r = self.conn.execute(s)
279
        l = r.fetchone()
280
        r.close()
281
        return l
282

    
283
    def node_get_properties_bulk(self, nodes):
284
        """Return the (parent, path) for the specific nodes.
285
        """
286

    
287
        s = select([self.nodes.c.node, self.nodes.c.parent, self.nodes.c.path])
288
        s = s.where(self.nodes.c.node.in_(nodes))
289
        r = self.conn.execute(s)
290
        l = r.fetchall()
291
        r.close()
292
        return l
293

    
294
    def node_get_versions(self, node, keys=(), propnames=_propnames):
295
        """Return the properties of all versions at node.
296
           If keys is empty, return all properties in the order
297
           (serial, node, hash, size, type, source, mtime, muser, uuid,
298
            checksum, cluster).
299
        """
300

    
301
        s = select([self.versions.c.serial,
302
                    self.versions.c.node,
303
                    self.versions.c.hash,
304
                    self.versions.c.size,
305
                    self.versions.c.type,
306
                    self.versions.c.source,
307
                    self.versions.c.mtime,
308
                    self.versions.c.muser,
309
                    self.versions.c.uuid,
310
                    self.versions.c.checksum,
311
                    self.versions.c.cluster], self.versions.c.node == node)
312
        s = s.order_by(self.versions.c.serial)
313
        r = self.conn.execute(s)
314
        rows = r.fetchall()
315
        r.close()
316
        if not rows:
317
            return rows
318

    
319
        if not keys:
320
            return rows
321

    
322
        return [[p[propnames[k]] for k in keys if k in propnames] for
323
                p in rows]
324

    
325
    def node_count_children(self, node):
326
        """Return node's child count."""
327

    
328
        s = select([func.count(self.nodes.c.node)])
329
        s = s.where(and_(self.nodes.c.parent == node,
330
                         self.nodes.c.node != ROOTNODE))
331
        r = self.conn.execute(s)
332
        row = r.fetchone()
333
        r.close()
334
        return row[0]
335

    
336
    def node_purge_children(self, parent, before=inf, cluster=0,
337
                            update_statistics_ancestors_depth=None):
338
        """Delete all versions with the specified
339
           parent and cluster, and return
340
           the hashes, the total size and the serials of versions deleted.
341
           Clears out nodes with no remaining versions.
342
        """
343
        #update statistics
344
        c1 = select([self.nodes.c.node],
345
                    self.nodes.c.parent == parent)
346
        where_clause = and_(self.versions.c.node.in_(c1),
347
                            self.versions.c.cluster == cluster)
348
        if before != inf:
349
            where_clause = and_(where_clause,
350
                                self.versions.c.mtime <= before)
351
        s = select([func.count(self.versions.c.serial),
352
                    func.sum(self.versions.c.size)])
353
        s = s.where(where_clause)
354
        r = self.conn.execute(s)
355
        row = r.fetchone()
356
        r.close()
357
        if not row:
358
            return (), 0, ()
359
        nr, size = row[0], row[1] if row[1] else 0
360
        mtime = time()
361
        self.statistics_update(parent, -nr, -size, mtime, cluster)
362
        self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster,
363
                                         update_statistics_ancestors_depth)
364

    
365
        s = select([self.versions.c.hash, self.versions.c.serial])
366
        s = s.where(where_clause)
367
        r = self.conn.execute(s)
368
        hashes = []
369
        serials = []
370
        for row in r.fetchall():
371
            hashes += [row[0]]
372
            serials += [row[1]]
373
        r.close()
374

    
375
        #delete versions
376
        s = self.versions.delete().where(where_clause)
377
        r = self.conn.execute(s)
378
        r.close()
379

    
380
        #delete nodes
381
        s = select([self.nodes.c.node],
382
                   and_(self.nodes.c.parent == parent,
383
                        select([func.count(self.versions.c.serial)],
384
                               self.versions.c.node == self.nodes.c.node).
385
                        as_scalar() == 0))
386
        rp = self.conn.execute(s)
387
        nodes = [row[0] for row in rp.fetchall()]
388
        rp.close()
389
        if nodes:
390
            s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
391
            self.conn.execute(s).close()
392

    
393
        return hashes, size, serials
394

    
395
    def node_purge(self, node, before=inf, cluster=0,
396
                   update_statistics_ancestors_depth=None):
397
        """Delete all versions with the specified
398
           node and cluster, and return
399
           the hashes, the total size and the serials of versions deleted.
400
           Clears out the node if it has no remaining versions.
401
        """
402

    
403
        #update statistics
404
        s = select([func.count(self.versions.c.serial),
405
                    func.sum(self.versions.c.size)])
406
        where_clause = and_(self.versions.c.node == node,
407
                            self.versions.c.cluster == cluster)
408
        if before != inf:
409
            where_clause = and_(where_clause,
410
                                self.versions.c.mtime <= before)
411
        s = s.where(where_clause)
412
        r = self.conn.execute(s)
413
        row = r.fetchone()
414
        nr, size = row[0], row[1]
415
        r.close()
416
        if not nr:
417
            return (), 0, ()
418
        mtime = time()
419
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster,
420
                                         update_statistics_ancestors_depth)
421

    
422
        s = select([self.versions.c.hash, self.versions.c.serial])
423
        s = s.where(where_clause)
424
        r = self.conn.execute(s)
425
        hashes = []
426
        serials = []
427
        for row in r.fetchall():
428
            hashes += [row[0]]
429
            serials += [row[1]]
430
        r.close()
431

    
432
        #delete versions
433
        s = self.versions.delete().where(where_clause)
434
        r = self.conn.execute(s)
435
        r.close()
436

    
437
        #delete nodes
438
        s = select([self.nodes.c.node],
439
                   and_(self.nodes.c.node == node,
440
                        select([func.count(self.versions.c.serial)],
441
                               self.versions.c.node == self.nodes.c.node).
442
                        as_scalar() == 0))
443
        rp = self.conn.execute(s)
444
        nodes = [row[0] for row in rp.fetchall()]
445
        rp.close()
446
        if nodes:
447
            s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
448
            self.conn.execute(s).close()
449

    
450
        return hashes, size, serials
451

    
452
    def node_remove(self, node, update_statistics_ancestors_depth=None):
453
        """Remove the node specified.
454
           Return false if the node has children or is not found.
455
        """
456

    
457
        if self.node_count_children(node):
458
            return False
459

    
460
        mtime = time()
461
        s = select([func.count(self.versions.c.serial),
462
                    func.sum(self.versions.c.size),
463
                    self.versions.c.cluster])
464
        s = s.where(self.versions.c.node == node)
465
        s = s.group_by(self.versions.c.cluster)
466
        r = self.conn.execute(s)
467
        for population, size, cluster in r.fetchall():
468
            self.statistics_update_ancestors(
469
                node, -population, -size, mtime, cluster,
470
                update_statistics_ancestors_depth)
471
        r.close()
472

    
473
        s = self.nodes.delete().where(self.nodes.c.node == node)
474
        self.conn.execute(s).close()
475
        return True
476

    
477
    def node_accounts(self, accounts=()):
478
        s = select([self.nodes.c.path, self.nodes.c.node])
479
        s = s.where(and_(self.nodes.c.node != 0,
480
                         self.nodes.c.parent == 0))
481
        if accounts:
482
            s = s.where(self.nodes.c.path.in_(accounts))
483
        r = self.conn.execute(s)
484
        rows = r.fetchall()
485
        r.close()
486
        return rows
487

    
488
    def node_account_quotas(self):
489
        s = select([self.nodes.c.path, self.policy.c.value])
490
        s = s.where(and_(self.nodes.c.node != 0,
491
                         self.nodes.c.parent == 0))
492
        s = s.where(self.nodes.c.node == self.policy.c.node)
493
        s = s.where(self.policy.c.key == 'quota')
494
        r = self.conn.execute(s)
495
        rows = r.fetchall()
496
        r.close()
497
        return dict(rows)
498

    
499
    def node_account_usage(self, account=None, cluster=0):
500
        """Return usage for a specific account.
501

502
        Keyword arguments:
503
        account -- (default None: list usage for all the accounts)
504
        cluster -- list current, history or deleted usage (default 0: normal)
505
        """
506

    
507
        n1 = self.nodes.alias('n1')
508
        n2 = self.nodes.alias('n2')
509
        n3 = self.nodes.alias('n3')
510

    
511
        s = select([n3.c.path, func.sum(self.versions.c.size)])
512
        s = s.where(n1.c.node == self.versions.c.node)
513
        s = s.where(self.versions.c.cluster == cluster)
514
        s = s.where(n1.c.parent == n2.c.node)
515
        s = s.where(n2.c.parent == n3.c.node)
516
        s = s.where(n3.c.parent == 0)
517
        s = s.where(n3.c.node != 0)
518
        if account:
519
            s = s.where(n3.c.path == account)
520
        s = s.group_by(n3.c.path)
521
        r = self.conn.execute(s)
522
        usage = r.fetchall()
523
        r.close()
524
        return dict(usage)
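    # Illustrative example (not in the original module): per-account usage in
    # the normal cluster, assuming ``backend`` as above (hypothetical name).
    #
    #     usage = backend.node_account_usage(cluster=0)
    #     # -> {u'some-account': 1234, ...}: account path to total bytes used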
525

    
526
    def policy_get(self, node):
527
        s = select([self.policy.c.key, self.policy.c.value],
528
                   self.policy.c.node == node)
529
        r = self.conn.execute(s)
530
        d = dict(r.fetchall())
531
        r.close()
532
        return d
533

    
534
    def policy_set(self, node, policy):
535
        #insert or replace
536
        for k, v in policy.iteritems():
537
            s = self.policy.update().where(and_(self.policy.c.node == node,
538
                                                self.policy.c.key == k))
539
            s = s.values(value=v)
540
            rp = self.conn.execute(s)
541
            rp.close()
542
            if rp.rowcount == 0:
543
                s = self.policy.insert()
544
                values = {'node': node, 'key': k, 'value': v}
545
                r = self.conn.execute(s, values)
546
                r.close()
547

    
548
    def statistics_get(self, node, cluster=0):
549
        """Return population, total size and last mtime
550
           for all versions under node that belong to the cluster.
551
        """
552

    
553
        s = select([self.statistics.c.population,
554
                    self.statistics.c.size,
555
                    self.statistics.c.mtime])
556
        s = s.where(and_(self.statistics.c.node == node,
557
                         self.statistics.c.cluster == cluster))
558
        r = self.conn.execute(s)
559
        row = r.fetchone()
560
        r.close()
561
        return row
562

    
563
    def statistics_update(self, node, population, size, mtime, cluster=0):
564
        """Update the statistics of the given node.
565
           Statistics keep track of the population, total
566
           size of objects and mtime in the node's namespace.
567
           Population and size deltas may be zero, positive or negative.
568
        """
569
        s = select([self.statistics.c.population, self.statistics.c.size],
570
                   and_(self.statistics.c.node == node,
571
                        self.statistics.c.cluster == cluster))
572
        rp = self.conn.execute(s)
573
        r = rp.fetchone()
574
        rp.close()
575
        if not r:
576
            prepopulation, presize = (0, 0)
577
        else:
578
            prepopulation, presize = r
579
        population += prepopulation
580
        population = max(population, 0)
581
        size += presize
582

    
583
        #insert or replace
584
        #TODO better upsert
585
        u = self.statistics.update().where(and_(
586
            self.statistics.c.node == node,
587
            self.statistics.c.cluster == cluster))
588
        u = u.values(population=population, size=size, mtime=mtime)
589
        rp = self.conn.execute(u)
590
        rp.close()
591
        if rp.rowcount == 0:
592
            ins = self.statistics.insert()
593
            ins = ins.values(node=node, population=population, size=size,
594
                             mtime=mtime, cluster=cluster)
595
            self.conn.execute(ins).close()
596

    
597
    def statistics_update_ancestors(self, node, population, size, mtime,
598
                                    cluster=0, recursion_depth=None):
599
        """Update the statistics of the given node's parent.
600
           Then recursively update all parents up to the root
601
           or up to the ``recursion_depth`` (if not None).
602
           Population is not recursive.
603
        """
604

    
605
        i = 0
606
        while True:
607
            if node == ROOTNODE:
608
                break
609
            if recursion_depth and recursion_depth <= i:
610
                break
611
            props = self.node_get_properties(node)
612
            if props is None:
613
                break
614
            parent, path = props
615
            self.statistics_update(parent, population, size, mtime, cluster)
616
            node = parent
617
            population = 0  # Population isn't recursive
618
            i += 1
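    # Illustrative note (not in the original module): size and mtime propagate
    # up the whole ancestor chain, population only to the immediate parent.
    # A minimal sketch, assuming ``backend`` as above and ``obj`` an object
    # node under an account/container (hypothetical names):
    #
    #     backend.statistics_update_ancestors(obj, 1, 4096, time(), 0)
    #     # the container gains population +1 and size +4096;
    #     # the account above it gains size +4096 only.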
619

    
620
    def statistics_latest(self, node, before=inf, except_cluster=0):
621
        """Compute population, total size and last mtime
622
           for all latest versions under node that
623
           do not belong to the cluster.
624
        """
625

    
626
        # The node.
627
        props = self.node_get_properties(node)
628
        if props is None:
629
            return None
630
        parent, path = props
631

    
632
        # The latest version.
633
        s = select([self.versions.c.size,
634
                    self.versions.c.mtime])
635
        if before != inf:
636
            filtered = select([func.max(self.versions.c.serial)],
637
                              self.versions.c.node == node)
638
            filtered = filtered.where(self.versions.c.mtime < before)
639
        else:
640
            filtered = select([self.nodes.c.latest_version],
641
                              self.nodes.c.node == node)
642
        s = s.where(and_(self.versions.c.cluster != except_cluster,
643
                         self.versions.c.serial == filtered))
644
        r = self.conn.execute(s)
645
        props = r.fetchone()
646
        r.close()
647
        if not props:
648
            return None
649
        mtime = props.mtime
650

    
651
        # First level, just under node (get population).
652
        v = self.versions.alias('v')
653
        s = select([func.count(v.c.serial),
654
                    func.sum(v.c.size),
655
                    func.max(v.c.mtime)])
656
        if before != inf:
657
            c1 = select([func.max(self.versions.c.serial)],
658
                        and_(self.versions.c.mtime < before,
659
                             self.versions.c.node == v.c.node))
660
        else:
661
            c1 = select([self.nodes.c.latest_version])
662
            c1 = c1.where(self.nodes.c.node == v.c.node)
663
        c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
664
        s = s.where(and_(v.c.serial == c1,
665
                         v.c.cluster != except_cluster,
666
                         v.c.node.in_(c2)))
667
        rp = self.conn.execute(s)
668
        r = rp.fetchone()
669
        rp.close()
670
        if not r:
671
            return None
672
        count = r[0]
673
        mtime = max(mtime, r[2])
674
        if count == 0:
675
            return (0, 0, mtime)
676

    
677
        # All children (get size and mtime).
678
        # This is why the full path is stored.
679
        if before != inf:
680
            s = select([func.count(v.c.serial),
681
                    func.sum(v.c.size),
682
                    func.max(v.c.mtime)])
683
            c1 = select([func.max(self.versions.c.serial)],
684
                        and_(self.versions.c.mtime < before,
685
                             self.versions.c.node == v.c.node))
686
        else:
687
            inner_join = \
688
                    self.versions.join(self.nodes, onclause=\
689
                    self.versions.c.serial == self.nodes.c.latest_version)
690
            s = select([func.count(self.versions.c.serial),
691
                    func.sum(self.versions.c.size),
692
                    func.max(self.versions.c.mtime)], from_obj=[inner_join])
693

    
694
        c2 = select([self.nodes.c.node],
695
                    self.nodes.c.path.like(self.escape_like(path) + '%',
696
                                           escape=ESCAPE_CHAR))
697
        if before != inf:
698
            s = s.where(and_(v.c.serial == c1,
699
                         v.c.cluster != except_cluster,
700
                         v.c.node.in_(c2)))
701
        else:
702
            s = s.where(and_(self.versions.c.cluster != except_cluster,
703
                        self.versions.c.node.in_(c2)))
704

    
705
        rp = self.conn.execute(s)
706
        r = rp.fetchone()
707
        rp.close()
708
        if not r:
709
            return None
710
        size = long(r[1] - props.size)
711
        mtime = max(mtime, r[2])
712
        return (count, size, mtime)
713

    
714
    def statistics_latest_bulk(self, nodes, before=inf, except_cluster=0):
715
        """Compute population, total size and last mtime
716
           for all latest versions under node that
717
           do not belong to the cluster.
718
        """
719

    
720
        # The node.
721
        props = self.node_get_properties_bulk(nodes)
722
        if not props:  # empty list
723
            return ()
724
        paths = [p.path for p in props]
725

    
726
        # The latest version.
727
        s = select([self.versions.c.node,
728
                    self.versions.c.size,
729
                    self.versions.c.mtime])
730
        if before != inf:
731
            filtered = select([func.max(self.versions.c.serial)],
732
                              self.versions.c.node.in_(nodes))
733
            filtered = filtered.where(self.versions.c.mtime < before)
734
        else:
735
            filtered = select([self.nodes.c.latest_version],
736
                              self.nodes.c.node.in_(nodes))
737
        s = s.where(and_(self.versions.c.cluster != except_cluster,
738
                         self.versions.c.serial.in_(filtered)))
739
        r = self.conn.execute(s)
740
        props = r.fetchall()
741
        r.close()
742
        if not props:
743
            return ()
744
        mtimes = {}
745
        sizes = {}
746
        for p in props:
747
            mtimes[p.node] = p.mtime
748
            sizes[p.node] = p.size
749

    
750
        # First level, just under node (get population).
751
        n = self.nodes.alias('n')
752
        v = self.versions.alias('v')
753
        s = select([n.c.parent,
754
                    func.count(v.c.serial).label('count'),
755
                    func.sum(v.c.size).label('total_size'),
756
                    func.max(v.c.mtime).label('max_timestamp')])
757
        if before != inf:
758
            c1 = select([func.max(self.versions.c.serial)],
759
                        and_(self.versions.c.mtime < before,
760
                             self.versions.c.node == v.c.node))
761
        else:
762
            c1 = select([self.nodes.c.latest_version])
763
            c1 = c1.where(self.nodes.c.node == v.c.node)
764
        c2 = select([self.nodes.c.node], self.nodes.c.parent.in_(nodes))
765
        s = s.where(and_(v.c.serial == c1,
766
                         v.c.cluster != except_cluster,
767
                         v.c.node.in_(c2),
768
                         v.c.node == n.c.node))
769
        s = s.group_by(n.c.parent)
770
        rp = self.conn.execute(s)
771
        r = rp.fetchall()
772
        rp.close()
773
        if not r:
774
            return None
775
        counts = {}
776
        for stats in r:
777
            counts[stats.parent] = stats.count
778
            mtimes[stats.parent] = max(mtimes[stats.parent],
779
                                       stats.max_timestamp)
780

    
781
        # All children (get size and mtime).
782
        # This is why the full path is stored.
783
        n1 = self.nodes.alias('n1')
784
        n2 = self.nodes.alias('n2')
785
        if before != inf:
786
            s = select([n2.c.node,
787
                        func.count(v.c.serial).label('count'),
788
                        func.sum(v.c.size).label('total_size'),
789
                        func.max(v.c.mtime).label('max_timestamp')])
790
            c1 = select([func.max(self.versions.c.serial)],
791
                        and_(self.versions.c.mtime < before,
792
                             self.versions.c.node == v.c.node))
793
        else:
794
            inner_join = \
795
                    self.versions.join(self.nodes, onclause=\
796
                    self.versions.c.serial == self.nodes.c.latest_version)
797
            s = select([n2.c.node,
798
                        func.count(self.versions.c.serial).label('count'),
799
                        func.sum(self.versions.c.size).label('total_size'),
800
                        func.max(self.versions.c.mtime).label('max_timestamp')],
801
                       from_obj=[inner_join])
802

    
803
        c2 = select([self.nodes.c.node])
804
        like = lambda p: self.nodes.c.path.like(self.escape_like(p) + '%',
805
                                                escape=ESCAPE_CHAR)
806
        c2 = c2.where(or_(*map(like, paths)))
807
        if before != inf:
808
            s = s.where(and_(v.c.serial == c1,
809
                         v.c.cluster != except_cluster,
810
                         v.c.node.in_(c2)))
811
        else:
812
            s = s.where(and_(self.versions.c.cluster != except_cluster,
813
                        self.versions.c.node.in_(c2)))
814
        s = s.where(self.nodes.c.parent == n1.c.node)
815
        s = s.where(n1.c.parent == n2.c.node)
816
        s = s.where(n2.c.node != ROOTNODE)
817
        s = s.group_by(n2.c.node)
818

    
819
        rp = self.conn.execute(s)
820
        r = rp.fetchall()
821
        rp.close()
822
        if not r:
823
            return None
824
        for stats in r:
825
            sizes[stats.node] = long(stats.total_size - sizes[stats.node])
826
            mtimes[stats.node] = max(mtimes[stats.node], stats.max_timestamp)
827
        l = []
828
        append = l.append
829
        for node in nodes:
830
            append((node, counts[node], sizes[node], mtimes[node]))
831
        return l
832

    
833
    def nodes_set_latest_version(self, node, serial):
834
        s = self.nodes.update().where(self.nodes.c.node == node)
835
        s = s.values(latest_version=serial)
836
        self.conn.execute(s).close()
837

    
838
    def version_create(self, node, hash, size, type, source, muser, uuid,
839
                       checksum, cluster=0,
840
                       update_statistics_ancestors_depth=None):
841
        """Create a new version from the given properties.
842
           Return the (serial, mtime) of the new version.
843
        """
844

    
845
        mtime = time()
846
        s = self.versions.insert().values(
847
            node=node, hash=hash, size=size, type=type, source=source,
848
            mtime=mtime, muser=muser, uuid=uuid, checksum=checksum,
849
            cluster=cluster)
850
        serial = self.conn.execute(s).inserted_primary_key[0]
851
        self.statistics_update_ancestors(node, 1, size, mtime, cluster,
852
                                         update_statistics_ancestors_depth)
853

    
854
        self.nodes_set_latest_version(node, serial)
855

    
856
        return serial, mtime
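    # Illustrative example (not in the original module): recording a new
    # version for an object node, assuming ``backend`` and ``obj`` as in the
    # notes above (hypothetical names).
    #
    #     serial, mtime = backend.version_create(
    #         obj, hash='some-block-hash', size=4096, type='text/plain',
    #         source=None, muser='user@example.org', uuid='some-uuid',
    #         checksum='', cluster=0)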
857

    
858
    def version_lookup(self, node, before=inf, cluster=0, all_props=True):
859
        """Lookup the current version of the given node.
860
           Return a list with its properties:
861
           (serial, node, hash, size, type, source, mtime,
862
            muser, uuid, checksum, cluster)
863
           or None if the current version is not found in the given cluster.
864
        """
865

    
866
        v = self.versions.alias('v')
867
        if not all_props:
868
            s = select([v.c.serial])
869
        else:
870
            s = select([v.c.serial, v.c.node, v.c.hash,
871
                        v.c.size, v.c.type, v.c.source,
872
                        v.c.mtime, v.c.muser, v.c.uuid,
873
                        v.c.checksum, v.c.cluster])
874
        if before != inf:
875
            c = select([func.max(self.versions.c.serial)],
876
                       self.versions.c.node == node)
877
            c = c.where(self.versions.c.mtime < before)
878
        else:
879
            c = select([self.nodes.c.latest_version],
880
                       self.nodes.c.node == node)
881
        s = s.where(and_(v.c.serial == c,
882
                         v.c.cluster == cluster))
883
        r = self.conn.execute(s)
884
        props = r.fetchone()
885
        r.close()
886
        if props:
887
            return props
888
        return None
889

    
890
    def version_lookup_bulk(self, nodes, before=inf, cluster=0,
891
                            all_props=True, order_by_path=False):
892
        """Lookup the current versions of the given nodes.
893
           Return a list with their properties:
894
           (serial, node, hash, size, type, source, mtime, muser, uuid,
895
            checksum, cluster).
896
        """
897
        if not nodes:
898
            return ()
899
        v = self.versions.alias('v')
900
        n = self.nodes.alias('n')
901
        if not all_props:
902
            s = select([v.c.serial])
903
        else:
904
            s = select([v.c.serial, v.c.node, v.c.hash,
905
                        v.c.size, v.c.type, v.c.source,
906
                        v.c.mtime, v.c.muser, v.c.uuid,
907
                        v.c.checksum, v.c.cluster])
908
        if before != inf:
909
            c = select([func.max(self.versions.c.serial)],
910
                       self.versions.c.node.in_(nodes))
911
            c = c.where(self.versions.c.mtime < before)
912
            c = c.group_by(self.versions.c.node)
913
        else:
914
            c = select([self.nodes.c.latest_version],
915
                       self.nodes.c.node.in_(nodes))
916
        s = s.where(and_(v.c.serial.in_(c),
917
                         v.c.cluster == cluster))
918
        if order_by_path:
919
            s = s.where(v.c.node == n.c.node)
920
            s = s.order_by(n.c.path)
921
        else:
922
            s = s.order_by(v.c.node)
923
        r = self.conn.execute(s)
924
        rows = r.fetchall()
925
        r.close()
926
        return rows
927

    
928
    def version_get_properties(self, serial, keys=(), propnames=_propnames,
929
                               node=None):
930
        """Return a sequence of values for the properties of
931
           the version specified by serial and the keys, in the order given.
932
           If keys is empty, return all properties in the order
933
           (serial, node, hash, size, type, source, mtime, muser, uuid,
934
            checksum, cluster).
935
        """
936

    
937
        v = self.versions.alias()
938
        s = select([v.c.serial, v.c.node, v.c.hash,
939
                    v.c.size, v.c.type, v.c.source,
940
                    v.c.mtime, v.c.muser, v.c.uuid,
941
                    v.c.checksum, v.c.cluster], v.c.serial == serial)
942
        if node is not None:
943
            s = s.where(v.c.node == node)
944
        rp = self.conn.execute(s)
945
        r = rp.fetchone()
946
        rp.close()
947
        if r is None:
948
            return r
949

    
950
        if not keys:
951
            return r
952
        return [r[propnames[k]] for k in keys if k in propnames]
953

    
954
    def version_put_property(self, serial, key, value):
955
        """Set value for the property of version specified by key."""
956

    
957
        if key not in _propnames:
958
            return
959
        s = self.versions.update()
960
        s = s.where(self.versions.c.serial == serial)
961
        s = s.values(**{key: value})
962
        self.conn.execute(s).close()
963

    
964
    def version_recluster(self, serial, cluster,
965
                          update_statistics_ancestors_depth=None):
966
        """Move the version into another cluster."""
967

    
968
        props = self.version_get_properties(serial)
969
        if not props:
970
            return
971
        node = props[NODE]
972
        size = props[SIZE]
973
        oldcluster = props[CLUSTER]
974
        if cluster == oldcluster:
975
            return
976

    
977
        mtime = time()
978
        self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster,
979
                                         update_statistics_ancestors_depth)
980
        self.statistics_update_ancestors(node, 1, size, mtime, cluster,
981
                                         update_statistics_ancestors_depth)
982

    
983
        s = self.versions.update()
984
        s = s.where(self.versions.c.serial == serial)
985
        s = s.values(cluster=cluster)
986
        self.conn.execute(s).close()
987

    
988
    def version_remove(self, serial, update_statistics_ancestors_depth=None):
989
        """Remove the serial specified."""
990

    
991
        props = self.version_get_properties(serial)
992
        if not props:
993
            return
994
        node = props[NODE]
995
        hash = props[HASH]
996
        size = props[SIZE]
997
        cluster = props[CLUSTER]
998

    
999
        mtime = time()
1000
        self.statistics_update_ancestors(node, -1, -size, mtime, cluster,
1001
                                         update_statistics_ancestors_depth)
1002

    
1003
        s = self.versions.delete().where(self.versions.c.serial == serial)
1004
        self.conn.execute(s).close()
1005

    
1006
        props = self.version_lookup(node, cluster=cluster, all_props=False)
1007
        if props:
1008
            self.nodes_set_latest_version(node, props[0])  # remaining latest serial
1009

    
1010
        return hash, size
1011

    
1012
    def attribute_get(self, serial, domain, keys=()):
1013
        """Return a list of (key, value) pairs of the specific version.
1014

1015
        If keys is empty, return all attributes.
1016
        Otherwise, return only those specified.
1017
        """
1018

    
1019
        if keys:
1020
            attrs = self.attributes.alias()
1021
            s = select([attrs.c.key, attrs.c.value])
1022
            s = s.where(and_(attrs.c.key.in_(keys),
1023
                             attrs.c.serial == serial,
1024
                             attrs.c.domain == domain))
1025
        else:
1026
            attrs = self.attributes.alias()
1027
            s = select([attrs.c.key, attrs.c.value])
1028
            s = s.where(and_(attrs.c.serial == serial,
1029
                             attrs.c.domain == domain))
1030
        r = self.conn.execute(s)
1031
        l = r.fetchall()
1032
        r.close()
1033
        return l
1034

    
1035
    def attribute_set(self, serial, domain, node, items, is_latest=True):
1036
        """Set the attributes of the version specified by serial.
1037
           Receive attributes as an iterable of (key, value) pairs.
1038
        """
1039
        #insert or replace
1040
        #TODO better upsert
1041
        for k, v in items:
1042
            s = self.attributes.update()
1043
            s = s.where(and_(self.attributes.c.serial == serial,
1044
                             self.attributes.c.domain == domain,
1045
                             self.attributes.c.key == k))
1046
            s = s.values(value=v)
1047
            rp = self.conn.execute(s)
1048
            rp.close()
1049
            if rp.rowcount == 0:
1050
                s = self.attributes.insert()
1051
                s = s.values(serial=serial, domain=domain, node=node,
1052
                             is_latest=is_latest, key=k, value=v)
1053
                self.conn.execute(s).close()
1054

    
1055
    def attribute_del(self, serial, domain, keys=()):
1056
        """Delete attributes of the version specified by serial.
1057
           If keys is empty, delete all attributes.
1058
           Otherwise delete those specified.
1059
        """
1060

    
1061
        if keys:
1062
            #TODO more efficient way to do this?
1063
            for key in keys:
1064
                s = self.attributes.delete()
1065
                s = s.where(and_(self.attributes.c.serial == serial,
1066
                                 self.attributes.c.domain == domain,
1067
                                 self.attributes.c.key == key))
1068
                self.conn.execute(s).close()
1069
        else:
1070
            s = self.attributes.delete()
1071
            s = s.where(and_(self.attributes.c.serial == serial,
1072
                             self.attributes.c.domain == domain))
1073
            self.conn.execute(s).close()
1074

    
1075
    def attribute_copy(self, source, dest):
1076
        s = select(
1077
            [dest, self.attributes.c.domain, self.attributes.c.node,
1078
             self.attributes.c.key, self.attributes.c.value],
1079
            self.attributes.c.serial == source)
1080
        rp = self.conn.execute(s)
1081
        attributes = rp.fetchall()
1082
        rp.close()
1083
        for dest, domain, node, k, v in attributes:
1084
            select_src_node = select(
1085
                [self.versions.c.node],
1086
                self.versions.c.serial == dest)
1087
            # insert or replace
1088
            s = self.attributes.update().where(and_(
1089
                self.attributes.c.serial == dest,
1090
                self.attributes.c.domain == domain,
1091
                self.attributes.c.key == k))
1092
            s = s.values(node=select_src_node, value=v)
1093
            rp = self.conn.execute(s)
1094
            rp.close()
1095
            if rp.rowcount == 0:
1096
                s = self.attributes.insert()
1097
                s = s.values(serial=dest, domain=domain, node=select_src_node,
1098
                             is_latest=True, key=k, value=v)
1099
            self.conn.execute(s).close()
1100

    
1101
    def attribute_unset_is_latest(self, node, exclude):
1102
        u = self.attributes.update().where(and_(
1103
            self.attributes.c.node == node,
1104
            self.attributes.c.serial != exclude)).values({'is_latest': False})
1105
        self.conn.execute(u)
1106

    
1107
    def latest_attribute_keys(self, parent, domain, before=inf,
1108
                              except_cluster=0, pathq=None):
1109
        """Return a list with all keys pairs defined
1110
           for all latest versions under parent that
1111
           do not belong to the cluster.
1112
        """
1113

    
1114
        pathq = pathq or []
1115

    
1116
        # TODO: Use another table to store before=inf results.
1117
        v = self.versions.alias('v')
1118
        s = select([self.attributes.c.key]).distinct()
1119
        if before != inf:
1120
            filtered = select([func.max(v.c.serial)],
1121
                              and_(v.c.mtime < before,
1122
                                   v.c.node == self.versions.c.node))
1123
        else:
1124
            filtered = select([self.nodes.c.latest_version])
1125
            filtered = filtered.where(self.nodes.c.node == \
1126
                            self.versions.c.node).correlate(self.versions)
1127
        s = s.where(self.versions.c.serial == filtered)
1128
        s = s.where(self.versions.c.cluster != except_cluster)
1129
        s = s.where(self.versions.c.node.in_(select([self.nodes.c.node],
1130
                                        self.nodes.c.parent == parent)))
1131
        s = s.where(self.attributes.c.serial == self.versions.c.serial)
1132
        s = s.where(self.attributes.c.domain == domain)
1133
        s = s.where(self.nodes.c.node == self.versions.c.node)
1134
        s = s.order_by(self.attributes.c.key)
1135
        conja = []
1136
        conjb = []
1137
        for path, match in pathq:
1138
            if match == MATCH_PREFIX:
1139
                conja.append(self.nodes.c.path.like(self.escape_like(path) + '%',
1140
                                          escape=ESCAPE_CHAR))
1141
            elif match == MATCH_EXACT:
1142
                conjb.append(path)
1143
        if conja or conjb:
1144
            s = s.where(or_(self.nodes.c.path.in_(conjb), *conja))
1145
        rp = self.conn.execute(s)
1146
        rows = rp.fetchall()
1147
        rp.close()
1148
        return [r[0] for r in rows]
1149

    
1150
    def latest_version_list(self, parent, prefix='', delimiter=None,
1151
                            start='', limit=10000, before=inf,
1152
                            except_cluster=0, pathq=[], domain=None,
1153
                            filterq=[], sizeq=None, all_props=False):
1154
        """Return a (list of (path, serial) tuples, list of common prefixes)
1155
           for the current versions of the paths with the given parent,
1156
           matching the following criteria.
1157

1158
           The property tuple for a version is returned if all
1159
           of these conditions are true:
1160

1161
                a. parent matches
1162

1163
                b. path > start
1164

1165
                c. path starts with prefix (and paths in pathq)
1166

1167
                d. version is the max up to before
1168

1169
                e. version is not in cluster
1170

1171
                f. the path does not have the delimiter occurring
1172
                   after the prefix, or ends with the delimiter
1173

1174
                g. serial matches the attribute filter query.
1175

1176
                   A filter query is a comma-separated list of
1177
                   terms in one of these three forms:
1178

1179
                   key
1180
                       an attribute with this key must exist
1181

1182
                   !key
1183
                       an attribute with this key must not exist
1184

1185
                   key ?op value
1186
                       the attribute with this key satisfies the value
1187
                       where ?op is one of ==, !=, <=, >=, <, >.
1188

1189
                h. the size is in the range set by sizeq
1190

1191
           The list of common prefixes includes the prefixes
1192
           matching up to the first delimiter after prefix,
1193
           which are reported only once, as "virtual directories".
1194
           The delimiter is included in the prefixes.
1195

1196
           If arguments are None, then the corresponding matching rule
1197
           will always match.
1198

1199
           Limit applies to the first list of tuples returned.
1200

1201
           If all_props is True, return all properties after path,
1202
           not just serial.
1203
        """
1204

    
1205
        if not start or start < prefix:
1206
            start = strprevling(prefix)
1207
        nextling = strnextling(prefix)
1208

    
1209
        v = self.versions.alias('v')
1210
        n = self.nodes.alias('n')
1211
        if before != inf:
1212
            filtered = select([func.max(v.c.serial)],
1213
                              and_(v.c.mtime < before,
1214
                                   v.c.node == self.versions.c.node))
1215
            inner_join = \
1216
                    self.nodes.join(self.versions,
1217
                            onclause=self.versions.c.serial==filtered)
1218
        else:
1219
            filtered = select([self.nodes.c.latest_version])
1220
            filtered = filtered.where(self.nodes.c.node == self.versions.c.node).correlate(self.versions)
1221
            inner_join = \
1222
                    self.nodes.join(self.versions,
1223
                            onclause=\
1224
                            self.versions.c.serial==filtered)
1225
        if not all_props:
1226
            s = select([self.nodes.c.path,
1227
                self.versions.c.serial],from_obj=[inner_join]).distinct()
1228
        else:
1229
            s = select([self.nodes.c.path,
1230
                        self.versions.c.serial, self.versions.c.node,
1231
                        self.versions.c.hash,
1232
                        self.versions.c.size, self.versions.c.type,
1233
                        self.versions.c.source,
1234
                        self.versions.c.mtime, self.versions.c.muser,
1235
                        self.versions.c.uuid,
1236
                        self.versions.c.checksum,
1237
                        self.versions.c.cluster],from_obj=[inner_join]).distinct()
1238

    
1239
        s = s.where(self.versions.c.cluster != except_cluster)
1240
        s = s.where(self.versions.c.node.in_(select([self.nodes.c.node],
1241
                                             self.nodes.c.parent == parent)))
1242

    
1243
        s = s.where(self.versions.c.node == self.nodes.c.node)
1244
        s = s.where(and_(self.nodes.c.path > bindparam('start'),
1245
                         self.nodes.c.path < nextling))
1246
        conja = []
1247
        conjb = []
1248
        for path, match in pathq:
1249
            if match == MATCH_PREFIX:
1250
                conja.append(self.nodes.c.path.like(self.escape_like(path) + '%',
1251
                             escape=ESCAPE_CHAR))
1252
            elif match == MATCH_EXACT:
1253
                conjb.append(path)
1254
        if conja or conjb:
1255
            s = s.where(or_(self.nodes.c.path.in_(conjb), *conja))
1256

    
1257
        if sizeq and len(sizeq) == 2:
1258
            if sizeq[0]:
1259
                s = s.where(self.versions.c.size >= sizeq[0])
1260
            if sizeq[1]:
1261
                s = s.where(self.versions.c.size < sizeq[1])
1262

    
1263
        if domain and filterq:
1264
            a = self.attributes.alias('a')
1265
            included, excluded, opers = parse_filters(filterq)
1266
            if included:
1267
                subs = select([1])
1268
                subs = subs.where(
1269
                    self.attributes.c.serial ==
1270
                    self.versions.c.serial).correlate(self.versions)
1271
                subs = subs.where(self.attributes.c.domain == domain)
1272
                subs = subs.where(or_(*[self.attributes.c.key.op('=')(x) for
1273
                                  x in included]))
1274
                s = s.where(exists(subs))
1275
            if excluded:
1276
                subs = select([1])
1277
                subs = subs.where(
1278
                    self.attributes.c.serial ==
1279
                    self.versions.c.serial).correlate(self.versions)
1280
                subs = subs.where(self.attributes.c.domain == domain)
1281
                subs = subs.where(or_(*[self.attributes.c.key.op('=')(x) for
1282
                                  x in excluded]))
1283
                s = s.where(not_(exists(subs)))
1284
            if opers:
1285
                for k, o, val in opers:
1286
                    subs = select([1])
1287
                    subs = subs.where(self.attributes.c.serial ==
1288
                                      self.versions.c.serial)
1289
                    subs = subs.correlate(self.versions)
1290
                    subs = subs.where(self.attributes.c.domain == domain)
1291
                    subs = subs.where(
1292
                        and_(self.attributes.c.key.op('=')(k),
1293
                             self.attributes.c.value.op(o)(val)))
1294
                    s = s.where(exists(subs))
1295

    
1296
        s = s.order_by(self.nodes.c.path)
1297

    
1298
        if not delimiter:
1299
            s = s.limit(limit)
1300
            rp = self.conn.execute(s, start=start)
1301
            r = rp.fetchall()
1302
            rp.close()
1303
            return r, ()
1304

    
1305
        pfz = len(prefix)
1306
        dz = len(delimiter)
1307
        count = 0
1308
        prefixes = []
1309
        pappend = prefixes.append
1310
        matches = []
1311
        mappend = matches.append
1312

    
1313
        rp = self.conn.execute(s, start=start)
1314
        while True:
1315
            props = rp.fetchone()
1316
            if props is None:
1317
                break
1318
            path = props[0]
1319
            idx = path.find(delimiter, pfz)
1320

    
1321
            if idx < 0:
1322
                mappend(props)
1323
                count += 1
1324
                if count >= limit:
1325
                    break
1326
                continue
1327

    
1328
            if idx + dz == len(path):
1329
                mappend(props)
1330
                count += 1
1331
                continue  # Get one more, in case there is a path.
1332
            pf = path[:idx + dz]
1333
            pappend(pf)
1334
            if count >= limit:
1335
                break
1336

    
1337
            rp = self.conn.execute(s, start=strnextling(pf))  # New start.
1338
        rp.close()
1339

    
1340
        return matches, prefixes
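    # Illustrative example (not in the original module): listing the current
    # objects of a container node one level deep, assuming ``backend`` and
    # ``container`` as in the notes above (hypothetical names).
    #
    #     matches, prefixes = backend.latest_version_list(
    #         container, prefix=u'photos/', delimiter='/', limit=100)
    #     # ``matches`` holds (path, serial) tuples for objects directly under
    #     # the prefix; ``prefixes`` holds the "virtual directories" below it.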
1341

    
1342
    def latest_uuid(self, uuid, cluster):
1343
        """Return the latest version of the given uuid and cluster.
1344

1345
        Return a (path, serial) tuple.
1346
        If cluster is None, all clusters are considered.
1347

1348
        """
1349

    
1350
        v = self.versions.alias('v')
1351
        n = self.nodes.alias('n')
1352
        s = select([n.c.path, v.c.serial])
1353
        filtered = select([func.max(self.versions.c.serial)])
1354
        filtered = filtered.where(self.versions.c.uuid == uuid)
1355
        if cluster is not None:
1356
            filtered = filtered.where(self.versions.c.cluster == cluster)
1357
        s = s.where(v.c.serial == filtered)
1358
        s = s.where(n.c.node == v.c.node)
1359

    
1360
        r = self.conn.execute(s)
1361
        l = r.fetchone()
1362
        r.close()
1363
        return l
1364

    
1365
    def domain_object_list(self, domain, paths, cluster=None):
1366
        """Return a list of (path, property list, attribute dictionary)
1367
           for the objects in the specific domain and cluster.
1368
        """
1369

    
1370
        v = self.versions.alias('v')
1371
        n = self.nodes.alias('n')
1372
        a = self.attributes.alias('a')
1373

    
1374
        s = select([n.c.path, v.c.serial, v.c.node, v.c.hash, v.c.size,
1375
                    v.c.type, v.c.source, v.c.mtime, v.c.muser, v.c.uuid,
1376
                    v.c.checksum, v.c.cluster, a.c.key, a.c.value])
1377
        if cluster:
1378
            s = s.where(v.c.cluster == cluster)
1379
        s = s.where(v.c.serial == a.c.serial)
1380
        s = s.where(a.c.domain == domain)
1381
        s = s.where(a.c.node == n.c.node)
1382
        s = s.where(a.c.is_latest == true())
1383
        if paths:
1384
            s = s.where(n.c.path.in_(paths))
1385

    
1386
        r = self.conn.execute(s)
1387
        rows = r.fetchall()
1388
        r.close()
1389

    
1390
        group_by = itemgetter(slice(12))
1391
        rows.sort(key=group_by)
1392
        groups = groupby(rows, group_by)
1393
        return [(k[0], k[1:], dict([i[12:] for i in data])) for
1394
                (k, data) in groups]
1395

    
1396
    def get_props(self, paths):
1397
        inner_join = \
1398
            self.nodes.join(self.versions,
1399
                            onclause=self.versions.c.serial ==
1400
                            self.nodes.c.latest_version)
1401
        cc = self.nodes.c.path.in_(paths)
1402
        s = select([self.nodes.c.path, self.versions.c.type],
1403
                   from_obj=[inner_join]).where(cc).distinct()
1404
        r = self.conn.execute(s)
1405
        rows = r.fetchall()
1406
        r.close()
1407
        if rows:
1408
            return rows
1409
        return None