Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / lib / sqlalchemy / node.py @ 9b595ecc

History | View | Annotate | Download (48.3 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from time import time
35
from operator import itemgetter
36
from itertools import groupby
37

    
38
from sqlalchemy import (Table, Integer, BigInteger, DECIMAL, Boolean,
39
                        Column, String, MetaData, ForeignKey)
40
from sqlalchemy.schema import Index
41
from sqlalchemy.sql import func, and_, or_, not_, select, bindparam, exists
42
from sqlalchemy.sql.expression import true
43
from sqlalchemy.exc import NoSuchTableError
44

    
45
from dbworker import DBWorker, ESCAPE_CHAR
46

    
47
from pithos.backends.filter import parse_filters
48

    
49

    
50
ROOTNODE = 0
51

    
52
(SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM,
53
 CLUSTER) = range(11)
54

    
55
(MATCH_PREFIX, MATCH_EXACT) = range(2)
56

    
57
inf = float('inf')
58

    
59

    
60
def strnextling(prefix):
61
    """Return the first unicode string
62
       greater than but not starting with given prefix.
63
       strnextling('hello') -> 'hellp'
64
    """
65
    if not prefix:
66
        ## all strings start with the null string,
67
        ## therefore we have to approximate strnextling('')
68
        ## with the last unicode character supported by python
69
        ## 0x10ffff for wide (32-bit unicode) python builds
70
        ## 0x00ffff for narrow (16-bit unicode) python builds
71
        ## We will not autodetect. 0xffff is safe enough.
72
        return unichr(0xffff)
73
    s = prefix[:-1]
74
    c = ord(prefix[-1])
75
    if c >= 0xffff:
76
        raise RuntimeError
77
    s += unichr(c + 1)
78
    return s
79

    
80

    
81
def strprevling(prefix):
82
    """Return an approximation of the last unicode string
83
       less than but not starting with given prefix.
84
       strprevling(u'hello') -> u'helln\\xffff'
85
    """
86
    if not prefix:
87
        ## There is no prevling for the null string
88
        return prefix
89
    s = prefix[:-1]
90
    c = ord(prefix[-1])
91
    if c > 0:
92
        s += unichr(c - 1) + unichr(0xffff)
93
    return s
94

    
95
_propnames = {
96
    'serial': 0,
97
    'node': 1,
98
    'hash': 2,
99
    'size': 3,
100
    'type': 4,
101
    'source': 5,
102
    'mtime': 6,
103
    'muser': 7,
104
    'uuid': 8,
105
    'checksum': 9,
106
    'cluster': 10,
107
}
108

    
109

    
110
def create_tables(engine):
111
    metadata = MetaData()
112

    
113
    #create nodes table
114
    columns = []
115
    columns.append(Column('node', Integer, primary_key=True))
116
    columns.append(Column('parent', Integer,
117
                          ForeignKey('nodes.node',
118
                                     ondelete='CASCADE',
119
                                     onupdate='CASCADE'),
120
                          autoincrement=False))
121
    columns.append(Column('latest_version', Integer))
122
    columns.append(Column('path', String(2048), default='', nullable=False))
123
    nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
124
    Index('idx_nodes_path', nodes.c.path, unique=True)
125
    Index('idx_nodes_parent', nodes.c.parent)
126
    Index('idx_latest_version', nodes.c.latest_version)
127

    
128
    #create policy table
129
    columns = []
130
    columns.append(Column('node', Integer,
131
                          ForeignKey('nodes.node',
132
                                     ondelete='CASCADE',
133
                                     onupdate='CASCADE'),
134
                          primary_key=True))
135
    columns.append(Column('key', String(128), primary_key=True))
136
    columns.append(Column('value', String(256)))
137
    Table('policy', metadata, *columns, mysql_engine='InnoDB')
138

    
139
    #create statistics table
140
    columns = []
141
    columns.append(Column('node', Integer,
142
                          ForeignKey('nodes.node',
143
                                     ondelete='CASCADE',
144
                                     onupdate='CASCADE'),
145
                          primary_key=True))
146
    columns.append(Column('population', Integer, nullable=False, default=0))
147
    columns.append(Column('size', BigInteger, nullable=False, default=0))
148
    columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
149
    columns.append(Column('cluster', Integer, nullable=False, default=0,
150
                          primary_key=True, autoincrement=False))
151
    Table('statistics', metadata, *columns, mysql_engine='InnoDB')
152

    
153
    #create versions table
154
    columns = []
155
    columns.append(Column('serial', Integer, primary_key=True))
156
    columns.append(Column('node', Integer,
157
                          ForeignKey('nodes.node',
158
                                     ondelete='CASCADE',
159
                                     onupdate='CASCADE')))
160
    columns.append(Column('hash', String(256)))
161
    columns.append(Column('size', BigInteger, nullable=False, default=0))
162
    columns.append(Column('type', String(256), nullable=False, default=''))
163
    columns.append(Column('source', Integer))
164
    columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
165
    columns.append(Column('muser', String(256), nullable=False, default=''))
166
    columns.append(Column('uuid', String(64), nullable=False, default=''))
167
    columns.append(Column('checksum', String(256), nullable=False, default=''))
168
    columns.append(Column('cluster', Integer, nullable=False, default=0))
169
    versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
170
    Index('idx_versions_node_mtime', versions.c.node, versions.c.mtime)
171
    Index('idx_versions_node_uuid', versions.c.uuid)
172

    
173
    #create attributes table
174
    columns = []
175
    columns.append(Column('serial', Integer,
176
                          ForeignKey('versions.serial',
177
                                     ondelete='CASCADE',
178
                                     onupdate='CASCADE'),
179
                          primary_key=True))
180
    columns.append(Column('domain', String(256), primary_key=True))
181
    columns.append(Column('key', String(128), primary_key=True))
182
    columns.append(Column('value', String(256)))
183
    columns.append(Column('node', Integer, nullable=False, default=0))
184
    columns.append(Column('is_latest', Boolean, nullable=False, default=True))
185
    attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')
186
    Index('idx_attributes_domain', attributes.c.domain)
187
    Index('idx_attributes_serial_node', attributes.c.serial, attributes.c.node)
188

    
189
    metadata.create_all(engine)
190
    return metadata.sorted_tables
191

    
192

    
193
class Node(DBWorker):
194
    """Nodes store path organization and have multiple versions.
195
       Versions store object history and have multiple attributes.
196
       Attributes store metadata.
197
    """
198

    
199
    # TODO: Provide an interface for included and excluded clusters.
200

    
201
    def __init__(self, **params):
202
        DBWorker.__init__(self, **params)
203
        try:
204
            metadata = MetaData(self.engine)
205
            self.nodes = Table('nodes', metadata, autoload=True)
206
            self.policy = Table('policy', metadata, autoload=True)
207
            self.statistics = Table('statistics', metadata, autoload=True)
208
            self.versions = Table('versions', metadata, autoload=True)
209
            self.attributes = Table('attributes', metadata, autoload=True)
210
        except NoSuchTableError:
211
            tables = create_tables(self.engine)
212
            map(lambda t: self.__setattr__(t.name, t), tables)
213

    
214
        s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
215
                                           self.nodes.c.parent == ROOTNODE))
