snf-pithos-backend/pithos/backends/lib/sqlalchemy/node.py @ 876d7486

# Copyright 2011-2012 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from time import time
from operator import itemgetter
from itertools import groupby

from sqlalchemy import (Table, Integer, BigInteger, DECIMAL, Boolean,
                        Column, String, MetaData, ForeignKey)
from sqlalchemy.schema import Index
from sqlalchemy.sql import func, and_, or_, not_, select, bindparam, exists
from sqlalchemy.exc import NoSuchTableError

from dbworker import DBWorker, ESCAPE_CHAR

from pithos.backends.filter import parse_filters


ROOTNODE = 0

(SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM,
 CLUSTER, AVAILABLE, MAP_CHECK_TIMESTAMP) = range(13)
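
# These constants are positional indexes into the property tuples returned by
# version_lookup(), version_get_properties() and node_get_versions(), e.g.
# props[SIZE] or props[CLUSTER].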

(MATCH_PREFIX, MATCH_EXACT) = range(2)

inf = float('inf')


def strnextling(prefix):
    """Return the first unicode string
       greater than but not starting with the given prefix.
       strnextling('hello') -> 'hellp'
    """
    if not prefix:
        ## all strings start with the null string,
        ## therefore we have to approximate strnextling('')
        ## with the last unicode character supported by python
        ## 0x10ffff for wide (32-bit unicode) python builds
        ## 0x00ffff for narrow (16-bit unicode) python builds
        ## We will not autodetect. 0xffff is safe enough.
        return unichr(0xffff)
    s = prefix[:-1]
    c = ord(prefix[-1])
    if c >= 0xffff:
        raise RuntimeError
    s += unichr(c + 1)
    return s


def strprevling(prefix):
    """Return an approximation of the last unicode string
       less than but not starting with the given prefix.
       strprevling(u'hello') -> u'helln\\uffff'
    """
    if not prefix:
        ## There is no prevling for the null string
        return prefix
    s = prefix[:-1]
    c = ord(prefix[-1])
    if c > 0:
        s += unichr(c - 1) + unichr(0xffff)
    return s
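

# strprevling()/strnextling() bracket all strings that start with a prefix, so
# a prefix scan can be expressed as a plain range comparison instead of LIKE.
# Illustrative sketch (not part of the original code):
#
#     start, stop = strprevling(u'photos/'), strnextling(u'photos/')
#     # ... WHERE path > :start AND path < :stop
#
# latest_version_list() below uses this to page through paths under a prefix.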

_propnames = {
    'serial': 0,
    'node': 1,
    'hash': 2,
    'size': 3,
    'type': 4,
    'source': 5,
    'mtime': 6,
    'muser': 7,
    'uuid': 8,
    'checksum': 9,
    'cluster': 10,
    'available': 11,
    'map_check_timestamp': 12
}


def create_tables(engine):
    metadata = MetaData()

    # create nodes table
    columns = []
    columns.append(Column('node', Integer, primary_key=True))
    columns.append(Column('parent', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          autoincrement=False))
    columns.append(Column('latest_version', Integer))
    columns.append(Column('path', String(2048), default='', nullable=False))
    nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
    Index('idx_nodes_path', nodes.c.path, unique=True)
    Index('idx_nodes_parent', nodes.c.parent)

    # create policy table
    columns = []
    columns.append(Column('node', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          primary_key=True))
    columns.append(Column('key', String(128), primary_key=True))
    columns.append(Column('value', String(256)))
    Table('policy', metadata, *columns, mysql_engine='InnoDB')

    # create statistics table
    columns = []
    columns.append(Column('node', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          primary_key=True))
    columns.append(Column('population', Integer, nullable=False, default=0))
    columns.append(Column('size', BigInteger, nullable=False, default=0))
    columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
    columns.append(Column('cluster', Integer, nullable=False, default=0,
                          primary_key=True, autoincrement=False))
    Table('statistics', metadata, *columns, mysql_engine='InnoDB')

    # create versions table
    columns = []
    columns.append(Column('serial', Integer, primary_key=True))
    columns.append(Column('node', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE')))
    columns.append(Column('hash', String(256)))
    columns.append(Column('size', BigInteger, nullable=False, default=0))
    columns.append(Column('type', String(256), nullable=False, default=''))
    columns.append(Column('source', Integer))
    columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
    columns.append(Column('muser', String(256), nullable=False, default=''))
    columns.append(Column('uuid', String(64), nullable=False, default=''))
    columns.append(Column('checksum', String(256), nullable=False, default=''))
    columns.append(Column('cluster', Integer, nullable=False, default=0))
    columns.append(Column('available', Boolean, nullable=False, default=True))
    columns.append(Column('map_check_timestamp', DECIMAL(precision=16,
                                                         scale=6)))
    versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
    Index('idx_versions_node_mtime', versions.c.node, versions.c.mtime)
    Index('idx_versions_node_uuid', versions.c.uuid)

    # create attributes table
    columns = []
    columns.append(Column('serial', Integer,
                          ForeignKey('versions.serial',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          primary_key=True))
    columns.append(Column('domain', String(256), primary_key=True))
    columns.append(Column('key', String(128), primary_key=True))
    columns.append(Column('value', String(256)))
    columns.append(Column('node', Integer, nullable=False, default=0))
    columns.append(Column('is_latest', Boolean, nullable=False, default=True))
    attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')
    Index('idx_attributes_domain', attributes.c.domain)
    Index('idx_attributes_serial_node', attributes.c.serial, attributes.c.node)

    metadata.create_all(engine)
    return metadata.sorted_tables
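

# Minimal usage sketch (illustrative assumption, not part of this module):
# create_tables() needs nothing but a bound SQLAlchemy engine and returns the
# created tables in foreign-key dependency order.
#
#     from sqlalchemy import create_engine
#     engine = create_engine('sqlite://')  # any supported backend
#     tables = create_tables(engine)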


class Node(DBWorker):
    """Nodes store path organization and have multiple versions.
       Versions store object history and have multiple attributes.
       Attributes store metadata.
    """

    # TODO: Provide an interface for included and excluded clusters.

    def __init__(self, **params):
        DBWorker.__init__(self, **params)
        try:
            metadata = MetaData(self.engine)
            self.nodes = Table('nodes', metadata, autoload=True)
            self.policy = Table('policy', metadata, autoload=True)
            self.statistics = Table('statistics', metadata, autoload=True)
            self.versions = Table('versions', metadata, autoload=True)
            self.attributes = Table('attributes', metadata, autoload=True)
        except NoSuchTableError:
            tables = create_tables(self.engine)
            map(lambda t: self.__setattr__(t.name, t), tables)

        s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
                                           self.nodes.c.parent == ROOTNODE))
        wrapper = self.wrapper
        wrapper.execute()
        try:
            rp = self.conn.execute(s)
            r = rp.fetchone()
            rp.close()
            if not r:
                s = self.nodes.insert(
                ).values(node=ROOTNODE, parent=ROOTNODE, path='')
                self.conn.execute(s)
        finally:
            wrapper.commit()

    def node_create(self, parent, path):
        """Create a new node from the given properties.
           Return the node identifier of the new node.
        """
        # TODO: catch IntegrityError?
        s = self.nodes.insert().values(parent=parent, path=path)
        r = self.conn.execute(s)
        inserted_primary_key = r.inserted_primary_key[0]
        r.close()
        return inserted_primary_key

    def node_lookup(self, path, for_update=False):
        """Lookup the current node of the given path.
           Return None if the path is not found.
        """

        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
        s = select([self.nodes.c.node],
                   self.nodes.c.path.like(self.escape_like(path),
                                          escape=ESCAPE_CHAR),
                   for_update=for_update)
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        if row:
            return row[0]
        return None

    def node_lookup_bulk(self, paths):
        """Lookup the current nodes for the given paths.
           Return () if the path is not found.
        """

        if not paths:
            return ()
        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
        s = select([self.nodes.c.node], self.nodes.c.path.in_(paths))
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        return [row[0] for row in rows]

    def node_get_properties(self, node):
        """Return the node's (parent, path).
           Return None if the node is not found.
        """

        s = select([self.nodes.c.parent, self.nodes.c.path])
        s = s.where(self.nodes.c.node == node)
        r = self.conn.execute(s)
        l = r.fetchone()
        r.close()
        return l

    def node_get_versions(self, node, keys=(), propnames=_propnames):
        """Return the properties of all versions at node.
           If keys is empty, return all properties in the order
           (serial, node, hash, size, type, source, mtime, muser, uuid,
            checksum, cluster, available, map_check_timestamp).
        """

        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.hash,
                    self.versions.c.size,
                    self.versions.c.type,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.uuid,
                    self.versions.c.checksum,
                    self.versions.c.cluster,
                    self.versions.c.available,
                    self.versions.c.map_check_timestamp],
                   self.versions.c.node == node)
        s = s.order_by(self.versions.c.serial)
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        if not rows:
            return rows

        if not keys:
            return rows

        return [[p[propnames[k]] for k in keys if k in propnames] for
                p in rows]

    def node_count_children(self, node):
        """Return node's child count."""

        s = select([func.count(self.nodes.c.node)])
        s = s.where(and_(self.nodes.c.parent == node,
                         self.nodes.c.node != ROOTNODE))
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        return row[0]

    def node_purge_children(self, parent, before=inf, cluster=0,
                            update_statistics_ancestors_depth=None):
        """Delete all versions with the specified
           parent and cluster, and return
           the hashes, the total size and the serials of versions deleted.
           Clears out nodes with no remaining versions.
        """
        # update statistics
        c1 = select([self.nodes.c.node],
                    self.nodes.c.parent == parent)
        where_clause = and_(self.versions.c.node.in_(c1),
                            self.versions.c.cluster == cluster)
        if before != inf:
            where_clause = and_(where_clause,
                                self.versions.c.mtime <= before)
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        if not row:
            return (), 0, ()
        nr, size = row[0], row[1] if row[1] else 0
        mtime = time()
        self.statistics_update(parent, -nr, -size, mtime, cluster)
        self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster,
                                         update_statistics_ancestors_depth)

        s = select([self.versions.c.hash, self.versions.c.serial])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        hashes = []
        serials = []
        for row in r.fetchall():
            hashes += [row[0]]
            serials += [row[1]]
        r.close()

        # delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)
        r.close()

        # delete nodes
        s = select([self.nodes.c.node],
                   and_(self.nodes.c.parent == parent,
                        select([func.count(self.versions.c.serial)],
                               self.versions.c.node == self.nodes.c.node).
                        as_scalar() == 0))
        rp = self.conn.execute(s)
        nodes = [row[0] for row in rp.fetchall()]
        rp.close()
        if nodes:
            s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
            self.conn.execute(s).close()

        return hashes, size, serials

    def node_purge(self, node, before=inf, cluster=0,
                   update_statistics_ancestors_depth=None):
        """Delete all versions with the specified
           node and cluster, and return
           the hashes, the total size and the serials of versions deleted.
           Clears out the node if it has no remaining versions.
        """

        # update statistics
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        where_clause = and_(self.versions.c.node == node,
                            self.versions.c.cluster == cluster)
        if before != inf:
            where_clause = and_(where_clause,
                                self.versions.c.mtime <= before)
        s = s.where(where_clause)
        r = self.conn.execute(s)
        row = r.fetchone()
        nr, size = row[0], row[1]
        r.close()
        if not nr:
            return (), 0, ()
        mtime = time()
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster,
                                         update_statistics_ancestors_depth)

        s = select([self.versions.c.hash, self.versions.c.serial])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        hashes = []
        serials = []
        for row in r.fetchall():
            hashes += [row[0]]
            serials += [row[1]]
        r.close()

        # delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)
        r.close()

        # delete nodes
        s = select([self.nodes.c.node],
                   and_(self.nodes.c.node == node,
                        select([func.count(self.versions.c.serial)],
                               self.versions.c.node == self.nodes.c.node).
                        as_scalar() == 0))
        rp = self.conn.execute(s)
        nodes = [row[0] for row in rp.fetchall()]
        rp.close()
        if nodes:
            s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
            self.conn.execute(s).close()

        return hashes, size, serials

    def node_remove(self, node, update_statistics_ancestors_depth=None):
        """Remove the node specified.
           Return false if the node has children or is not found.
        """

        if self.node_count_children(node):
            return False

        mtime = time()
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size),
                    self.versions.c.cluster])
        s = s.where(self.versions.c.node == node)
        s = s.group_by(self.versions.c.cluster)
        r = self.conn.execute(s)
        for population, size, cluster in r.fetchall():
            self.statistics_update_ancestors(
                node, -population, -size, mtime, cluster,
                update_statistics_ancestors_depth)
        r.close()

        s = self.nodes.delete().where(self.nodes.c.node == node)
        self.conn.execute(s).close()
        return True

    def node_accounts(self, accounts=()):
        s = select([self.nodes.c.path, self.nodes.c.node])
        s = s.where(and_(self.nodes.c.node != 0,
                         self.nodes.c.parent == 0))
        if accounts:
            s = s.where(self.nodes.c.path.in_(accounts))
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        return rows

    def node_account_quotas(self):
        s = select([self.nodes.c.path, self.policy.c.value])
        s = s.where(and_(self.nodes.c.node != 0,
                         self.nodes.c.parent == 0))
        s = s.where(self.nodes.c.node == self.policy.c.node)
        s = s.where(self.policy.c.key == 'quota')
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        return dict(rows)

    def node_account_usage(self, account=None, cluster=0):
        """Return usage for a specific account.

        Keyword arguments:
        account -- (default None: list usage for all the accounts)
        cluster -- list current, history or deleted usage (default 0: normal)
        """

        n1 = self.nodes.alias('n1')
        n2 = self.nodes.alias('n2')
        n3 = self.nodes.alias('n3')

        s = select([n3.c.path, func.sum(self.versions.c.size)])
        s = s.where(n1.c.node == self.versions.c.node)
        s = s.where(self.versions.c.cluster == cluster)
        s = s.where(n1.c.parent == n2.c.node)
        s = s.where(n2.c.parent == n3.c.node)
        s = s.where(n3.c.parent == 0)
        s = s.where(n3.c.node != 0)
        if account:
            s = s.where(n3.c.path == account)
        s = s.group_by(n3.c.path)
        r = self.conn.execute(s)
        usage = r.fetchall()
        r.close()
        return dict(usage)

    def policy_get(self, node):
        s = select([self.policy.c.key, self.policy.c.value],
                   self.policy.c.node == node)
        r = self.conn.execute(s)
        d = dict(r.fetchall())
        r.close()
        return d

    def policy_set(self, node, policy):
        # insert or replace
        for k, v in policy.iteritems():
            s = self.policy.update().where(and_(self.policy.c.node == node,
                                                self.policy.c.key == k))
            s = s.values(value=v)
            rp = self.conn.execute(s)
            rp.close()
            if rp.rowcount == 0:
                s = self.policy.insert()
                values = {'node': node, 'key': k, 'value': v}
                r = self.conn.execute(s, values)
                r.close()

    def statistics_get(self, node, cluster=0):
        """Return population, total size and last mtime
           for all versions under node that belong to the cluster.
        """

        s = select([self.statistics.c.population,
                    self.statistics.c.size,
                    self.statistics.c.mtime])
        s = s.where(and_(self.statistics.c.node == node,
                         self.statistics.c.cluster == cluster))
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        return row

    def statistics_update(self, node, population, size, mtime, cluster=0):
        """Update the statistics of the given node.
           Statistics keep track of the population, total size of
           objects and mtime in the node's namespace.
           The population and size deltas may be zero, positive or negative.
        """
        s = select([self.statistics.c.population, self.statistics.c.size],
                   and_(self.statistics.c.node == node,
                        self.statistics.c.cluster == cluster))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            prepopulation, presize = (0, 0)
        else:
            prepopulation, presize = r
        population += prepopulation
        population = max(population, 0)
        size += presize

        # insert or replace
        # TODO: better upsert
        u = self.statistics.update().where(and_(
            self.statistics.c.node == node,
            self.statistics.c.cluster == cluster))
        u = u.values(population=population, size=size, mtime=mtime)
        rp = self.conn.execute(u)
        rp.close()
        if rp.rowcount == 0:
            ins = self.statistics.insert()
            ins = ins.values(node=node, population=population, size=size,
                             mtime=mtime, cluster=cluster)
            self.conn.execute(ins).close()

    def statistics_update_ancestors(self, node, population, size, mtime,
                                    cluster=0, recursion_depth=None):
        """Update the statistics of the given node's parent.
           Then recursively update all parents up to the root
           or up to the ``recursion_depth`` (if not None).
           Population is not recursive.
        """

        i = 0
        while True:
            if node == ROOTNODE:
                break
            if recursion_depth and recursion_depth <= i:
                break
            props = self.node_get_properties(node)
            if props is None:
                break
            parent, path = props
            self.statistics_update(parent, population, size, mtime, cluster)
            node = parent
            population = 0  # Population isn't recursive
            i += 1

    def statistics_latest(self, node, before=inf, except_cluster=0):
        """Return population, total size and last mtime
           for all latest versions under node that
           do not belong to the cluster.
        """

        # The node.
        props = self.node_get_properties(node)
        if props is None:
            return None
        parent, path = props

        # The latest version.
        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.hash,
                    self.versions.c.size,
                    self.versions.c.type,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.uuid,
                    self.versions.c.checksum,
                    self.versions.c.cluster])
        if before != inf:
            filtered = select([func.max(self.versions.c.serial)],
                              self.versions.c.node == node)
            filtered = filtered.where(self.versions.c.mtime < before)
        else:
            filtered = select([self.nodes.c.latest_version],
                              self.nodes.c.node == node)
        s = s.where(and_(self.versions.c.cluster != except_cluster,
                         self.versions.c.serial == filtered))
        r = self.conn.execute(s)
        props = r.fetchone()
        r.close()
        if not props:
            return None
        mtime = props[MTIME]

        # First level, just under node (get population).
        v = self.versions.alias('v')
        s = select([func.count(v.c.serial),
                    func.sum(v.c.size),
                    func.max(v.c.mtime)])
        if before != inf:
            c1 = select([func.max(self.versions.c.serial)],
                        and_(self.versions.c.mtime < before,
                             self.versions.c.node == v.c.node))
        else:
            c1 = select([self.nodes.c.latest_version])
            c1 = c1.where(self.nodes.c.node == v.c.node)
        c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
        s = s.where(and_(v.c.serial == c1,
                         v.c.cluster != except_cluster,
                         v.c.node.in_(c2)))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        count = r[0]
        mtime = max(mtime, r[2])
        if count == 0:
            return (0, 0, mtime)

        # All children (get size and mtime).
        # This is why the full path is stored.
        if before != inf:
            s = select([func.count(v.c.serial),
                       func.sum(v.c.size),
                       func.max(v.c.mtime)])
            c1 = select([func.max(self.versions.c.serial)],
                        and_(self.versions.c.mtime < before,
                             self.versions.c.node == v.c.node))
        else:
            inner_join = \
                self.versions.join(self.nodes, onclause=
                                   self.versions.c.serial ==
                                   self.nodes.c.latest_version)
            s = select([func.count(self.versions.c.serial),
                       func.sum(self.versions.c.size),
                       func.max(self.versions.c.mtime)],
                       from_obj=[inner_join])

        c2 = select([self.nodes.c.node],
                    self.nodes.c.path.like(self.escape_like(path) + '%',
                                           escape=ESCAPE_CHAR))
        if before != inf:
            s = s.where(and_(v.c.serial == c1,
                        v.c.cluster != except_cluster,
                        v.c.node.in_(c2)))
        else:
            s = s.where(and_(self.versions.c.cluster != except_cluster,
                        self.versions.c.node.in_(c2)))

        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        size = long(r[1] - props[SIZE])
        mtime = max(mtime, r[2])
        return (count, size, mtime)

    def nodes_set_latest_version(self, node, serial):
        s = self.nodes.update().where(self.nodes.c.node == node)
        s = s.values(latest_version=serial)
        self.conn.execute(s).close()

    def version_create(self, node, hash, size, type, source, muser, uuid,
                       checksum, cluster=0,
                       update_statistics_ancestors_depth=None,
                       available=True):
        """Create a new version from the given properties.
           Return the (serial, mtime) of the new version.
        """

        mtime = time()
        s = self.versions.insert().values(
            node=node, hash=hash, size=size, type=type, source=source,
            mtime=mtime, muser=muser, uuid=uuid, checksum=checksum,
            cluster=cluster, available=available)
        serial = self.conn.execute(s).inserted_primary_key[0]
        self.statistics_update_ancestors(node, 1, size, mtime, cluster,
                                         update_statistics_ancestors_depth)

        self.nodes_set_latest_version(node, serial)

        return serial, mtime

    def version_lookup(self, node, before=inf, cluster=0, all_props=True):
        """Lookup the current version of the given node.
           Return a list with its properties:
           (serial, node, hash, size, type, source, mtime,
            muser, uuid, checksum, cluster, available, map_check_timestamp)
           or None if the current version is not found in the given cluster.
        """

        v = self.versions.alias('v')
        if not all_props:
            s = select([v.c.serial])
        else:
            s = select([v.c.serial, v.c.node, v.c.hash,
                        v.c.size, v.c.type, v.c.source,
                        v.c.mtime, v.c.muser, v.c.uuid,
                        v.c.checksum, v.c.cluster,
                        v.c.available, v.c.map_check_timestamp])
        if before != inf:
            c = select([func.max(self.versions.c.serial)],
                       self.versions.c.node == node)
            c = c.where(self.versions.c.mtime < before)
        else:
            c = select([self.nodes.c.latest_version],
                       self.nodes.c.node == node)
        s = s.where(and_(v.c.serial == c,
                         v.c.cluster == cluster))
        r = self.conn.execute(s)
        props = r.fetchone()
        r.close()
        if props:
            return props
        return None

    def version_lookup_bulk(self, nodes, before=inf, cluster=0,
                            all_props=True, order_by_path=False):
        """Lookup the current versions of the given nodes.
           Return a list with their properties:
           (serial, node, hash, size, type, source, mtime, muser, uuid,
            checksum, cluster, available, map_check_timestamp).
        """
        if not nodes:
            return ()
        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        if not all_props:
            s = select([v.c.serial])
        else:
            s = select([v.c.serial, v.c.node, v.c.hash,
                        v.c.size, v.c.type, v.c.source,
                        v.c.mtime, v.c.muser, v.c.uuid,
                        v.c.checksum, v.c.cluster,
                        v.c.available, v.c.map_check_timestamp])
        if before != inf:
            c = select([func.max(self.versions.c.serial)],
                       self.versions.c.node.in_(nodes))
            c = c.where(self.versions.c.mtime < before)
            c = c.group_by(self.versions.c.node)
        else:
            c = select([self.nodes.c.latest_version],
                       self.nodes.c.node.in_(nodes))
        s = s.where(and_(v.c.serial.in_(c),
                         v.c.cluster == cluster))
        if order_by_path:
            s = s.where(v.c.node == n.c.node)
            s = s.order_by(n.c.path)
        else:
            s = s.order_by(v.c.node)
        r = self.conn.execute(s)
        rproxy = r.fetchall()
        r.close()
        return (tuple(row.values()) for row in rproxy)

    def version_get_properties(self, serial, keys=(), propnames=_propnames,
                               node=None):
        """Return a sequence of values for the properties of
           the version specified by serial and the keys, in the order given.
           If keys is empty, return all properties in the order
           (serial, node, hash, size, type, source, mtime, muser, uuid,
            checksum, cluster, available, map_check_timestamp).
        """

        v = self.versions.alias()
        s = select([v.c.serial, v.c.node, v.c.hash,
                    v.c.size, v.c.type, v.c.source,
                    v.c.mtime, v.c.muser, v.c.uuid,
                    v.c.checksum, v.c.cluster,
                    v.c.available, v.c.map_check_timestamp],
                   v.c.serial == serial)
        if node is not None:
            s = s.where(v.c.node == node)
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if r is None:
            return r

        if not keys:
            return r
        return [r[propnames[k]] for k in keys if k in propnames]

    def version_put_property(self, serial, key, value):
        """Set the value of the property named key for the version
           specified by serial.
        """

        if key not in _propnames:
            return
        s = self.versions.update()
        s = s.where(self.versions.c.serial == serial)
        s = s.values(**{key: value})
        self.conn.execute(s).close()

    def version_recluster(self, serial, cluster,
                          update_statistics_ancestors_depth=None):
        """Move the version into another cluster."""

        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        size = props[SIZE]
        oldcluster = props[CLUSTER]
        if cluster == oldcluster:
            return

        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster,
                                         update_statistics_ancestors_depth)
        self.statistics_update_ancestors(node, 1, size, mtime, cluster,
                                         update_statistics_ancestors_depth)

        s = self.versions.update()
        s = s.where(self.versions.c.serial == serial)
        s = s.values(cluster=cluster)
        self.conn.execute(s).close()

    def version_remove(self, serial, update_statistics_ancestors_depth=None):
        """Remove the serial specified."""

        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        hash = props[HASH]
        size = props[SIZE]
        cluster = props[CLUSTER]

        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, cluster,
                                         update_statistics_ancestors_depth)

        s = self.versions.delete().where(self.versions.c.serial == serial)
        self.conn.execute(s).close()

        props = self.version_lookup(node, cluster=cluster, all_props=False)
        if props:
            self.nodes_set_latest_version(node, serial)

        return hash, size

    def attribute_get(self, serial, domain, keys=()):
        """Return a list of (key, value) pairs of the specific version.

        If keys is empty, return all attributes.
        Otherwise, return only those specified.
        """

        if keys:
            attrs = self.attributes.alias()
            s = select([attrs.c.key, attrs.c.value])
            s = s.where(and_(attrs.c.key.in_(keys),
                             attrs.c.serial == serial,
                             attrs.c.domain == domain))
        else:
            attrs = self.attributes.alias()
            s = select([attrs.c.key, attrs.c.value])
            s = s.where(and_(attrs.c.serial == serial,
                             attrs.c.domain == domain))
        r = self.conn.execute(s)
        l = r.fetchall()
        r.close()
        return l

    def attribute_set(self, serial, domain, node, items, is_latest=True):
        """Set the attributes of the version specified by serial.
           Receive attributes as an iterable of (key, value) pairs.
        """
        # insert or replace
        # TODO: better upsert
        for k, v in items:
            s = self.attributes.update()
            s = s.where(and_(self.attributes.c.serial == serial,
                             self.attributes.c.domain == domain,
                             self.attributes.c.key == k))
            s = s.values(value=v)
            rp = self.conn.execute(s)
            rp.close()
            if rp.rowcount == 0:
                s = self.attributes.insert()
                s = s.values(serial=serial, domain=domain, node=node,
                             is_latest=is_latest, key=k, value=v)
                self.conn.execute(s).close()

    def attribute_del(self, serial, domain, keys=()):
        """Delete attributes of the version specified by serial.
           If keys is empty, delete all attributes.
           Otherwise delete those specified.
        """

        if keys:
            # TODO: more efficient way to do this?
            for key in keys:
                s = self.attributes.delete()
                s = s.where(and_(self.attributes.c.serial == serial,
                                 self.attributes.c.domain == domain,
                                 self.attributes.c.key == key))
                self.conn.execute(s).close()
        else:
            s = self.attributes.delete()
            s = s.where(and_(self.attributes.c.serial == serial,
                             self.attributes.c.domain == domain))
            self.conn.execute(s).close()

    def attribute_copy(self, source, dest):
        s = select(
            [dest, self.attributes.c.domain, self.attributes.c.node,
             self.attributes.c.key, self.attributes.c.value],
            self.attributes.c.serial == source)
        rp = self.conn.execute(s)
        attributes = rp.fetchall()
        rp.close()
        for dest, domain, node, k, v in attributes:
            select_src_node = select(
                [self.versions.c.node],
                self.versions.c.serial == dest)
            # insert or replace
            s = self.attributes.update().where(and_(
                self.attributes.c.serial == dest,
                self.attributes.c.domain == domain,
                self.attributes.c.key == k))
            s = s.values(node=select_src_node, value=v)
            rp = self.conn.execute(s)
            rp.close()
            if rp.rowcount == 0:
                s = self.attributes.insert()
                s = s.values(serial=dest, domain=domain, node=select_src_node,
                             is_latest=True, key=k, value=v)
            self.conn.execute(s).close()

    def attribute_unset_is_latest(self, node, exclude):
        u = self.attributes.update().where(and_(
            self.attributes.c.node == node,
            self.attributes.c.serial != exclude)).values({'is_latest': False})
        self.conn.execute(u)

    def latest_attribute_keys(self, parent, domain, before=inf,
                              except_cluster=0, pathq=None):
        """Return a list with all attribute keys defined
           for all latest versions under parent that
           do not belong to the cluster.
        """

        pathq = pathq or []

        # TODO: Use another table to store before=inf results.
        v = self.versions.alias('v')
        s = select([self.attributes.c.key]).distinct()
        if before != inf:
            filtered = select([func.max(v.c.serial)],
                              and_(v.c.mtime < before,
                                   v.c.node == self.versions.c.node))
        else:
            filtered = select([self.nodes.c.latest_version])
            filtered = filtered.where(self.nodes.c.node ==
                                      self.versions.c.node
                                      ).correlate(self.versions)
        s = s.where(self.versions.c.serial == filtered)
        s = s.where(self.versions.c.cluster != except_cluster)
        s = s.where(self.versions.c.node.in_(select([self.nodes.c.node],
                                             self.nodes.c.parent == parent)))
        s = s.where(self.attributes.c.serial == self.versions.c.serial)
        s = s.where(self.attributes.c.domain == domain)
        s = s.where(self.nodes.c.node == self.versions.c.node)
        s = s.order_by(self.attributes.c.key)
        conja = []
        conjb = []
        for path, match in pathq:
            if match == MATCH_PREFIX:
                conja.append(self.nodes.c.path.like(self.escape_like(path) +
                                                    '%', escape=ESCAPE_CHAR))
            elif match == MATCH_EXACT:
                conjb.append(path)
        if conja or conjb:
            s = s.where(or_(self.nodes.c.path.in_(conjb), *conja))
        rp = self.conn.execute(s)
        rows = rp.fetchall()
        rp.close()
        return [r[0] for r in rows]

    def latest_version_list(self, parent, prefix='', delimiter=None,
                            start='', limit=10000, before=inf,
                            except_cluster=0, pathq=[], domain=None,
                            filterq=[], sizeq=None, all_props=False):
        """Return a (list of (path, serial) tuples, list of common prefixes)
           for the current versions of the paths with the given parent,
           matching the following criteria.

           The property tuple for a version is returned if all
           of these conditions are true:

                a. parent matches

                b. path > start

                c. path starts with prefix (and paths in pathq)

                d. version is the max up to before

                e. version is not in cluster

                f. the path does not have the delimiter occurring
                   after the prefix, or ends with the delimiter

                g. serial matches the attribute filter query.

                   A filter query is a comma-separated list of
                   terms in one of these three forms:

                   key
                       an attribute with this key must exist

                   !key
                       an attribute with this key must not exist

                   key ?op value
                       the attribute with this key satisfies the value
                       where ?op is one of ==, !=, <=, >=, <, >.

                h. the size is in the range set by sizeq

           The list of common prefixes includes the prefixes
           matching up to the first delimiter after prefix,
           which are reported only once, as "virtual directories".
           The delimiter is included in the prefixes.

           If arguments are None, then the corresponding matching rule
           will always match.

           Limit applies to the first list of tuples returned.

           If all_props is True, return all properties after path,
           not just serial.
        """

        if not start or start < prefix:
            start = strprevling(prefix)
        nextling = strnextling(prefix)

        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        if before != inf:
            filtered = select([func.max(v.c.serial)],
                              and_(v.c.mtime < before,
                                   v.c.node == self.versions.c.node))
            inner_join = \
                self.nodes.join(self.versions,
                                onclause=self.versions.c.serial == filtered)
        else:
            filtered = select([self.nodes.c.latest_version])
            filtered = filtered.where(self.nodes.c.node ==
                                      self.versions.c.node
                                      ).correlate(self.versions)
            inner_join = \
                self.nodes.join(self.versions,
                                onclause=
                                self.versions.c.serial == filtered)
        if not all_props:
            s = select([self.nodes.c.path,
                       self.versions.c.serial],
                       from_obj=[inner_join]).distinct()
        else:
            s = select([self.nodes.c.path,
                       self.versions.c.serial, self.versions.c.node,
                       self.versions.c.hash,
                       self.versions.c.size, self.versions.c.type,
                       self.versions.c.source,
                       self.versions.c.mtime, self.versions.c.muser,
                       self.versions.c.uuid,
                       self.versions.c.checksum,
                       self.versions.c.cluster],
                       from_obj=[inner_join]).distinct()

        s = s.where(self.versions.c.cluster != except_cluster)
        s = s.where(self.versions.c.node.in_(select([self.nodes.c.node],
                                             self.nodes.c.parent == parent)))

        s = s.where(self.versions.c.node == self.nodes.c.node)
        s = s.where(and_(self.nodes.c.path > bindparam('start'),
                    self.nodes.c.path < nextling))
        conja = []
        conjb = []
        for path, match in pathq:
            if match == MATCH_PREFIX:
                conja.append(self.nodes.c.path.like(self.escape_like(path) +
                             '%', escape=ESCAPE_CHAR))
            elif match == MATCH_EXACT:
                conjb.append(path)
        if conja or conjb:
            s = s.where(or_(self.nodes.c.path.in_(conjb), *conja))

        if sizeq and len(sizeq) == 2:
            if sizeq[0]:
                s = s.where(self.versions.c.size >= sizeq[0])
            if sizeq[1]:
                s = s.where(self.versions.c.size < sizeq[1])

        if domain and filterq:
            a = self.attributes.alias('a')
            included, excluded, opers = parse_filters(filterq)
            if included:
                subs = select([1])
                subs = subs.where(self.attributes.c.serial ==
                                  self.versions.c.serial
                                  ).correlate(self.versions)
                subs = subs.where(self.attributes.c.domain == domain)
                subs = subs.where(or_(*[self.attributes.c.key.op('=')(x)
                                  for x in included]))
                s = s.where(exists(subs))
            if excluded:
                subs = select([1])
                subs = subs.where(self.attributes.c.serial ==
                                  self.versions.c.serial
                                  ).correlate(self.versions)
                subs = subs.where(self.attributes.c.domain == domain)
                subs = subs.where(or_(*[self.attributes.c.key.op('=')(x)
                                  for x in excluded]))
                s = s.where(not_(exists(subs)))
            if opers:
                for k, o, val in opers:
                    subs = select([1])
                    subs = subs.where(self.attributes.c.serial ==
                                      self.versions.c.serial
                                      ).correlate(self.versions)
                    subs = subs.where(self.attributes.c.domain == domain)
                    subs = subs.where(
                        and_(self.attributes.c.key.op('=')(k),
                             self.attributes.c.value.op(o)(val)))
                    s = s.where(exists(subs))

        s = s.order_by(self.nodes.c.path)

        if not delimiter:
            s = s.limit(limit)
            rp = self.conn.execute(s, start=start)
            r = rp.fetchall()
            rp.close()
            return r, ()

        pfz = len(prefix)
        dz = len(delimiter)
        count = 0
        prefixes = []
        pappend = prefixes.append
        matches = []
        mappend = matches.append

        rp = self.conn.execute(s, start=start)
        while True:
            props = rp.fetchone()
            if props is None:
                break
            path = props[0]
            idx = path.find(delimiter, pfz)

            if idx < 0:
                mappend(props)
                count += 1
                if count >= limit:
                    break
                continue

            if idx + dz == len(path):
                mappend(props)
                count += 1
                continue  # Get one more, in case there is a path.
            pf = path[:idx + dz]
            pappend(pf)
            if count >= limit:
                break

            rp = self.conn.execute(s, start=strnextling(pf))  # New start.
        rp.close()

        return matches, prefixes
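
    # Illustrative call (assumed container_node, not from the original code):
    #
    #     matches, prefixes = node.latest_version_list(container_node,
    #                                                  prefix='photos/',
    #                                                  delimiter='/')
    #
    # matches holds (path, serial) tuples for objects directly under
    # 'photos/', while prefixes lists the "virtual directories" below it.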

    def latest_uuid(self, uuid, cluster):
        """Return the latest version of the given uuid and cluster.

        Return a (path, serial) tuple.
        If cluster is None, all clusters are considered.

        """

        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        s = select([n.c.path, v.c.serial])
        filtered = select([func.max(self.versions.c.serial)])
        filtered = filtered.where(self.versions.c.uuid == uuid)
        if cluster is not None:
            filtered = filtered.where(self.versions.c.cluster == cluster)
        s = s.where(v.c.serial == filtered)
        s = s.where(n.c.node == v.c.node)

        r = self.conn.execute(s)
        l = r.fetchone()
        r.close()
        return l

    def domain_object_list(self, domain, paths, cluster=None):
        """Return a list of (path, property list, attribute dictionary)
           for the objects in the specific domain and cluster.
        """

        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        a = self.attributes.alias('a')

        s = select([n.c.path, v.c.serial, v.c.node, v.c.hash, v.c.size,
                    v.c.type, v.c.source, v.c.mtime, v.c.muser, v.c.uuid,
                    v.c.checksum, v.c.cluster, a.c.key, a.c.value])
        if cluster:
            s = s.where(v.c.cluster == cluster)
        s = s.where(v.c.serial == a.c.serial)
        s = s.where(a.c.domain == domain)
        s = s.where(a.c.node == n.c.node)
        s = s.where(a.c.is_latest == True)
        if paths:
            s = s.where(n.c.path.in_(paths))

        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
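
        # Group rows on the first 12 columns (path plus the version
        # properties); the trailing (key, value) pairs of each group build the
        # attribute dictionary.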
        group_by = itemgetter(slice(12))
        rows.sort(key=group_by)
        groups = groupby(rows, group_by)
        return [(k[0], k[1:], dict([i[12:] for i in data])) for
                (k, data) in groups]

    def get_props(self, paths):
        inner_join = \
            self.nodes.join(self.versions,
                            onclause=self.versions.c.serial ==
                            self.nodes.c.latest_version)
        cc = self.nodes.c.path.in_(paths)
        s = select([self.nodes.c.path, self.versions.c.type],
                   from_obj=[inner_join]).where(cc).distinct()
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        if rows:
            return rows
        return None