# Copyright 2011-2012 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from time import time
from operator import itemgetter
from itertools import groupby

from sqlalchemy import (Table, Integer, BigInteger, DECIMAL, Boolean,
                        Column, String, MetaData, ForeignKey)
from sqlalchemy.schema import Index
from sqlalchemy.sql import func, and_, or_, not_, select, bindparam, exists
from sqlalchemy.exc import NoSuchTableError

from dbworker import DBWorker, ESCAPE_CHAR

from pithos.backends.filter import parse_filters


ROOTNODE = 0

(SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM,
 CLUSTER) = range(11)

(MATCH_PREFIX, MATCH_EXACT) = range(2)

inf = float('inf')


def strnextling(prefix):
    """Return the first unicode string
       greater than but not starting with given prefix.
       strnextling('hello') -> 'hellp'
    """
    if not prefix:
        ## all strings start with the null string,
        ## therefore we have to approximate strnextling('')
        ## with the last unicode character supported by python
        ## 0x10ffff for wide (32-bit unicode) python builds
        ## 0x00ffff for narrow (16-bit unicode) python builds
        ## We will not autodetect. 0xffff is safe enough.
        return unichr(0xffff)
    s = prefix[:-1]
    c = ord(prefix[-1])
    if c >= 0xffff:
        raise RuntimeError
    s += unichr(c + 1)
    return s


def strprevling(prefix):
    """Return an approximation of the last unicode string
       less than but not starting with given prefix.
       strprevling(u'hello') -> u'helln\\xffff'
    """
    if not prefix:
        ## There is no prevling for the null string
        return prefix
    s = prefix[:-1]
    c = ord(prefix[-1])
    if c > 0:
        s += unichr(c - 1) + unichr(0xffff)
    return s
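
# Example (illustrative, with hypothetical paths): strprevling/strnextling
# bracket every string sharing a given prefix, so a prefix listing can be
# expressed as a plain range condition on the path column:
#
#   start = strprevling(u'account/container/')   # u'account/container.\uffff'
#   stop = strnextling(u'account/container/')    # u'account/container0'
#   # any path starting with u'account/container/' satisfies start < path < stop
#
# latest_version_list() below builds its path query on exactly this bracketing.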

_propnames = {
    'serial': 0,
    'node': 1,
    'hash': 2,
    'size': 3,
    'type': 4,
    'source': 5,
    'mtime': 6,
    'muser': 7,
    'uuid': 8,
    'checksum': 9,
    'cluster': 10,
}


def create_tables(engine):
    metadata = MetaData()

    #create nodes table
    columns = []
    columns.append(Column('node', Integer, primary_key=True))
    columns.append(Column('parent', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          autoincrement=False))
    columns.append(Column('latest_version', Integer))
    columns.append(Column('path', String(2048), default='', nullable=False))
    nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
    Index('idx_nodes_path', nodes.c.path, unique=True)
    Index('idx_nodes_parent', nodes.c.parent)

    #create policy table
    columns = []
    columns.append(Column('node', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          primary_key=True))
    columns.append(Column('key', String(128), primary_key=True))
    columns.append(Column('value', String(256)))
    Table('policy', metadata, *columns, mysql_engine='InnoDB')

    #create statistics table
    columns = []
    columns.append(Column('node', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          primary_key=True))
    columns.append(Column('population', Integer, nullable=False, default=0))
    columns.append(Column('size', BigInteger, nullable=False, default=0))
    columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
    columns.append(Column('cluster', Integer, nullable=False, default=0,
                          primary_key=True, autoincrement=False))
    Table('statistics', metadata, *columns, mysql_engine='InnoDB')

    #create versions table
    columns = []
    columns.append(Column('serial', Integer, primary_key=True))
    columns.append(Column('node', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE')))
    columns.append(Column('hash', String(256)))
    columns.append(Column('size', BigInteger, nullable=False, default=0))
    columns.append(Column('type', String(256), nullable=False, default=''))
    columns.append(Column('source', Integer))
    columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
    columns.append(Column('muser', String(256), nullable=False, default=''))
    columns.append(Column('uuid', String(64), nullable=False, default=''))
    columns.append(Column('checksum', String(256), nullable=False, default=''))
    columns.append(Column('cluster', Integer, nullable=False, default=0))
    versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
    Index('idx_versions_node_mtime', versions.c.node, versions.c.mtime)
    Index('idx_versions_node_uuid', versions.c.uuid)

    #create attributes table
    columns = []
    columns.append(Column('serial', Integer,
                          ForeignKey('versions.serial',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          primary_key=True))
    columns.append(Column('domain', String(256), primary_key=True))
    columns.append(Column('key', String(128), primary_key=True))
    columns.append(Column('value', String(256)))
    columns.append(Column('node', Integer, nullable=False, default=0))
    columns.append(Column('is_latest', Boolean, nullable=False, default=True))
    attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')
    Index('idx_attributes_domain', attributes.c.domain)
    Index('idx_attributes_serial_node', attributes.c.serial, attributes.c.node)

    metadata.create_all(engine)
    return metadata.sorted_tables
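
# Minimal usage sketch (illustrative; the SQLite URL is just an example):
#
#   from sqlalchemy import create_engine
#   engine = create_engine('sqlite:///:memory:')
#   tables = create_tables(engine)      # creates nodes, policy, statistics,
#   print [t.name for t in tables]      # versions and attributes
#
# The Node worker below normally autoloads these tables and only falls back
# to create_tables() when they do not exist yet.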


class Node(DBWorker):
    """Nodes store path organization and have multiple versions.
       Versions store object history and have multiple attributes.
       Attributes store metadata.
    """

    # TODO: Provide an interface for included and excluded clusters.

    def __init__(self, **params):
        DBWorker.__init__(self, **params)
        try:
            metadata = MetaData(self.engine)
            self.nodes = Table('nodes', metadata, autoload=True)
            self.policy = Table('policy', metadata, autoload=True)
            self.statistics = Table('statistics', metadata, autoload=True)
            self.versions = Table('versions', metadata, autoload=True)
            self.attributes = Table('attributes', metadata, autoload=True)
        except NoSuchTableError:
            tables = create_tables(self.engine)
            map(lambda t: self.__setattr__(t.name, t), tables)

        s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
                                           self.nodes.c.parent == ROOTNODE))
        wrapper = self.wrapper
        wrapper.execute()
        try:
            rp = self.conn.execute(s)
            r = rp.fetchone()
            rp.close()
            if not r:
                s = self.nodes.insert(
                ).values(node=ROOTNODE, parent=ROOTNODE, path='')
                self.conn.execute(s)
        finally:
            wrapper.commit()

    def node_create(self, parent, path):
        """Create a new node from the given properties.
           Return the node identifier of the new node.
        """
        #TODO catch IntegrityError?
        s = self.nodes.insert().values(parent=parent, path=path)
        r = self.conn.execute(s)
        inserted_primary_key = r.inserted_primary_key[0]
        r.close()
        return inserted_primary_key
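
    # Illustrative example (hypothetical names; 'node' is a Node worker and
    # ROOTNODE already exists):
    #
    #   account = node.node_create(ROOTNODE, u'user@example.org')
    #   container = node.node_create(account, u'user@example.org/pithos')
    #   obj = node.node_create(container, u'user@example.org/pithos/file.txt')
    #
    # Each node keeps its full path, so deep listings can be answered with a
    # range scan on 'path' instead of walking parent links.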

    def node_lookup(self, path, for_update=False):
        """Lookup the current node of the given path.
           Return None if the path is not found.
        """

        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
        s = select([self.nodes.c.node],
                   self.nodes.c.path.like(self.escape_like(path),
                                          escape=ESCAPE_CHAR),
                   for_update=for_update)
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        if row:
            return row[0]
        return None

    def node_lookup_bulk(self, paths):
        """Lookup the current nodes for the given paths.
           Return () if no paths are found.
        """

        if not paths:
            return ()
        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
        s = select([self.nodes.c.node], self.nodes.c.path.in_(paths))
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        return [row[0] for row in rows]

    def node_get_properties(self, node):
        """Return the node's (parent, path).
           Return None if the node is not found.
        """

        s = select([self.nodes.c.parent, self.nodes.c.path])
        s = s.where(self.nodes.c.node == node)
        r = self.conn.execute(s)
        l = r.fetchone()
        r.close()
        return l

    def node_get_versions(self, node, keys=(), propnames=_propnames):
        """Return the properties of all versions at node.
           If keys is empty, return all properties in the order
           (serial, node, hash, size, type, source, mtime, muser, uuid,
            checksum, cluster).
        """

        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.hash,
                    self.versions.c.size,
                    self.versions.c.type,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.uuid,
                    self.versions.c.checksum,
                    self.versions.c.cluster], self.versions.c.node == node)
        s = s.order_by(self.versions.c.serial)
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        if not rows:
            return rows

        if not keys:
            return rows

        return [[p[propnames[k]] for k in keys if k in propnames] for
                p in rows]

    def node_count_children(self, node):
        """Return node's child count."""

        s = select([func.count(self.nodes.c.node)])
        s = s.where(and_(self.nodes.c.parent == node,
                         self.nodes.c.node != ROOTNODE))
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        return row[0]

    def node_purge_children(self, parent, before=inf, cluster=0,
                            update_statistics_ancestors_depth=None):
        """Delete all versions with the specified
           parent and cluster, and return
           the hashes, the total size and the serials of versions deleted.
           Clears out nodes with no remaining versions.
        """
        #update statistics
        c1 = select([self.nodes.c.node],
                    self.nodes.c.parent == parent)
        where_clause = and_(self.versions.c.node.in_(c1),
                            self.versions.c.cluster == cluster)
        if before != inf:
            where_clause = and_(where_clause,
                                self.versions.c.mtime <= before)
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        if not row:
            return (), 0, ()
        nr, size = row[0], row[1] if row[1] else 0
        mtime = time()
        self.statistics_update(parent, -nr, -size, mtime, cluster)
        self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster,
                                         update_statistics_ancestors_depth)

        s = select([self.versions.c.hash, self.versions.c.serial])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        hashes = []
        serials = []
        for row in r.fetchall():
            hashes += [row[0]]
            serials += [row[1]]
        r.close()

        #delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)
        r.close()

        #delete nodes
        s = select([self.nodes.c.node],
                   and_(self.nodes.c.parent == parent,
                        select([func.count(self.versions.c.serial)],
                               self.versions.c.node == self.nodes.c.node).
                        as_scalar() == 0))
        rp = self.conn.execute(s)
        nodes = [row[0] for row in rp.fetchall()]
        rp.close()
        if nodes:
            s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
            self.conn.execute(s).close()

        return hashes, size, serials

    def node_purge(self, node, before=inf, cluster=0,
                   update_statistics_ancestors_depth=None):
        """Delete all versions with the specified
           node and cluster, and return
           the hashes and size of versions deleted.
           Clears out the node if it has no remaining versions.
        """

        #update statistics
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        where_clause = and_(self.versions.c.node == node,
                            self.versions.c.cluster == cluster)
        if before != inf:
            where_clause = and_(where_clause,
                                self.versions.c.mtime <= before)
        s = s.where(where_clause)
        r = self.conn.execute(s)
        row = r.fetchone()
        nr, size = row[0], row[1]
        r.close()
        if not nr:
            return (), 0, ()
        mtime = time()
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster,
                                         update_statistics_ancestors_depth)

        s = select([self.versions.c.hash, self.versions.c.serial])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        hashes = []
        serials = []
        for row in r.fetchall():
            hashes += [row[0]]
            serials += [row[1]]
        r.close()

        #delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)
        r.close()

        #delete nodes
        s = select([self.nodes.c.node],
                   and_(self.nodes.c.node == node,
                        select([func.count(self.versions.c.serial)],
                               self.versions.c.node == self.nodes.c.node).
                        as_scalar() == 0))
        rp = self.conn.execute(s)
        nodes = [row[0] for row in rp.fetchall()]
        rp.close()
        if nodes:
            s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
            self.conn.execute(s).close()

        return hashes, size, serials

    def node_remove(self, node, update_statistics_ancestors_depth=None):
        """Remove the node specified.
           Return false if the node has children or is not found.
        """

        if self.node_count_children(node):
            return False

        mtime = time()
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size),
                    self.versions.c.cluster])
        s = s.where(self.versions.c.node == node)
        s = s.group_by(self.versions.c.cluster)
        r = self.conn.execute(s)
        for population, size, cluster in r.fetchall():
            self.statistics_update_ancestors(
                node, -population, -size, mtime, cluster,
                update_statistics_ancestors_depth)
        r.close()

        s = self.nodes.delete().where(self.nodes.c.node == node)
        self.conn.execute(s).close()
        return True

    def node_accounts(self, accounts=()):
        s = select([self.nodes.c.path, self.nodes.c.node])
        s = s.where(and_(self.nodes.c.node != 0,
                         self.nodes.c.parent == 0))
        if accounts:
            s = s.where(self.nodes.c.path.in_(accounts))
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        return rows

    def node_account_quotas(self):
        s = select([self.nodes.c.path, self.policy.c.value])
        s = s.where(and_(self.nodes.c.node != 0,
                         self.nodes.c.parent == 0))
        s = s.where(self.nodes.c.node == self.policy.c.node)
        s = s.where(self.policy.c.key == 'quota')
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        return dict(rows)

    def node_account_usage(self, account=None, cluster=0):
        """Return usage for a specific account.

        Keyword arguments:
        account -- (default None: list usage for all the accounts)
        cluster -- list current, history or deleted usage (default 0: normal)
        """

        n1 = self.nodes.alias('n1')
        n2 = self.nodes.alias('n2')
        n3 = self.nodes.alias('n3')

        s = select([n3.c.path, func.sum(self.versions.c.size)])
        s = s.where(n1.c.node == self.versions.c.node)
        s = s.where(self.versions.c.cluster == cluster)
        s = s.where(n1.c.parent == n2.c.node)
        s = s.where(n2.c.parent == n3.c.node)
        s = s.where(n3.c.parent == 0)
        s = s.where(n3.c.node != 0)
        if account:
            s = s.where(n3.c.path == account)
        s = s.group_by(n3.c.path)
        r = self.conn.execute(s)
        usage = r.fetchall()
        r.close()
        return dict(usage)
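
    # Illustrative call (hypothetical account name):
    #
    #   usage = node.node_account_usage(account=u'user@example.org', cluster=0)
    #   # -> {u'user@example.org': 1048576}
    #
    # The triple self-join works because the hierarchy is fixed at three
    # levels: account nodes hang off ROOTNODE, containers off accounts and
    # objects off containers, so n3 always resolves to the owning account.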

    def policy_get(self, node):
        s = select([self.policy.c.key, self.policy.c.value],
                   self.policy.c.node == node)
        r = self.conn.execute(s)
        d = dict(r.fetchall())
        r.close()
        return d

    def policy_set(self, node, policy):
        #insert or replace
        for k, v in policy.iteritems():
            s = self.policy.update().where(and_(self.policy.c.node == node,
                                                self.policy.c.key == k))
            s = s.values(value=v)
            rp = self.conn.execute(s)
            rp.close()
            if rp.rowcount == 0:
                s = self.policy.insert()
                values = {'node': node, 'key': k, 'value': v}
                r = self.conn.execute(s, values)
                r.close()

    def statistics_get(self, node, cluster=0):
        """Return population, total size and last mtime
           for all versions under node that belong to the cluster.
        """

        s = select([self.statistics.c.population,
                    self.statistics.c.size,
                    self.statistics.c.mtime])
        s = s.where(and_(self.statistics.c.node == node,
                         self.statistics.c.cluster == cluster))
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        return row

    def statistics_update(self, node, population, size, mtime, cluster=0):
        """Update the statistics of the given node.
           Statistics keep track of the population, total
           size of objects and mtime in the node's namespace.
           The population and size deltas may be zero,
           positive or negative numbers.
        """
        s = select([self.statistics.c.population, self.statistics.c.size],
                   and_(self.statistics.c.node == node,
                        self.statistics.c.cluster == cluster))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            prepopulation, presize = (0, 0)
        else:
            prepopulation, presize = r
        population += prepopulation
        population = max(population, 0)
        size += presize

        #insert or replace
        #TODO better upsert
        u = self.statistics.update().where(and_(
            self.statistics.c.node == node,
            self.statistics.c.cluster == cluster))
        u = u.values(population=population, size=size, mtime=mtime)
        rp = self.conn.execute(u)
        rp.close()
        if rp.rowcount == 0:
            ins = self.statistics.insert()
            ins = ins.values(node=node, population=population, size=size,
                             mtime=mtime, cluster=cluster)
            self.conn.execute(ins).close()

    def statistics_update_ancestors(self, node, population, size, mtime,
                                    cluster=0, recursion_depth=None):
        """Update the statistics of the given node's parent.
           Then recursively update all parents up to the root
           or up to the ``recursion_depth`` (if not None).
           Population is not recursive.
        """

        i = 0
        while True:
            if node == ROOTNODE:
                break
            if recursion_depth and recursion_depth <= i:
                break
            props = self.node_get_properties(node)
            if props is None:
                break
            parent, path = props
            self.statistics_update(parent, population, size, mtime, cluster)
            node = parent
            population = 0  # Population isn't recursive
            i += 1
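
    # Worked example (illustrative numbers): storing a 100-byte version under
    # an account/container/object chain calls
    # statistics_update_ancestors(object_node, 1, 100, mtime, cluster), which
    # adds population +1 and size +100 to the container's statistics, then
    # size +100 only to the account (and the root), since population is
    # propagated just one level up.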

    def statistics_latest(self, node, before=inf, except_cluster=0):
        """Return population, total size and last mtime
           for all latest versions under node that
           do not belong to the cluster.
        """

        # The node.
        props = self.node_get_properties(node)
        if props is None:
            return None
        parent, path = props

        # The latest version.
        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.hash,
                    self.versions.c.size,
                    self.versions.c.type,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.uuid,
                    self.versions.c.checksum,
                    self.versions.c.cluster])
        if before != inf:
            filtered = select([func.max(self.versions.c.serial)],
                              self.versions.c.node == node)
            filtered = filtered.where(self.versions.c.mtime < before)
        else:
            filtered = select([self.nodes.c.latest_version],
                              self.nodes.c.node == node)
        s = s.where(and_(self.versions.c.cluster != except_cluster,
                         self.versions.c.serial == filtered))
        r = self.conn.execute(s)
        props = r.fetchone()
        r.close()
        if not props:
            return None
        mtime = props[MTIME]

        # First level, just under node (get population).
        v = self.versions.alias('v')
        s = select([func.count(v.c.serial),
                    func.sum(v.c.size),
                    func.max(v.c.mtime)])
        if before != inf:
            c1 = select([func.max(self.versions.c.serial)],
                        and_(self.versions.c.mtime < before,
                             self.versions.c.node == v.c.node))
        else:
            c1 = select([self.nodes.c.latest_version])
            c1 = c1.where(self.nodes.c.node == v.c.node)
        c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
        s = s.where(and_(v.c.serial == c1,
                         v.c.cluster != except_cluster,
                         v.c.node.in_(c2)))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        count = r[0]
        mtime = max(mtime, r[2])
        if count == 0:
            return (0, 0, mtime)

        # All children (get size and mtime).
        # This is why the full path is stored.
        if before != inf:
            s = select([func.count(v.c.serial),
                       func.sum(v.c.size),
                       func.max(v.c.mtime)])
            c1 = select([func.max(self.versions.c.serial)],
                        and_(self.versions.c.mtime < before,
                             self.versions.c.node == v.c.node))
        else:
            inner_join = \
                self.versions.join(self.nodes, onclause=
                                   self.versions.c.serial ==
                                   self.nodes.c.latest_version)
            s = select([func.count(self.versions.c.serial),
                       func.sum(self.versions.c.size),
                       func.max(self.versions.c.mtime)],
                       from_obj=[inner_join])

        c2 = select([self.nodes.c.node],
                    self.nodes.c.path.like(self.escape_like(path) + '%',
                                           escape=ESCAPE_CHAR))
        if before != inf:
            s = s.where(and_(v.c.serial == c1,
                        v.c.cluster != except_cluster,
                        v.c.node.in_(c2)))
        else:
            s = s.where(and_(self.versions.c.cluster != except_cluster,
                        self.versions.c.node.in_(c2)))

        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        size = long(r[1] - props[SIZE])
        mtime = max(mtime, r[2])
        return (count, size, mtime)

    def nodes_set_latest_version(self, node, serial):
        s = self.nodes.update().where(self.nodes.c.node == node)
        s = s.values(latest_version=serial)
        self.conn.execute(s).close()

    def version_create(self, node, hash, size, type, source, muser, uuid,
                       checksum, cluster=0,
                       update_statistics_ancestors_depth=None):
        """Create a new version from the given properties.
           Return the (serial, mtime) of the new version.
        """

        mtime = time()
        s = self.versions.insert().values(
            node=node, hash=hash, size=size, type=type, source=source,
            mtime=mtime, muser=muser, uuid=uuid, checksum=checksum,
            cluster=cluster)
        serial = self.conn.execute(s).inserted_primary_key[0]
        self.statistics_update_ancestors(node, 1, size, mtime, cluster,
                                         update_statistics_ancestors_depth)

        self.nodes_set_latest_version(node, serial)

        return serial, mtime
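
    # Illustrative call (hypothetical values):
    #
    #   serial, mtime = node.version_create(
    #       obj, hash='abc123', size=100, type='text/plain', source=None,
    #       muser=u'user@example.org', uuid='uuid-1', checksum='', cluster=0)
    #
    # Besides inserting the row, this bumps the ancestors' statistics and
    # points nodes.latest_version at the new serial.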

    def version_lookup(self, node, before=inf, cluster=0, all_props=True):
        """Lookup the current version of the given node.
           Return a list with its properties:
           (serial, node, hash, size, type, source, mtime,
            muser, uuid, checksum, cluster)
           or None if the current version is not found in the given cluster.
        """

        v = self.versions.alias('v')
        if not all_props:
            s = select([v.c.serial])
        else:
            s = select([v.c.serial, v.c.node, v.c.hash,
                        v.c.size, v.c.type, v.c.source,
                        v.c.mtime, v.c.muser, v.c.uuid,
                        v.c.checksum, v.c.cluster])
        if before != inf:
            c = select([func.max(self.versions.c.serial)],
                       self.versions.c.node == node)
            c = c.where(self.versions.c.mtime < before)
        else:
            c = select([self.nodes.c.latest_version],
                       self.nodes.c.node == node)
        s = s.where(and_(v.c.serial == c,
                         v.c.cluster == cluster))
        r = self.conn.execute(s)
        props = r.fetchone()
        r.close()
        if props:
            return props
        return None

    def version_lookup_bulk(self, nodes, before=inf, cluster=0,
                            all_props=True, order_by_path=False):
        """Lookup the current versions of the given nodes.
           Return a list with their properties:
           (serial, node, hash, size, type, source, mtime, muser, uuid,
            checksum, cluster).
        """
        if not nodes:
            return ()
        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        if not all_props:
            s = select([v.c.serial])
        else:
            s = select([v.c.serial, v.c.node, v.c.hash,
                        v.c.size, v.c.type, v.c.source,
                        v.c.mtime, v.c.muser, v.c.uuid,
                        v.c.checksum, v.c.cluster])
        if before != inf:
            c = select([func.max(self.versions.c.serial)],
                       self.versions.c.node.in_(nodes))
            c = c.where(self.versions.c.mtime < before)
            c = c.group_by(self.versions.c.node)
        else:
            c = select([self.nodes.c.latest_version],
                       self.nodes.c.node.in_(nodes))
        s = s.where(and_(v.c.serial.in_(c),
                         v.c.cluster == cluster))
        if order_by_path:
            s = s.where(v.c.node == n.c.node)
            s = s.order_by(n.c.path)
        else:
            s = s.order_by(v.c.node)
        r = self.conn.execute(s)
        rproxy = r.fetchall()
        r.close()
        return (tuple(row.values()) for row in rproxy)

    def version_get_properties(self, serial, keys=(), propnames=_propnames,
                               node=None):
        """Return a sequence of values for the properties of
           the version specified by serial and the keys, in the order given.
           If keys is empty, return all properties in the order
           (serial, node, hash, size, type, source, mtime, muser, uuid,
            checksum, cluster).
        """

        v = self.versions.alias()
        s = select([v.c.serial, v.c.node, v.c.hash,
                    v.c.size, v.c.type, v.c.source,
                    v.c.mtime, v.c.muser, v.c.uuid,
                    v.c.checksum, v.c.cluster], v.c.serial == serial)
        if node is not None:
            s = s.where(v.c.node == node)
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if r is None:
            return r

        if not keys:
            return r
        return [r[propnames[k]] for k in keys if k in propnames]

    def version_put_property(self, serial, key, value):
        """Set value for the property of version specified by key."""

        if key not in _propnames:
            return
        s = self.versions.update()
        s = s.where(self.versions.c.serial == serial)
        s = s.values(**{key: value})
        self.conn.execute(s).close()

    def version_recluster(self, serial, cluster,
                          update_statistics_ancestors_depth=None):
        """Move the version into another cluster."""

        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        size = props[SIZE]
        oldcluster = props[CLUSTER]
        if cluster == oldcluster:
            return

        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster,
                                         update_statistics_ancestors_depth)
        self.statistics_update_ancestors(node, 1, size, mtime, cluster,
                                         update_statistics_ancestors_depth)

        s = self.versions.update()
        s = s.where(self.versions.c.serial == serial)
        s = s.values(cluster=cluster)
        self.conn.execute(s).close()

    def version_remove(self, serial, update_statistics_ancestors_depth=None):
        """Remove the serial specified."""

        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        hash = props[HASH]
        size = props[SIZE]
        cluster = props[CLUSTER]

        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, cluster,
                                         update_statistics_ancestors_depth)

        s = self.versions.delete().where(self.versions.c.serial == serial)
        self.conn.execute(s).close()

        props = self.version_lookup(node, cluster=cluster, all_props=False)
        if props:
            self.nodes_set_latest_version(node, props[0])

        return hash, size

    def attribute_get(self, serial, domain, keys=()):
        """Return a list of (key, value) pairs of the specific version.

        If keys is empty, return all attributes.
        Otherwise, return only those specified.
        """

        if keys:
            attrs = self.attributes.alias()
            s = select([attrs.c.key, attrs.c.value])
            s = s.where(and_(attrs.c.key.in_(keys),
                             attrs.c.serial == serial,
                             attrs.c.domain == domain))
        else:
            attrs = self.attributes.alias()
            s = select([attrs.c.key, attrs.c.value])
            s = s.where(and_(attrs.c.serial == serial,
                             attrs.c.domain == domain))
        r = self.conn.execute(s)
        l = r.fetchall()
        r.close()
        return l

    def attribute_set(self, serial, domain, node, items, is_latest=True):
        """Set the attributes of the version specified by serial.
           Receive attributes as an iterable of (key, value) pairs.
        """
        #insert or replace
        #TODO better upsert
        for k, v in items:
            s = self.attributes.update()
            s = s.where(and_(self.attributes.c.serial == serial,
                             self.attributes.c.domain == domain,
                             self.attributes.c.key == k))
            s = s.values(value=v)
            rp = self.conn.execute(s)
            rp.close()
            if rp.rowcount == 0:
                s = self.attributes.insert()
                s = s.values(serial=serial, domain=domain, node=node,
                             is_latest=is_latest, key=k, value=v)
                self.conn.execute(s).close()

    def attribute_del(self, serial, domain, keys=()):
        """Delete attributes of the version specified by serial.
           If keys is empty, delete all attributes.
           Otherwise delete those specified.
        """

        if keys:
            #TODO more efficient way to do this?
            for key in keys:
                s = self.attributes.delete()
                s = s.where(and_(self.attributes.c.serial == serial,
                                 self.attributes.c.domain == domain,
                                 self.attributes.c.key == key))
                self.conn.execute(s).close()
        else:
            s = self.attributes.delete()
            s = s.where(and_(self.attributes.c.serial == serial,
                             self.attributes.c.domain == domain))
            self.conn.execute(s).close()

    def attribute_copy(self, source, dest):
        s = select(
            [dest, self.attributes.c.domain, self.attributes.c.node,
             self.attributes.c.key, self.attributes.c.value],
            self.attributes.c.serial == source)
        rp = self.conn.execute(s)
        attributes = rp.fetchall()
        rp.close()
        for dest, domain, node, k, v in attributes:
            select_src_node = select(
                [self.versions.c.node],
                self.versions.c.serial == dest)
            # insert or replace
            s = self.attributes.update().where(and_(
                self.attributes.c.serial == dest,
                self.attributes.c.domain == domain,
                self.attributes.c.key == k))
            s = s.values(node=select_src_node, value=v)
            rp = self.conn.execute(s)
            rp.close()
            if rp.rowcount == 0:
                s = self.attributes.insert()
                s = s.values(serial=dest, domain=domain, node=select_src_node,
                             is_latest=True, key=k, value=v)
            self.conn.execute(s).close()

    def attribute_unset_is_latest(self, node, exclude):
        u = self.attributes.update().where(and_(
            self.attributes.c.node == node,
            self.attributes.c.serial != exclude)).values({'is_latest': False})
        self.conn.execute(u)

    def latest_attribute_keys(self, parent, domain, before=inf,
                              except_cluster=0, pathq=None):
        """Return a list with all attribute keys defined
           for all latest versions under parent that
           do not belong to the cluster.
        """

        pathq = pathq or []

        # TODO: Use another table to store before=inf results.
        v = self.versions.alias('v')
        s = select([self.attributes.c.key]).distinct()
        if before != inf:
            filtered = select([func.max(v.c.serial)],
                              and_(v.c.mtime < before,
                                   v.c.node == self.versions.c.node))
        else:
            filtered = select([self.nodes.c.latest_version])
            filtered = filtered.where(self.nodes.c.node ==
                                      self.versions.c.node
                                      ).correlate(self.versions)
        s = s.where(self.versions.c.serial == filtered)
        s = s.where(self.versions.c.cluster != except_cluster)
        s = s.where(self.versions.c.node.in_(select([self.nodes.c.node],
                                             self.nodes.c.parent == parent)))
        s = s.where(self.attributes.c.serial == self.versions.c.serial)
        s = s.where(self.attributes.c.domain == domain)
        s = s.where(self.nodes.c.node == self.versions.c.node)
        s = s.order_by(self.attributes.c.key)
        conja = []
        conjb = []
        for path, match in pathq:
            if match == MATCH_PREFIX:
                conja.append(self.nodes.c.path.like(self.escape_like(path) +
                                                    '%', escape=ESCAPE_CHAR))
            elif match == MATCH_EXACT:
                conjb.append(path)
        if conja or conjb:
            s = s.where(or_(self.nodes.c.path.in_(conjb), *conja))
        rp = self.conn.execute(s)
        rows = rp.fetchall()
        rp.close()
        return [r[0] for r in rows]

    def latest_version_list(self, parent, prefix='', delimiter=None,
                            start='', limit=10000, before=inf,
                            except_cluster=0, pathq=[], domain=None,
                            filterq=[], sizeq=None, all_props=False):
        """Return a (list of (path, serial) tuples, list of common prefixes)
           for the current versions of the paths with the given parent,
           matching the following criteria.

           The property tuple for a version is returned if all
           of these conditions are true:

                a. parent matches

                b. path > start

                c. path starts with prefix (and paths in pathq)

                d. version is the max up to before

                e. version is not in cluster

                f. the path does not have the delimiter occurring
                   after the prefix, or ends with the delimiter

                g. serial matches the attribute filter query.

                   A filter query is a comma-separated list of
                   terms in one of these three forms:

                   key
                       an attribute with this key must exist

                   !key
                       an attribute with this key must not exist

                   key ?op value
                       the attribute with this key satisfies the value
                       where ?op is one of ==, !=, <=, >=, <, >.

                h. the size is in the range set by sizeq

           The list of common prefixes includes the prefixes
           matching up to the first delimiter after prefix,
           which are reported only once, as "virtual directories".
           The delimiter is included in the prefixes.

           If arguments are None, then the corresponding matching rule
           will always match.

           Limit applies to the first list of tuples returned.

           If all_props is True, return all properties after path,
           not just serial.
        """

        if not start or start < prefix:
            start = strprevling(prefix)
        nextling = strnextling(prefix)

        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        if before != inf:
            filtered = select([func.max(v.c.serial)],
                              and_(v.c.mtime < before,
                                   v.c.node == self.versions.c.node))
            inner_join = \
                self.nodes.join(self.versions,
                                onclause=self.versions.c.serial == filtered)
        else:
            filtered = select([self.nodes.c.latest_version])
            filtered = filtered.where(self.nodes.c.node ==
                                      self.versions.c.node
                                      ).correlate(self.versions)
            inner_join = \
                self.nodes.join(self.versions,
                                onclause=
                                self.versions.c.serial == filtered)
        if not all_props:
            s = select([self.nodes.c.path,
                       self.versions.c.serial],
                       from_obj=[inner_join]).distinct()
        else:
            s = select([self.nodes.c.path,
                       self.versions.c.serial, self.versions.c.node,
                       self.versions.c.hash,
                       self.versions.c.size, self.versions.c.type,
                       self.versions.c.source,
                       self.versions.c.mtime, self.versions.c.muser,
                       self.versions.c.uuid,
                       self.versions.c.checksum,
                       self.versions.c.cluster],
                       from_obj=[inner_join]).distinct()

        s = s.where(self.versions.c.cluster != except_cluster)
        s = s.where(self.versions.c.node.in_(select([self.nodes.c.node],
                                             self.nodes.c.parent == parent)))

        s = s.where(self.versions.c.node == self.nodes.c.node)
        s = s.where(and_(self.nodes.c.path > bindparam('start'),
                    self.nodes.c.path < nextling))
        conja = []
        conjb = []
        for path, match in pathq:
            if match == MATCH_PREFIX:
                conja.append(self.nodes.c.path.like(self.escape_like(path) +
                             '%', escape=ESCAPE_CHAR))
            elif match == MATCH_EXACT:
                conjb.append(path)
        if conja or conjb:
            s = s.where(or_(self.nodes.c.path.in_(conjb), *conja))

        if sizeq and len(sizeq) == 2:
            if sizeq[0]:
                s = s.where(self.versions.c.size >= sizeq[0])
            if sizeq[1]:
                s = s.where(self.versions.c.size < sizeq[1])

        if domain and filterq:
            a = self.attributes.alias('a')
            included, excluded, opers = parse_filters(filterq)
            if included:
                subs = select([1])
                subs = subs.where(self.attributes.c.serial ==
                                  self.versions.c.serial
                                  ).correlate(self.versions)
                subs = subs.where(self.attributes.c.domain == domain)
                subs = subs.where(or_(*[self.attributes.c.key.op('=')(x)
                                  for x in included]))
                s = s.where(exists(subs))
            if excluded:
                subs = select([1])
                subs = subs.where(self.attributes.c.serial ==
                                  self.versions.c.serial
                                  ).correlate(self.versions)
                subs = subs.where(self.attributes.c.domain == domain)
                subs = subs.where(or_(*[self.attributes.c.key.op('=')(x)
                                  for x in excluded]))
                s = s.where(not_(exists(subs)))
            if opers:
                for k, o, val in opers:
                    subs = select([1])
                    subs = subs.where(self.attributes.c.serial ==
                                      self.versions.c.serial
                                      ).correlate(self.versions)
                    subs = subs.where(self.attributes.c.domain == domain)
                    subs = subs.where(
                        and_(self.attributes.c.key.op('=')(k),
                             self.attributes.c.value.op(o)(val)))
                    s = s.where(exists(subs))

        s = s.order_by(self.nodes.c.path)

        if not delimiter:
            s = s.limit(limit)
            rp = self.conn.execute(s, start=start)
            r = rp.fetchall()
            rp.close()
            return r, ()

        pfz = len(prefix)
        dz = len(delimiter)
        count = 0
        prefixes = []
        pappend = prefixes.append
        matches = []
        mappend = matches.append

        rp = self.conn.execute(s, start=start)
        while True:
            props = rp.fetchone()
            if props is None:
                break
            path = props[0]
            idx = path.find(delimiter, pfz)

            if idx < 0:
                mappend(props)
                count += 1
                if count >= limit:
                    break
                continue

            if idx + dz == len(path):
                mappend(props)
                count += 1
                continue  # Get one more, in case there is a path.
            pf = path[:idx + dz]
            pappend(pf)
            if count >= limit:
                break

            rp = self.conn.execute(s, start=strnextling(pf))  # New start.
        rp.close()

        return matches, prefixes

    def latest_uuid(self, uuid, cluster):
        """Return the latest version of the given uuid and cluster.

        Return a (path, serial) tuple.
        If cluster is None, all clusters are considered.

        """

        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        s = select([n.c.path, v.c.serial])
        filtered = select([func.max(self.versions.c.serial)])
        filtered = filtered.where(self.versions.c.uuid == uuid)
        if cluster is not None:
            filtered = filtered.where(self.versions.c.cluster == cluster)
        s = s.where(v.c.serial == filtered)
        s = s.where(n.c.node == v.c.node)

        r = self.conn.execute(s)
        l = r.fetchone()
        r.close()
        return l

    def domain_object_list(self, domain, paths, cluster=None):
        """Return a list of (path, property list, attribute dictionary)
           for the objects in the specific domain and cluster.
        """

        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        a = self.attributes.alias('a')

        s = select([n.c.path, v.c.serial, v.c.node, v.c.hash, v.c.size,
                    v.c.type, v.c.source, v.c.mtime, v.c.muser, v.c.uuid,
                    v.c.checksum, v.c.cluster, a.c.key, a.c.value])
        if cluster:
            s = s.where(v.c.cluster == cluster)
        s = s.where(v.c.serial == a.c.serial)
        s = s.where(a.c.domain == domain)
        s = s.where(a.c.node == n.c.node)
        s = s.where(a.c.is_latest == True)
        if paths:
            s = s.where(n.c.path.in_(paths))

        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()

        group_by = itemgetter(slice(12))
        rows.sort(key=group_by)
        groups = groupby(rows, group_by)
        return [(k[0], k[1:], dict([i[12:] for i in data])) for
                (k, data) in groups]

    def get_props(self, paths):
        inner_join = \
            self.nodes.join(self.versions,
                            onclause=self.versions.c.serial ==
                            self.nodes.c.latest_version)
        cc = self.nodes.c.path.in_(paths)
        s = select([self.nodes.c.path, self.versions.c.type],
                   from_obj=[inner_join]).where(cc).distinct()
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        if rows:
            return rows
        return None