216
        wrapper = self.wrapper
217
        wrapper.execute()
218
        try:
219
            rp = self.conn.execute(s)
220
            r = rp.fetchone()
221
            rp.close()
222
            if not r:
223
                s = self.nodes.insert(
224
                ).values(node=ROOTNODE, parent=ROOTNODE, path='')
225
                self.conn.execute(s)
226
        finally:
227
            wrapper.commit()
228

    
229
    def node_create(self, parent, path):
230
        """Create a new node from the given properties.
231
           Return the node identifier of the new node.
232
        """
233
        #TODO catch IntegrityError?
234
        s = self.nodes.insert().values(parent=parent, path=path)
235
        r = self.conn.execute(s)
236
        inserted_primary_key = r.inserted_primary_key[0]
237
        r.close()
238
        return inserted_primary_key
239

    
240
    def node_lookup(self, path, for_update=False):
241
        """Lookup the current node of the given path.
242
           Return None if the path is not found.
243
        """
244

    
245
        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
246
        s = select([self.nodes.c.node],
247
                   self.nodes.c.path.like(self.escape_like(path),
248
                                          escape=ESCAPE_CHAR),
249
                   for_update=for_update)
250
        r = self.conn.execute(s)
251
        row = r.fetchone()
252
        r.close()
253
        if row:
254
            return row[0]
255
        return None
256

    
257
    def node_lookup_bulk(self, paths):
258
        """Lookup the current nodes for the given paths.
259
           Return () if the path is not found.
260
        """
261

    
262
        if not paths:
263
            return ()
264
        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
265
        s = select([self.nodes.c.node], self.nodes.c.path.in_(paths))
266
        r = self.conn.execute(s)
267
        rows = r.fetchall()
268
        r.close()
269
        return [row[0] for row in rows]
270

    
271
    def node_get_properties(self, node):
272
        """Return the node's (parent, path).
273
           Return None if the node is not found.
274
        """
275

    
276
        s = select([self.nodes.c.parent, self.nodes.c.path])
277
        s = s.where(self.nodes.c.node == node)
278
        r = self.conn.execute(s)
279
        l = r.fetchone()
280
        r.close()
281
        return l
282

    
283
    def node_get_versions(self, node, keys=(), propnames=_propnames):
284
        """Return the properties of all versions at node.
285
           If keys is empty, return all properties in the order
286
           (serial, node, hash, size, type, source, mtime, muser, uuid,
287
            checksum, cluster).
288
        """
289

    
290
        s = select([self.versions.c.serial,
291
                    self.versions.c.node,
292
                    self.versions.c.hash,
293
                    self.versions.c.size,
294
                    self.versions.c.type,
295
                    self.versions.c.source,
296
                    self.versions.c.mtime,
297
                    self.versions.c.muser,
298
                    self.versions.c.uuid,
299
                    self.versions.c.checksum,
300
                    self.versions.c.cluster], self.versions.c.node == node)
301
        s = s.order_by(self.versions.c.serial)
302
        r = self.conn.execute(s)
303
        rows = r.fetchall()
304
        r.close()
305
        if not rows:
306
            return rows
307

    
308
        if not keys:
309
            return rows
310

    
311
        return [[p[propnames[k]] for k in keys if k in propnames] for
312
                p in rows]
313

    
314
    def node_count_children(self, node):
315
        """Return node's child count."""
316

    
317
        s = select([func.count(self.nodes.c.node)])
318
        s = s.where(and_(self.nodes.c.parent == node,
319
                         self.nodes.c.node != ROOTNODE))
320
        r = self.conn.execute(s)
321
        row = r.fetchone()
322
        r.close()
323
        return row[0]
324

    
325
    def node_purge_children(self, parent, before=inf, cluster=0,
326
                            update_statistics_ancestors_depth=None):
327
        """Delete all versions with the specified
328
           parent and cluster, and return
329
           the hashes, the total size and the serials of versions deleted.
330
           Clears out nodes with no remaining versions.
331
        """
332
        #update statistics
333
        c1 = select([self.nodes.c.node],
334
                    self.nodes.c.parent == parent)
335
        where_clause = and_(self.versions.c.node.in_(c1),
336
                            self.versions.c.cluster == cluster)
337
        if before != inf:
338
            where_clause = and_(where_clause,
339
                                self.versions.c.mtime <= before)
340
        s = select([func.count(self.versions.c.serial),
341
                    func.sum(self.versions.c.size)])
342
        s = s.where(where_clause)
343
        r = self.conn.execute(s)
344
        row = r.fetchone()
345
        r.close()
346
        if not row:
347
            return (), 0, ()
348
        nr, size = row[0], row[1] if row[1] else 0
349
        mtime = time()
350
        self.statistics_update(parent, -nr, -size, mtime, cluster)
351
        self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster,
352
                                         update_statistics_ancestors_depth)
353

    
354
        s = select([self.versions.c.hash, self.versions.c.serial])
355
        s = s.where(where_clause)
356
        r = self.conn.execute(s)
357
        hashes = []
358
        serials = []
359
        for row in r.fetchall():
360
            hashes += [row[0]]
361
            serials += [row[1]]
362
        r.close()
363

    
364
        #delete versions
365
        s = self.versions.delete().where(where_clause)
366
        r = self.conn.execute(s)
367
        r.close()
368

    
369
        #delete nodes
370
        s = select([self.nodes.c.node],
371
                   and_(self.nodes.c.parent == parent,
372
                        select([func.count(self.versions.c.serial)],
373
                               self.versions.c.node == self.nodes.c.node).
374
                        as_scalar() == 0))
375
        rp = self.conn.execute(s)
376
        nodes = [row[0] for row in rp.fetchall()]
377
        rp.close()
378
        if nodes:
379
            s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
380
            self.conn.execute(s).close()
381

    
382
        return hashes, size, serials
383

    
384
    def node_purge(self, node, before=inf, cluster=0,
385
                   update_statistics_ancestors_depth=None):
386
        """Delete all versions with the specified
387
           node and cluster, and return
388
           the hashes and size of versions deleted.
389
           Clears out the node if it has no remaining versions.
390
        """
391

    
392
        #update statistics
393
        s = select([func.count(self.versions.c.serial),
394
                    func.sum(self.versions.c.size)])
395
        where_clause = and_(self.versions.c.node == node,
396
                            self.versions.c.cluster == cluster)
397
        if before != inf:
398
            where_clause = and_(where_clause,
399
                                self.versions.c.mtime <= before)
400
        s = s.where(where_clause)
401
        r = self.conn.execute(s)
402
        row = r.fetchone()
403
        nr, size = row[0], row[1]
404
        r.close()
405
        if not nr:
406
            return (), 0, ()
407
        mtime = time()
408
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster,
409
                                         update_statistics_ancestors_depth)
410

    
411
        s = select([self.versions.c.hash, self.versions.c.serial])
412
        s = s.where(where_clause)
413
        r = self.conn.execute(s)
414
        hashes = []
415
        serials = []
416
        for row in r.fetchall():
417
            hashes += [row[0]]
418
            serials += [row[1]]
419
        r.close()
420

    
421
        #delete versions
422
        s = self.versions.delete().where(where_clause)
423
        r = self.conn.execute(s)
424
        r.close()
425

    
426
        #delete nodes
427
        s = select([self.nodes.c.node],
428
                   and_(self.nodes.c.node == node,
429
                        select([func.count(self.versions.c.serial)],
430
                               self.versions.c.node == self.nodes.c.node).
431
                        as_scalar() == 0))
432
        rp = self.conn.execute(s)
433
        nodes = [row[0] for row in rp.fetchall()]
434
        rp.close()
435
        if nodes:
436
            s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
437
            self.conn.execute(s).close()
438

    
439
        return hashes, size, serials
440

    
441
    def node_remove(self, node, update_statistics_ancestors_depth=None):
442
        """Remove the node specified.
443
           Return false if the node has children or is not found.
444
        """
445

    
446
        if self.node_count_children(node):
447
            return False
448

    
449
        mtime = time()
450
        s = select([func.count(self.versions.c.serial),
451
                    func.sum(self.versions.c.size),
452
                    self.versions.c.cluster])
453
        s = s.where(self.versions.c.node == node)
454
        s = s.group_by(self.versions.c.cluster)
455
        r = self.conn.execute(s)
456
        for population, size, cluster in r.fetchall():
457
            self.statistics_update_ancestors(
458
                node, -population, -size, mtime, cluster,
459
                update_statistics_ancestors_depth)
460
        r.close()
461

    
462
        s = self.nodes.delete().where(self.nodes.c.node == node)
463
        self.conn.execute(s).close()
464
        return True
465

    
466
    def node_accounts(self, accounts=()):
467
        s = select([self.nodes.c.path, self.nodes.c.node])
468
        s = s.where(and_(self.nodes.c.node != 0,
469
                         self.nodes.c.parent == 0))
470
        if accounts:
471
            s = s.where(self.nodes.c.path.in_(accounts))
472
        r = self.conn.execute(s)
473
        rows = r.fetchall()
474
        r.close()
475
        return rows
476

    
477
    def node_account_quotas(self):
478
        s = select([self.nodes.c.path, self.policy.c.value])
479
        s = s.where(and_(self.nodes.c.node != 0,
480
                         self.nodes.c.parent == 0))
481
        s = s.where(self.nodes.c.node == self.policy.c.node)
482
        s = s.where(self.policy.c.key == 'quota')
483
        r = self.conn.execute(s)
484
        rows = r.fetchall()
485
        r.close()
486
        return dict(rows)
487

    
488
    def node_account_usage(self, account=None, cluster=0):
489
        """Return usage for a specific account.
490

491
        Keyword arguments:
492
        account -- (default None: list usage for all the accounts)
493
        cluster -- list current, history or deleted usage (default 0: normal)
494
        """
495

    
496
        n1 = self.nodes.alias('n1')
497
        n2 = self.nodes.alias('n2')
498
        n3 = self.nodes.alias('n3')
499

    
500
        s = select([n3.c.path, func.sum(self.versions.c.size)])
501
        s = s.where(n1.c.node == self.versions.c.node)
502
        s = s.where(self.versions.c.cluster == cluster)
503
        s = s.where(n1.c.parent == n2.c.node)
504
        s = s.where(n2.c.parent == n3.c.node)
505
        s = s.where(n3.c.parent == 0)
506
        s = s.where(n3.c.node != 0)
507
        if account:
508
            s = s.where(n3.c.path == account)
509
        s = s.group_by(n3.c.path)
510
        r = self.conn.execute(s)
511
        usage = r.fetchall()
512
        r.close()
513
        return dict(usage)
514

    
515
    def policy_get(self, node):
516
        s = select([self.policy.c.key, self.policy.c.value],
517
                   self.policy.c.node == node)
518
        r = self.conn.execute(s)
519
        d = dict(r.fetchall())
520
        r.close()
521
        return d
522

    
523
    def policy_set(self, node, policy):
524
        #insert or replace
525
        for k, v in policy.iteritems():
526
            s = self.policy.update().where(and_(self.policy.c.node == node,
527
                                                self.policy.c.key == k))
528
            s = s.values(value=v)
529
            rp = self.conn.execute(s)
530
            rp.close()
531
            if rp.rowcount == 0:
532
                s = self.policy.insert()
533
                values = {'node': node, 'key': k, 'value': v}
534
                r = self.conn.execute(s, values)
535
                r.close()
536

    
537
    def statistics_get(self, node, cluster=0):
538
        """Return population, total size and last mtime
539
           for all versions under node that belong to the cluster.
540
        """
541

    
542
        s = select([self.statistics.c.population,
543
                    self.statistics.c.size,
544
                    self.statistics.c.mtime])
545
        s = s.where(and_(self.statistics.c.node == node,
546
                         self.statistics.c.cluster == cluster))
547
        r = self.conn.execute(s)
548
        row = r.fetchone()
549
        r.close()
550
        return row
551

    
552
    def statistics_update(self, node, population, size, mtime, cluster=0):
553
        """Update the statistics of the given node.
554
           Statistics keep track the population, total
555
           size of objects and mtime in the node's namespace.
556
           May be zero or positive or negative numbers.
557
        """
558
        s = select([self.statistics.c.population, self.statistics.c.size],
559
                   and_(self.statistics.c.node == node,
560
                        self.statistics.c.cluster == cluster))
561
        rp = self.conn.execute(s)
562
        r = rp.fetchone()
563
        rp.close()
564
        if not r:
565
            prepopulation, presize = (0, 0)
566
        else:
567
            prepopulation, presize = r
568
        population += prepopulation
569
        population = max(population, 0)
570
        size += presize
571

    
572
        #insert or replace
573
        #TODO better upsert
574
        u = self.statistics.update().where(and_(
575
            self.statistics.c.node == node,
576
            self.statistics.c.cluster == cluster))
577
        u = u.values(population=population, size=size, mtime=mtime)
578
        rp = self.conn.execute(u)
579
        rp.close()
580
        if rp.rowcount == 0:
581
            ins = self.statistics.insert()
582
            ins = ins.values(node=node, population=population, size=size,
583
                             mtime=mtime, cluster=cluster)
584
            self.conn.execute(ins).close()
585

    
586
    def statistics_update_ancestors(self, node, population, size, mtime,
587
                                    cluster=0, recursion_depth=None):
588
        """Update the statistics of the given node's parent.
589
           Then recursively update all parents up to the root
590
           or up to the ``recursion_depth`` (if not None).
591
           Population is not recursive.
592
        """
593

    
594
        i = 0
595
        while True:
596
            if node == ROOTNODE:
597
                break
598
            if recursion_depth and recursion_depth <= i:
599
                break
600
            props = self.node_get_properties(node)
601
            if props is None:
602
                break
603
            parent, path = props
604
            self.statistics_update(parent, population, size, mtime, cluster)
605
            node = parent
606
            population = 0  # Population isn't recursive
607
            i += 1
608

    
609
    def statistics_latest(self, node, before=inf, except_cluster=0):
610
        """Return population, total size and last mtime
611
           for all latest versions under node that
612
           do not belong to the cluster.
613
        """
614

    
615
        # The node.
616
        props = self.node_get_properties(node)
617
        if props is None:
618
            return None
619
        parent, path = props
620

    
621
        # The latest version.
622
        s = select([self.versions.c.serial,
623
                    self.versions.c.node,
624
                    self.versions.c.hash,
625
                    self.versions.c.size,
626
                    self.versions.c.type,
627
                    self.versions.c.source,
628
                    self.versions.c.mtime,
629
                    self.versions.c.muser,
630
                    self.versions.c.uuid,
631
                    self.versions.c.checksum,
632
                    self.versions.c.cluster])
633
        if before != inf:
634
            filtered = select([func.max(self.versions.c.serial)],
635
                              self.versions.c.node == node)
636
            filtered = filtered.where(self.versions.c.mtime < before)
637
        else:
638
            filtered = select([self.nodes.c.latest_version],
639
                              self.nodes.c.node == node)
640
        s = s.where(and_(self.versions.c.cluster != except_cluster,
641
                         self.versions.c.serial == filtered))
642
        r = self.conn.execute(s)
643
        props = r.fetchone()
644
        r.close()
645
        if not props:
646
            return None
647
        mtime = props[MTIME]
648

    
649
        # First level, just under node (get population).
650
        v = self.versions.alias('v')
651
        s = select([func.count(v.c.serial),
652
                    func.sum(v.c.size),
653
                    func.max(v.c.mtime)])
654
        if before != inf:
655
            c1 = select([func.max(self.versions.c.serial)],
656
                        and_(self.versions.c.mtime < before,
657
                             self.versions.c.node == v.c.node))
658
        else:
659
            c1 = select([self.nodes.c.latest_version])
660
            c1 = c1.where(self.nodes.c.node == v.c.node)
661
        c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
662
        s = s.where(and_(v.c.serial == c1,
663
                         v.c.cluster != except_cluster,
664
                         v.c.node.in_(c2)))
665
        rp = self.conn.execute(s)
666
        r = rp.fetchone()
667
        rp.close()
668
        if not r:
669
            return None
670
        count = r[0]
671
        mtime = max(mtime, r[2])
672
        if count == 0:
673
            return (0, 0, mtime)
674

    
675
        # All children (get size and mtime).
676
        # This is why the full path is stored.
677
        if before != inf:
678
            s = select([func.count(v.c.serial),
679
                        func.sum(v.c.size),
680
                        func.max(v.c.mtime)])
681
            c1 = select([func.max(self.versions.c.serial)],
682
                        and_(self.versions.c.mtime < before,
683
                             self.versions.c.node == v.c.node))
684
        else:
685
            inner_join = self.versions.join(
686
                self.nodes,
687
                onclause=self.versions.c.serial == self.nodes.c.latest_version)
688
            s = select([func.count(self.versions.c.serial),
689
                       func.sum(self.versions.c.size),
690
                       func.max(self.versions.c.mtime)], from_obj=[inner_join])
691

    
692
        c2 = select([self.nodes.c.node],
693
                    self.nodes.c.path.like(self.escape_like(path) + '%',
694
                                           escape=ESCAPE_CHAR))
695
        if before != inf:
696
            s = s.where(and_(v.c.serial == c1,
697
                             v.c.cluster != except_cluster,
698
                             v.c.node.in_(c2)))
699
        else:
700
            s = s.where(and_(self.versions.c.cluster != except_cluster,
701
                        self.versions.c.node.in_(c2)))
702

    
703
        rp = self.conn.execute(s)
704
        r = rp.fetchone()
705
        rp.close()
706
        if not r:
707
            return None
708
        size = long(r[1] - props[SIZE])
709
        mtime = max(mtime, r[2])
710
        return (count, size, mtime)
711

    
712
    def nodes_set_latest_version(self, node, serial):
713
        s = self.nodes.update().where(self.nodes.c.node == node)
714
        s = s.values(latest_version=serial)
715
        self.conn.execute(s).close()
716

    
717
    def version_create(self, node, hash, size, type, source, muser, uuid,
718
                       checksum, cluster=0,
719
                       update_statistics_ancestors_depth=None):
720
        """Create a new version from the given properties.
721
           Return the (serial, mtime) of the new version.
722
        """
723

    
724
        mtime = time()
725
        s = self.versions.insert().values(
726
            node=node, hash=hash, size=size, type=type, source=source,
727
            mtime=mtime, muser=muser, uuid=uuid, checksum=checksum,
728
            cluster=cluster)
729
        serial = self.conn.execute(s).inserted_primary_key[0]
730
        self.statistics_update_ancestors(node, 1, size, mtime, cluster,
731
                                         update_statistics_ancestors_depth)
732

    
733
        self.nodes_set_latest_version(node, serial)
734

    
735
        return serial, mtime
736

    
737
    def version_lookup(self, node, before=inf, cluster=0, all_props=True):
738
        """Lookup the current version of the given node.
739
           Return a list with its properties:
740
           (serial, node, hash, size, type, source, mtime,
741
            muser, uuid, checksum, cluster)
742
           or None if the current version is not found in the given cluster.
743
        """
744

    
745
        v = self.versions.alias('v')
746
        if not all_props:
747
            s = select([v.c.serial])
748
        else:
749
            s = select([v.c.serial, v.c.node, v.c.hash,
750
                        v.c.size, v.c.type, v.c.source,
751
                        v.c.mtime, v.c.muser, v.c.uuid,
752
                        v.c.checksum, v.c.cluster])
753
        if before != inf:
754
            c = select([func.max(self.versions.c.serial)],
755
                       self.versions.c.node == node)
756
            c = c.where(self.versions.c.mtime < before)
757
        else:
758
            c = select([self.nodes.c.latest_version],
759
                       self.nodes.c.node == node)
760
        s = s.where(and_(v.c.serial == c,
761
                         v.c.cluster == cluster))
762
        r = self.conn.execute(s)
763
        props = r.fetchone()
764
        r.close()
765
        if props:
766
            return props
767
        return None
768

    
769
    def version_lookup_bulk(self, nodes, before=inf, cluster=0,
770
                            all_props=True, order_by_path=False):
771
        """Lookup the current versions of the given nodes.
772
           Return a list with their properties:
773
           (serial, node, hash, size, type, source, mtime, muser, uuid,
774
            checksum, cluster).
775
        """
776
        if not nodes:
777
            return ()
778
        v = self.versions.alias('v')
779
        n = self.nodes.alias('n')
780
        if not all_props:
781
            s = select([v.c.serial])
782
        else:
783
            s = select([v.c.serial, v.c.node, v.c.hash,
784
                        v.c.size, v.c.type, v.c.source,
785
                        v.c.mtime, v.c.muser, v.c.uuid,
786
                        v.c.checksum, v.c.cluster])
787
        if before != inf:
788
            c = select([func.max(self.versions.c.serial)],
789
                       self.versions.c.node.in_(nodes))
790
            c = c.where(self.versions.c.mtime < before)
791
            c = c.group_by(self.versions.c.node)
792
        else:
793
            c = select([self.nodes.c.latest_version],
794
                       self.nodes.c.node.in_(nodes))
795
        s = s.where(and_(v.c.serial.in_(c),
796
                         v.c.cluster == cluster))
797
        if order_by_path:
798
            s = s.where(v.c.node == n.c.node)
799
            s = s.order_by(n.c.path)
800
        else:
801
            s = s.order_by(v.c.node)
802
        r = self.conn.execute(s)
803
        rproxy = r.fetchall()
804
        r.close()
805
        return (tuple(row.values()) for row in rproxy)
806

    
807
    def version_get_properties(self, serial, keys=(), propnames=_propnames,
808
                               node=None):
809
        """Return a sequence of values for the properties of
810
           the version specified by serial and the keys, in the order given.
811
           If keys is empty, return all properties in the order
812
           (serial, node, hash, size, type, source, mtime, muser, uuid,
813
            checksum, cluster).
814
        """
815

    
816
        v = self.versions.alias()
817
        s = select([v.c.serial, v.c.node, v.c.hash,
818
                    v.c.size, v.c.type, v.c.source,
819
                    v.c.mtime, v.c.muser, v.c.uuid,
820
                    v.c.checksum, v.c.cluster], v.c.serial == serial)
821
        if node is not None:
822
            s = s.where(v.c.node == node)
823
        rp = self.conn.execute(s)
824
        r = rp.fetchone()
825
        rp.close()
826
        if r is None:
827
            return r
828

    
829
        if not keys:
830
            return r
831
        return [r[propnames[k]] for k in keys if k in propnames]
832

    
833
    def version_put_property(self, serial, key, value):
834
        """Set value for the property of version specified by key."""
835

    
836
        if key not in _propnames:
837
            return
838
        s = self.versions.update()
839
        s = s.where(self.versions.c.serial == serial)
840
        s = s.values(**{key: value})
841
        self.conn.execute(s).close()
842

    
843
    def version_recluster(self, serial, cluster,
844
                          update_statistics_ancestors_depth=None):
845
        """Move the version into another cluster."""
846

    
847
        props = self.version_get_properties(serial)
848
        if not props:
849
            return
850
        node = props[NODE]
851
        size = props[SIZE]
852
        oldcluster = props[CLUSTER]
853
        if cluster == oldcluster:
854
            return
855

    
856
        mtime = time()
857
        self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster,
858
                                         update_statistics_ancestors_depth)
859
        self.statistics_update_ancestors(node, 1, size, mtime, cluster,
860
                                         update_statistics_ancestors_depth)
861

    
862
        s = self.versions.update()
863
        s = s.where(self.versions.c.serial == serial)
864
        s = s.values(cluster=cluster)
865
        self.conn.execute(s).close()
866

    
867
    def version_remove(self, serial, update_statistics_ancestors_depth=None):
868
        """Remove the serial specified."""
869

    
870
        props = self.version_get_properties(serial)
871
        if not props:
872
            return
873
        node = props[NODE]
874
        hash = props[HASH]
875
        size = props[SIZE]
876
        cluster = props[CLUSTER]
877

    
878
        mtime = time()
879
        self.statistics_update_ancestors(node, -1, -size, mtime, cluster,
880
                                         update_statistics_ancestors_depth)
881

    
882
        s = self.versions.delete().where(self.versions.c.serial == serial)
883
        self.conn.execute(s).close()
884

    
885
        props = self.version_lookup(node, cluster=cluster, all_props=False)
886
        if props:
887
            self.nodes_set_latest_version(node, serial)
888

    
889
        return hash, size
890

    
891
    def attribute_get(self, serial, domain, keys=()):
892
        """Return a list of (key, value) pairs of the specific version.
893

894
        If keys is empty, return all attributes.
895
        Othwerise, return only those specified.
896
        """
897

    
898
        if keys:
899
            attrs = self.attributes.alias()
900
            s = select([attrs.c.key, attrs.c.value])
901
            s = s.where(and_(attrs.c.key.in_(keys),
902
                             attrs.c.serial == serial,
903
                             attrs.c.domain == domain))
904
        else:
905
            attrs = self.attributes.alias()
906
            s = select([attrs.c.key, attrs.c.value])
907
            s = s.where(and_(attrs.c.serial == serial,
908
                             attrs.c.domain == domain))
909
        r = self.conn.execute(s)
910
        l = r.fetchall()
911
        r.close()
912
        return l
913

    
914
    def attribute_set(self, serial, domain, node, items, is_latest=True):
915
        """Set the attributes of the version specified by serial.
916
           Receive attributes as an iterable of (key, value) pairs.
917
        """
918
        #insert or replace
919
        #TODO better upsert
920
        for k, v in items:
921
            s = self.attributes.update()
922
            s = s.where(and_(self.attributes.c.serial == serial,
923
                             self.attributes.c.domain == domain,
924
                             self.attributes.c.key == k))
925
            s = s.values(value=v)
926
            rp = self.conn.execute(s)
927
            rp.close()
928
            if rp.rowcount == 0:
929
                s = self.attributes.insert()
930
                s = s.values(serial=serial, domain=domain, node=node,
931
                             is_latest=is_latest, key=k, value=v)
932
                self.conn.execute(s).close()
933

    
934
    def attribute_del(self, serial, domain, keys=()):
935
        """Delete attributes of the version specified by serial.
936
           If keys is empty, delete all attributes.
937
           Otherwise delete those specified.
938
        """
939

    
940
        if keys:
941
            #TODO more efficient way to do this?
942
            for key in keys:
943
                s = self.attributes.delete()
944
                s = s.where(and_(self.attributes.c.serial == serial,
945
                                 self.attributes.c.domain == domain,
946
                                 self.attributes.c.key == key))
947
                self.conn.execute(s).close()
948
        else:
949
            s = self.attributes.delete()
950
            s = s.where(and_(self.attributes.c.serial == serial,
951
                             self.attributes.c.domain == domain))
952
            self.conn.execute(s).close()
953

    
954
    def attribute_copy(self, source, dest):
955
        s = select(
956
            [dest, self.attributes.c.domain, self.attributes.c.node,
957
             self.attributes.c.key, self.attributes.c.value],
958
            self.attributes.c.serial == source)
959
        rp = self.conn.execute(s)
960
        attributes = rp.fetchall()
961
        rp.close()
962
        for dest, domain, node, k, v in attributes:
963
            select_src_node = select(
964
                [self.versions.c.node],
965
                self.versions.c.serial == dest)
966
            # insert or replace
967
            s = self.attributes.update().where(and_(
968
                self.attributes.c.serial == dest,
969
                self.attributes.c.domain == domain,
970
                self.attributes.c.key == k))
971
            s = s.values(node=select_src_node, value=v)
972
            rp = self.conn.execute(s)
973
            rp.close()
974
            if rp.rowcount == 0:
975
                s = self.attributes.insert()
976
                s = s.values(serial=dest, domain=domain, node=select_src_node,
977
                             is_latest=True, key=k, value=v)
978
            self.conn.execute(s).close()
979

    
980
    def attribute_unset_is_latest(self, node, exclude):
981
        u = self.attributes.update().where(and_(
982
            self.attributes.c.node == node,
983
            self.attributes.c.serial != exclude)).values({'is_latest': False})
984
        self.conn.execute(u)
985

    
986
    def latest_attribute_keys(self, parent, domain, before=inf,
987
                              except_cluster=0, pathq=None):
988
        """Return a list with all keys pairs defined
989
           for all latest versions under parent that
990
           do not belong to the cluster.
991
        """
992

    
993
        pathq = pathq or []
994

    
995
        # TODO: Use another table to store before=inf results.
996
        v = self.versions.alias('v')
997
        s = select([self.attributes.c.key]).distinct()
998
        if before != inf:
999
            filtered = select([func.max(v.c.serial)],
1000
                              and_(v.c.mtime < before,
1001
                                   v.c.node == self.versions.c.node))
1002
        else:
1003
            filtered = select([self.nodes.c.latest_version])
1004
            filtered = filtered.where(
1005
                self.nodes.c.node == self.versions.c.node).\
1006
                correlate(self.versions)
1007
        s = s.where(self.versions.c.serial == filtered)
1008
        s = s.where(self.versions.c.cluster != except_cluster)
1009
        s = s.where(self.versions.c.node.in_(select([self.nodes.c.node],
1010
                                             self.nodes.c.parent == parent)))
1011
        s = s.where(self.attributes.c.serial == self.versions.c.serial)
1012
        s = s.where(self.attributes.c.domain == domain)
1013
        s = s.where(self.nodes.c.node == self.versions.c.node)
1014
        s = s.order_by(self.attributes.c.key)
1015
        conja = []
1016
        conjb = []
1017
        for path, match in pathq:
1018
            if match == MATCH_PREFIX:
1019
                conja.append(self.nodes.c.path.like(
1020
                    self.escape_like(path) + '%', escape=ESCAPE_CHAR))
1021
            elif match == MATCH_EXACT:
1022
                conjb.append(path)
1023
        if conja or conjb:
1024
            s = s.where(or_(self.nodes.c.path.in_(conjb), *conja))
1025
        rp = self.conn.execute(s)
1026
        rows = rp.fetchall()
1027
        rp.close()
1028
        return [r[0] for r in rows]
1029

    
1030
    def latest_version_list(self, parent, prefix='', delimiter=None,
1031
                            start='', limit=10000, before=inf,
1032
                            except_cluster=0, pathq=[], domain=None,
1033
                            filterq=[], sizeq=None, all_props=False):
1034
        """Return a (list of (path, serial) tuples, list of common prefixes)
1035
           for the current versions of the paths with the given parent,
1036
           matching the following criteria.
1037

1038
           The property tuple for a version is returned if all
1039
           of these conditions are true:
1040

1041
                a. parent matches
1042

1043
                b. path > start
1044

1045
                c. path starts with prefix (and paths in pathq)
1046

1047
                d. version is the max up to before
1048

1049
                e. version is not in cluster
1050

1051
                f. the path does not have the delimiter occuring
1052
                   after the prefix, or ends with the delimiter
1053

1054
                g. serial matches the attribute filter query.
1055

1056
                   A filter query is a comma-separated list of
1057
                   terms in one of these three forms:
1058

1059
                   key
1060
                       an attribute with this key must exist
1061

1062
                   !key
1063
                       an attribute with this key must not exist
1064

1065
                   key ?op value
1066
                       the attribute with this key satisfies the value
1067
                       where ?op is one of ==, != <=, >=, <, >.
1068

1069
                h. the size is in the range set by sizeq
1070

1071
           The list of common prefixes includes the prefixes
1072
           matching up to the first delimiter after prefix,
1073
           and are reported only once, as "virtual directories".
1074
           The delimiter is included in the prefixes.
1075

1076
           If arguments are None, then the corresponding matching rule
1077
           will always match.
1078

1079
           Limit applies to the first list of tuples returned.
1080

1081
           If all_props is True, return all properties after path,
1082
           not just serial.
1083
        """
1084

    
1085
        if not start or start < prefix:
1086
            start = strprevling(prefix)
1087
        nextling = strnextling(prefix)
1088

    
1089
        v = self.versions.alias('v')
1090
        if before != inf:
1091
            filtered = select([func.max(v.c.serial)],
1092
                              and_(v.c.mtime < before,
1093
                                   v.c.node == self.versions.c.node))
1094
            inner_join = self.nodes.join(
1095
                self.versions,
1096
                onclause=self.versions.c.serial == filtered)
1097
        else:
1098
            filtered = select([self.nodes.c.latest_version])
1099
            filtered = filtered.where(
1100
                self.nodes.c.node == self.versions.c.node).\
1101
                correlate(self.versions)
1102
            inner_join = self.nodes.join(
1103
                self.versions,
1104
                onclause=self.versions.c.serial == filtered)
1105
        if not all_props:
1106
            s = select([self.nodes.c.path, self.versions.c.serial],
1107
                       from_obj=[inner_join]).distinct()
1108
        else:
1109
            s = select([self.nodes.c.path,
1110
                        self.versions.c.serial, self.versions.c.node,
1111
                        self.versions.c.hash,
1112
                        self.versions.c.size, self.versions.c.type,
1113
                        self.versions.c.source,
1114
                        self.versions.c.mtime, self.versions.c.muser,
1115
                        self.versions.c.uuid,
1116
                        self.versions.c.checksum,
1117
                        self.versions.c.cluster],
1118
                       from_obj=[inner_join]).distinct()
1119

    
1120
        s = s.where(self.versions.c.cluster != except_cluster)
1121
        s = s.where(self.versions.c.node.in_(select([self.nodes.c.node],
1122
                                             self.nodes.c.parent == parent)))
1123

    
1124
        s = s.where(self.versions.c.node == self.nodes.c.node)
1125
        s = s.where(and_(self.nodes.c.path > bindparam('start'),
1126
                    self.nodes.c.path < nextling))
1127
        conja = []
1128
        conjb = []
1129
        for path, match in pathq:
1130
            if match == MATCH_PREFIX:
1131
                conja.append(
1132
                    self.nodes.c.path.like(self.escape_like(path) + '%',
1133
                                           escape=ESCAPE_CHAR))
1134
            elif match == MATCH_EXACT:
1135
                conjb.append(path)
1136
        if conja or conjb:
1137
            s = s.where(or_(self.nodes.c.path.in_(conjb), *conja))
1138

    
1139
        if sizeq and len(sizeq) == 2:
1140
            if sizeq[0]:
1141
                s = s.where(self.versions.c.size >= sizeq[0])
1142
            if sizeq[1]:
1143
                s = s.where(self.versions.c.size < sizeq[1])
1144

    
1145
        if domain and filterq:
1146
            included, excluded, opers = parse_filters(filterq)
1147
            if included:
1148
                subs = select([1])
1149
                subs = subs.where(
1150
                    self.attributes.c.serial ==
1151
                    self.versions.c.serial).correlate(self.versions)
1152
                subs = subs.where(self.attributes.c.domain == domain)
1153
                subs = subs.where(or_(*[self.attributes.c.key.op('=')(x) for
1154
                                  x in included]))
1155
                s = s.where(exists(subs))
1156
            if excluded:
1157
                subs = select([1])
1158
                subs = subs.where(
1159
                    self.attributes.c.serial == self.versions.c.serial).\
1160
                    correlate(self.versions)
1161
                subs = subs.where(self.attributes.c.domain == domain)
1162
                subs = subs.where(or_(*[self.attributes.c.key.op('=')(x) for
1163
                                  x in excluded]))
1164
                s = s.where(not_(exists(subs)))
1165
            if opers:
1166
                for k, o, val in opers:
1167
                    subs = select([1])
1168
                    subs = subs.where(
1169
                        self.attributes.c.serial == self.versions.c.serial).\
1170
                        correlate(self.versions)
1171
                    subs = subs.where(self.attributes.c.domain == domain)
1172
                    subs = subs.where(
1173
                        and_(self.attributes.c.key.op('=')(k),
1174
                             self.attributes.c.value.op(o)(val)))
1175
                    s = s.where(exists(subs))
1176

    
1177
        s = s.order_by(self.nodes.c.path)
1178

    
1179
        if not delimiter:
1180
            s = s.limit(limit)
1181
            rp = self.conn.execute(s, start=start)
1182
            r = rp.fetchall()
1183
            rp.close()
1184
            return r, ()
1185

    
1186
        pfz = len(prefix)
1187
        dz = len(delimiter)
1188
        count = 0
1189
        prefixes = []
1190
        pappend = prefixes.append
1191
        matches = []
1192
        mappend = matches.append
1193

    
1194
        rp = self.conn.execute(s, start=start)
1195
        while True:
1196
            props = rp.fetchone()
1197
            if props is None:
1198
                break
1199
            path = props[0]
1200
            idx = path.find(delimiter, pfz)
1201

    
1202
            if idx < 0:
1203
                mappend(props)
1204
                count += 1
1205
                if count >= limit:
1206
                    break
1207
                continue
1208

    
1209
            if idx + dz == len(path):
1210
                mappend(props)
1211
                count += 1
1212
                continue  # Get one more, in case there is a path.
1213
            pf = path[:idx + dz]
1214
            pappend(pf)
1215
            if count >= limit:
1216
                break
1217

    
1218
            rp = self.conn.execute(s, start=strnextling(pf))  # New start.
1219
        rp.close()
1220

    
1221
        return matches, prefixes
1222

    
1223
    def latest_uuid(self, uuid, cluster):
1224
        """Return the latest version of the given uuid and cluster.
1225

1226
        Return a (path, serial) tuple.
1227
        If cluster is None, all clusters are considered.
1228

1229
        """
1230

    
1231
        v = self.versions.alias('v')
1232
        n = self.nodes.alias('n')
1233
        s = select([n.c.path, v.c.serial])
1234
        filtered = select([func.max(self.versions.c.serial)])
1235
        filtered = filtered.where(self.versions.c.uuid == uuid)
1236
        if cluster is not None:
1237
            filtered = filtered.where(self.versions.c.cluster == cluster)
1238
        s = s.where(v.c.serial == filtered)
1239
        s = s.where(n.c.node == v.c.node)
1240

    
1241
        r = self.conn.execute(s)
1242
        l = r.fetchone()
1243
        r.close()
1244
        return l
1245

    
1246
    def domain_object_list(self, domain, paths, cluster=None):
1247
        """Return a list of (path, property list, attribute dictionary)
1248
           for the objects in the specific domain and cluster.
1249
        """
1250

    
1251
        v = self.versions.alias('v')
1252
        n = self.nodes.alias('n')
1253
        a = self.attributes.alias('a')
1254

    
1255
        s = select([n.c.path, v.c.serial, v.c.node, v.c.hash, v.c.size,
1256
                    v.c.type, v.c.source, v.c.mtime, v.c.muser, v.c.uuid,
1257
                    v.c.checksum, v.c.cluster, a.c.key, a.c.value])
1258
        if cluster:
1259
            s = s.where(v.c.cluster == cluster)
1260
        s = s.where(v.c.serial == a.c.serial)
1261
        s = s.where(a.c.domain == domain)
1262
        s = s.where(a.c.node == n.c.node)
1263
        s = s.where(a.c.is_latest == true())
1264
        if paths:
1265
            s = s.where(n.c.path.in_(paths))
1266

    
1267
        r = self.conn.execute(s)
1268
        rows = r.fetchall()
1269
        r.close()
1270

    
1271
        group_by = itemgetter(slice(12))
1272
        rows.sort(key=group_by)
1273
        groups = groupby(rows, group_by)
1274
        return [(k[0], k[1:], dict([i[12:] for i in data])) for
1275
                (k, data) in groups]
1276

    
1277
    def get_props(self, paths):
1278
        inner_join = \
1279
            self.nodes.join(
1280
                self.versions,
1281
                onclause=self.versions.c.serial == self.nodes.c.latest_version)
1282
        cc = self.nodes.c.path.in_(paths)
1283
        s = select([self.nodes.c.path, self.versions.c.type],
1284
                   from_obj=[inner_join]).where(cc).distinct()
1285
        r = self.conn.execute(s)
1286
        rows = r.fetchall()
1287
        r.close()
1288
        if rows:
1289
            return rows
1290
        return None