Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / lib / sqlalchemy / node.py @ 78e1f8da

History | View | Annotate | Download (48.4 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from time import time
35
from operator import itemgetter
36
from itertools import groupby
37

    
38
from sqlalchemy import (Table, Integer, BigInteger, DECIMAL, Boolean,
39
                        Column, String, MetaData, ForeignKey)
40
from sqlalchemy.schema import Index
41
from sqlalchemy.sql import func, and_, or_, not_, select, bindparam, exists
42
from sqlalchemy.exc import NoSuchTableError
43

    
44
from dbworker import DBWorker, ESCAPE_CHAR
45

    
46
from pithos.backends.filter import parse_filters
47

    
48

    
49
ROOTNODE = 0
50

    
51
(SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM,
52
 CLUSTER) = range(11)
53

    
54
(MATCH_PREFIX, MATCH_EXACT) = range(2)
55

    
56
inf = float('inf')
57

    
58

    
59
def strnextling(prefix):
60
    """Return the first unicode string
61
       greater than but not starting with given prefix.
62
       strnextling('hello') -> 'hellp'
63
    """
64
    if not prefix:
65
        ## all strings start with the null string,
66
        ## therefore we have to approximate strnextling('')
67
        ## with the last unicode character supported by python
68
        ## 0x10ffff for wide (32-bit unicode) python builds
69
        ## 0x00ffff for narrow (16-bit unicode) python builds
70
        ## We will not autodetect. 0xffff is safe enough.
71
        return unichr(0xffff)
72
    s = prefix[:-1]
73
    c = ord(prefix[-1])
74
    if c >= 0xffff:
75
        raise RuntimeError
76
    s += unichr(c + 1)
77
    return s
78

    
79

    
80
def strprevling(prefix):
81
    """Return an approximation of the last unicode string
82
       less than but not starting with given prefix.
83
       strprevling(u'hello') -> u'helln\\xffff'
84
    """
85
    if not prefix:
86
        ## There is no prevling for the null string
87
        return prefix
88
    s = prefix[:-1]
89
    c = ord(prefix[-1])
90
    if c > 0:
91
        s += unichr(c - 1) + unichr(0xffff)
92
    return s
93

    
94
_propnames = {
95
    'serial': 0,
96
    'node': 1,
97
    'hash': 2,
98
    'size': 3,
99
    'type': 4,
100
    'source': 5,
101
    'mtime': 6,
102
    'muser': 7,
103
    'uuid': 8,
104
    'checksum': 9,
105
    'cluster': 10,
106
}
107

    
108

    
109
def create_tables(engine):
110
    metadata = MetaData()
111

    
112
    #create nodes table
113
    columns = []
114
    columns.append(Column('node', Integer, primary_key=True))
115
    columns.append(Column('parent', Integer,
116
                          ForeignKey('nodes.node',
117
                                     ondelete='CASCADE',
118
                                     onupdate='CASCADE'),
119
                          autoincrement=False))
120
    columns.append(Column('latest_version', Integer))
121
    columns.append(Column('path', String(2048), default='', nullable=False))
122
    nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
123
    Index('idx_nodes_path', nodes.c.path, unique=True)
124
    Index('idx_nodes_parent', nodes.c.parent)
125

    
126
    #create policy table
127
    columns = []
128
    columns.append(Column('node', Integer,
129
                          ForeignKey('nodes.node',
130
                                     ondelete='CASCADE',
131
                                     onupdate='CASCADE'),
132
                          primary_key=True))
133
    columns.append(Column('key', String(128), primary_key=True))
134
    columns.append(Column('value', String(256)))
135
    Table('policy', metadata, *columns, mysql_engine='InnoDB')
136

    
137
    #create statistics table
138
    columns = []
139
    columns.append(Column('node', Integer,
140
                          ForeignKey('nodes.node',
141
                                     ondelete='CASCADE',
142
                                     onupdate='CASCADE'),
143
                          primary_key=True))
144
    columns.append(Column('population', Integer, nullable=False, default=0))
145
    columns.append(Column('size', BigInteger, nullable=False, default=0))
146
    columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
147
    columns.append(Column('cluster', Integer, nullable=False, default=0,
148
                          primary_key=True, autoincrement=False))
149
    Table('statistics', metadata, *columns, mysql_engine='InnoDB')
150

    
151
    #create versions table
152
    columns = []
153
    columns.append(Column('serial', Integer, primary_key=True))
154
    columns.append(Column('node', Integer,
155
                          ForeignKey('nodes.node',
156
                                     ondelete='CASCADE',
157
                                     onupdate='CASCADE')))
158
    columns.append(Column('hash', String(256)))
159
    columns.append(Column('size', BigInteger, nullable=False, default=0))
160
    columns.append(Column('type', String(256), nullable=False, default=''))
161
    columns.append(Column('source', Integer))
162
    columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
163
    columns.append(Column('muser', String(256), nullable=False, default=''))
164
    columns.append(Column('uuid', String(64), nullable=False, default=''))
165
    columns.append(Column('checksum', String(256), nullable=False, default=''))
166
    columns.append(Column('cluster', Integer, nullable=False, default=0))
167
    versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
168
    Index('idx_versions_node_mtime', versions.c.node, versions.c.mtime)
169
    Index('idx_versions_node_uuid', versions.c.uuid)
170

    
171
    #create attributes table
172
    columns = []
173
    columns.append(Column('serial', Integer,
174
                          ForeignKey('versions.serial',
175
                                     ondelete='CASCADE',
176
                                     onupdate='CASCADE'),
177
                          primary_key=True))
178
    columns.append(Column('domain', String(256), primary_key=True))
179
    columns.append(Column('key', String(128), primary_key=True))
180
    columns.append(Column('value', String(256)))
181
    columns.append(Column('node', Integer, nullable=False, default=0))
182
    columns.append(Column('is_latest', Boolean, nullable=False, default=True))
183
    attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')
184
    Index('idx_attributes_domain', attributes.c.domain)
185
    Index('idx_attributes_serial_node', attributes.c.serial, attributes.c.node)
186

    
187
    metadata.create_all(engine)
188
    return metadata.sorted_tables
189

    
190

    
191
class Node(DBWorker):
192
    """Nodes store path organization and have multiple versions.
193
       Versions store object history and have multiple attributes.
194
       Attributes store metadata.
195
    """
196

    
197
    # TODO: Provide an interface for included and excluded clusters.
198

    
199
    def __init__(self, **params):
200
        DBWorker.__init__(self, **params)
201
        try:
202
            metadata = MetaData(self.engine)
203
            self.nodes = Table('nodes', metadata, autoload=True)
204
            self.policy = Table('policy', metadata, autoload=True)
205
            self.statistics = Table('statistics', metadata, autoload=True)
206
            self.versions = Table('versions', metadata, autoload=True)
207
            self.attributes = Table('attributes', metadata, autoload=True)
208
        except NoSuchTableError:
209
            tables = create_tables(self.engine)
210
            map(lambda t: self.__setattr__(t.name, t), tables)
211

    
212
        s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
213
                                           self.nodes.c.parent == ROOTNODE))
214
        wrapper = self.wrapper
215
        wrapper.execute()
216
        try:
217
            rp = self.conn.execute(s)
218
            r = rp.fetchone()
219
            rp.close()
220
            if not r:
221
                s = self.nodes.insert(
222
                ).values(node=ROOTNODE, parent=ROOTNODE, path='')
223
                self.conn.execute(s)
224
        finally:
225
            wrapper.commit()
226

    
227
    def node_create(self, parent, path):
228
        """Create a new node from the given properties.
229
           Return the node identifier of the new node.
230
        """
231
        #TODO catch IntegrityError?
232
        s = self.nodes.insert().values(parent=parent, path=path)
233
        r = self.conn.execute(s)
234
        inserted_primary_key = r.inserted_primary_key[0]
235
        r.close()
236
        return inserted_primary_key
237

    
238
    def node_lookup(self, path, for_update=False):
239
        """Lookup the current node of the given path.
240
           Return None if the path is not found.
241
        """
242

    
243
        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
244
        s = select([self.nodes.c.node],
245
                   self.nodes.c.path.like(self.escape_like(path),
246
                                          escape=ESCAPE_CHAR),
247
                   for_update=for_update)
248
        r = self.conn.execute(s)
249
        row = r.fetchone()
250
        r.close()
251
        if row:
252
            return row[0]
253
        return None
254

    
255
    def node_lookup_bulk(self, paths):
256
        """Lookup the current nodes for the given paths.
257
           Return () if the path is not found.
258
        """
259

    
260
        if not paths:
261
            return ()
262
        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
263
        s = select([self.nodes.c.node], self.nodes.c.path.in_(paths))
264
        r = self.conn.execute(s)
265
        rows = r.fetchall()
266
        r.close()
267
        return [row[0] for row in rows]
268

    
269
    def node_get_properties(self, node):
270
        """Return the node's (parent, path).
271
           Return None if the node is not found.
272
        """
273

    
274
        s = select([self.nodes.c.parent, self.nodes.c.path])
275
        s = s.where(self.nodes.c.node == node)
276
        r = self.conn.execute(s)
277
        l = r.fetchone()
278
        r.close()
279
        return l
280

    
281
    def node_get_parent_path(self, node):
282
        """Return the node's parent path.
283
           Return None if the node is not found.
284
        """
285

    
286
        n1 = self.nodes.alias('n1')
287
        n2 = self.nodes.alias('n2')
288

    
289
        s = select([n2.c.path])
290
        s = s.where(n2.c.node == n1.c.parent)
291
        s = s.where(n1.c.node == node)
292
        r = self.conn.execute(s)
293
        l = r.fetchone()
294
        r.close()
295
        return l[0] if l is not None else None
296

    
297
    def node_get_versions(self, node, keys=(), propnames=_propnames):
298
        """Return the properties of all versions at node.
299
           If keys is empty, return all properties in the order
300
           (serial, node, hash, size, type, source, mtime, muser, uuid,
301
            checksum, cluster).
302
        """
303

    
304
        s = select([self.versions.c.serial,
305
                    self.versions.c.node,
306
                    self.versions.c.hash,
307
                    self.versions.c.size,
308
                    self.versions.c.type,
309
                    self.versions.c.source,
310
                    self.versions.c.mtime,
311
                    self.versions.c.muser,
312
                    self.versions.c.uuid,
313
                    self.versions.c.checksum,
314
                    self.versions.c.cluster], self.versions.c.node == node)
315
        s = s.order_by(self.versions.c.serial)
316
        r = self.conn.execute(s)
317
        rows = r.fetchall()
318
        r.close()
319
        if not rows:
320
            return rows
321

    
322
        if not keys:
323
            return rows
324

    
325
        return [[p[propnames[k]] for k in keys if k in propnames] for
326
                p in rows]
327

    
328
    def node_count_children(self, node):
329
        """Return node's child count."""
330

    
331
        s = select([func.count(self.nodes.c.node)])
332
        s = s.where(and_(self.nodes.c.parent == node,
333
                         self.nodes.c.node != ROOTNODE))
334
        r = self.conn.execute(s)
335
        row = r.fetchone()
336
        r.close()
337
        return row[0]
338

    
339
    def node_purge_children(self, parent, before=inf, cluster=0,
340
                            update_statistics_ancestors_depth=None):
341
        """Delete all versions with the specified
342
           parent and cluster, and return
343
           the hashes, the total size and the serials of versions deleted.
344
           Clears out nodes with no remaining versions.
345
        """
346
        #update statistics
347
        c1 = select([self.nodes.c.node],
348
                    self.nodes.c.parent == parent)
349
        where_clause = and_(self.versions.c.node.in_(c1),
350
                            self.versions.c.cluster == cluster)
351
        if before != inf:
352
            where_clause = and_(where_clause,
353
                                self.versions.c.mtime <= before)
354
        s = select([func.count(self.versions.c.serial),
355
                    func.sum(self.versions.c.size)])
356
        s = s.where(where_clause)
357
        r = self.conn.execute(s)
358
        row = r.fetchone()
359
        r.close()
360
        if not row:
361
            return (), 0, ()
362
        nr, size = row[0], row[1] if row[1] else 0
363
        mtime = time()
364
        self.statistics_update(parent, -nr, -size, mtime, cluster)
365
        self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster,
366
                                         update_statistics_ancestors_depth)
367

    
368
        s = select([self.versions.c.hash, self.versions.c.serial])
369
        s = s.where(where_clause)
370
        r = self.conn.execute(s)
371
        hashes = []
372
        serials = []
373
        for row in r.fetchall():
374
            hashes += [row[0]]
375
            serials += [row[1]]
376
        r.close()
377

    
378
        #delete versions
379
        s = self.versions.delete().where(where_clause)
380
        r = self.conn.execute(s)
381
        r.close()
382

    
383
        #delete nodes
384
        s = select([self.nodes.c.node],
385
                   and_(self.nodes.c.parent == parent,
386
                        select([func.count(self.versions.c.serial)],
387
                               self.versions.c.node == self.nodes.c.node).
388
                        as_scalar() == 0))
389
        rp = self.conn.execute(s)
390
        nodes = [row[0] for row in rp.fetchall()]
391
        rp.close()
392
        if nodes:
393
            s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
394
            self.conn.execute(s).close()
395

    
396
        return hashes, size, serials
397

    
398
    def node_purge(self, node, before=inf, cluster=0,
399
                   update_statistics_ancestors_depth=None):
400
        """Delete all versions with the specified
401
           node and cluster, and return
402
           the hashes and size of versions deleted.
403
           Clears out the node if it has no remaining versions.
404
        """
405

    
406
        #update statistics
407
        s = select([func.count(self.versions.c.serial),
408
                    func.sum(self.versions.c.size)])
409
        where_clause = and_(self.versions.c.node == node,
410
                            self.versions.c.cluster == cluster)
411
        if before != inf:
412
            where_clause = and_(where_clause,
413
                                self.versions.c.mtime <= before)
414
        s = s.where(where_clause)
415
        r = self.conn.execute(s)
416
        row = r.fetchone()
417
        nr, size = row[0], row[1]
418
        r.close()
419
        if not nr:
420
            return (), 0, ()
421
        mtime = time()
422
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster,
423
                                         update_statistics_ancestors_depth)
424

    
425
        s = select([self.versions.c.hash, self.versions.c.serial])
426
        s = s.where(where_clause)
427
        r = self.conn.execute(s)
428
        hashes = []
429
        serials = []
430
        for row in r.fetchall():
431
            hashes += [row[0]]
432
            serials += [row[1]]
433
        r.close()
434

    
435
        #delete versions
436
        s = self.versions.delete().where(where_clause)
437
        r = self.conn.execute(s)
438
        r.close()
439

    
440
        #delete nodes
441
        s = select([self.nodes.c.node],
442
                   and_(self.nodes.c.node == node,
443
                        select([func.count(self.versions.c.serial)],
444
                               self.versions.c.node == self.nodes.c.node).
445
                        as_scalar() == 0))
446
        rp = self.conn.execute(s)
447
        nodes = [row[0] for row in rp.fetchall()]
448
        rp.close()
449
        if nodes:
450
            s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
451
            self.conn.execute(s).close()
452

    
453
        return hashes, size, serials
454

    
455
    def node_remove(self, node, update_statistics_ancestors_depth=None):
456
        """Remove the node specified.
457
           Return false if the node has children or is not found.
458
        """
459

    
460
        if self.node_count_children(node):
461
            return False
462

    
463
        mtime = time()
464
        s = select([func.count(self.versions.c.serial),
465
                    func.sum(self.versions.c.size),
466
                    self.versions.c.cluster])
467
        s = s.where(self.versions.c.node == node)
468
        s = s.group_by(self.versions.c.cluster)
469
        r = self.conn.execute(s)
470
        for population, size, cluster in r.fetchall():
471
            self.statistics_update_ancestors(
472
                node, -population, -size, mtime, cluster,
473
                update_statistics_ancestors_depth)
474
        r.close()
475

    
476
        s = self.nodes.delete().where(self.nodes.c.node == node)
477
        self.conn.execute(s).close()
478
        return True
479

    
480
    def node_accounts(self, accounts=()):
481
        s = select([self.nodes.c.path, self.nodes.c.node])
482
        s = s.where(and_(self.nodes.c.node != 0,
483
                         self.nodes.c.parent == 0))
484
        if accounts:
485
            s = s.where(self.nodes.c.path.in_(accounts))
486
        r = self.conn.execute(s)
487
        rows = r.fetchall()
488
        r.close()
489
        return rows
490

    
491
    def node_account_quotas(self):
492
        s = select([self.nodes.c.path, self.policy.c.value])
493
        s = s.where(and_(self.nodes.c.node != 0,
494
                         self.nodes.c.parent == 0))
495
        s = s.where(self.nodes.c.node == self.policy.c.node)
496
        s = s.where(self.policy.c.key == 'quota')
497
        r = self.conn.execute(s)
498
        rows = r.fetchall()
499
        r.close()
500
        return dict(rows)
501

    
502
    def node_account_usage(self, account=None, cluster=0):
503
        """Return usage for a specific account.
504

505
        Keyword arguments:
506
        account -- (default None: list usage for all the accounts)
507
        cluster -- list current, history or deleted usage (default 0: normal)
508
        """
509

    
510
        n1 = self.nodes.alias('n1')
511
        n2 = self.nodes.alias('n2')
512
        n3 = self.nodes.alias('n3')
513

    
514
        s = select([n3.c.path, func.sum(self.versions.c.size)])
515
        s = s.where(n1.c.node == self.versions.c.node)
516
        s = s.where(self.versions.c.cluster == cluster)
517
        s = s.where(n1.c.parent == n2.c.node)
518
        s = s.where(n2.c.parent == n3.c.node)
519
        s = s.where(n3.c.parent == 0)
520
        s = s.where(n3.c.node != 0)
521
        if account:
522
            s = s.where(n3.c.path == account)
523
        s = s.group_by(n3.c.path)
524
        r = self.conn.execute(s)
525
        usage = r.fetchall()
526
        r.close()
527
        return dict(usage)
528

    
529
    def policy_get(self, node):
530
        s = select([self.policy.c.key, self.policy.c.value],
531
                   self.policy.c.node == node)
532
        r = self.conn.execute(s)
533
        d = dict(r.fetchall())
534
        r.close()
535
        return d
536

    
537
    def policy_set(self, node, policy):
538
        #insert or replace
539
        for k, v in policy.iteritems():
540
            s = self.policy.update().where(and_(self.policy.c.node == node,
541
                                                self.policy.c.key == k))
542
            s = s.values(value=v)
543
            rp = self.conn.execute(s)
544
            rp.close()
545
            if rp.rowcount == 0:
546
                s = self.policy.insert()
547
                values = {'node': node, 'key': k, 'value': v}
548
                r = self.conn.execute(s, values)
549
                r.close()
550

    
551
    def statistics_get(self, node, cluster=0):
552
        """Return population, total size and last mtime
553
           for all versions under node that belong to the cluster.
554
        """
555

    
556
        s = select([self.statistics.c.population,
557
                    self.statistics.c.size,
558
                    self.statistics.c.mtime])
559
        s = s.where(and_(self.statistics.c.node == node,
560
                         self.statistics.c.cluster == cluster))
561
        r = self.conn.execute(s)
562
        row = r.fetchone()
563
        r.close()
564
        return row
565

    
566
    def statistics_update(self, node, population, size, mtime, cluster=0):
567
        """Update the statistics of the given node.
568
           Statistics keep track the population, total
569
           size of objects and mtime in the node's namespace.
570
           May be zero or positive or negative numbers.
571
        """
572
        s = select([self.statistics.c.population, self.statistics.c.size],
573
                   and_(self.statistics.c.node == node,
574
                        self.statistics.c.cluster == cluster))
575
        rp = self.conn.execute(s)
576
        r = rp.fetchone()
577
        rp.close()
578
        if not r:
579
            prepopulation, presize = (0, 0)
580
        else:
581
            prepopulation, presize = r
582
        population += prepopulation
583
        population = max(population, 0)
584
        size += presize
585

    
586
        #insert or replace
587
        #TODO better upsert
588
        u = self.statistics.update().where(and_(
589
            self.statistics.c.node == node,
590
            self.statistics.c.cluster == cluster))
591
        u = u.values(population=population, size=size, mtime=mtime)
592
        rp = self.conn.execute(u)
593
        rp.close()
594
        if rp.rowcount == 0:
595
            ins = self.statistics.insert()
596
            ins = ins.values(node=node, population=population, size=size,
597
                             mtime=mtime, cluster=cluster)
598
            self.conn.execute(ins).close()
599

    
600
    def statistics_update_ancestors(self, node, population, size, mtime,
601
                                    cluster=0, recursion_depth=None):
602
        """Update the statistics of the given node's parent.
603
           Then recursively update all parents up to the root
604
           or up to the ``recursion_depth`` (if not None).
605
           Population is not recursive.
606
        """
607

    
608
        i = 0
609
        while True:
610
            if node == ROOTNODE:
611
                break
612
            if recursion_depth and recursion_depth <= i:
613
                break
614
            props = self.node_get_properties(node)
615
            if props is None:
616
                break
617
            parent, path = props
618
            self.statistics_update(parent, population, size, mtime, cluster)
619
            node = parent
620
            population = 0  # Population isn't recursive
621
            i += 1
622

    
623
    def statistics_latest(self, node, before=inf, except_cluster=0):
624
        """Return population, total size and last mtime
625
           for all latest versions under node that
626
           do not belong to the cluster.
627
        """
628

    
629
        # The node.
630
        props = self.node_get_properties(node)
631
        if props is None:
632
            return None
633
        parent, path = props
634

    
635
        # The latest version.
636
        s = select([self.versions.c.serial,
637
                    self.versions.c.node,
638
                    self.versions.c.hash,
639
                    self.versions.c.size,
640
                    self.versions.c.type,
641
                    self.versions.c.source,
642
                    self.versions.c.mtime,
643
                    self.versions.c.muser,
644
                    self.versions.c.uuid,
645
                    self.versions.c.checksum,
646
                    self.versions.c.cluster])
647
        if before != inf:
648
            filtered = select([func.max(self.versions.c.serial)],
649
                              self.versions.c.node == node)
650
            filtered = filtered.where(self.versions.c.mtime < before)
651
        else:
652
            filtered = select([self.nodes.c.latest_version],
653
                              self.nodes.c.node == node)
654
        s = s.where(and_(self.versions.c.cluster != except_cluster,
655
                         self.versions.c.serial == filtered))
656
        r = self.conn.execute(s)
657
        props = r.fetchone()
658
        r.close()
659
        if not props:
660
            return None
661
        mtime = props[MTIME]
662

    
663
        # First level, just under node (get population).
664
        v = self.versions.alias('v')
665
        s = select([func.count(v.c.serial),
666
                    func.sum(v.c.size),
667
                    func.max(v.c.mtime)])
668
        if before != inf:
669
            c1 = select([func.max(self.versions.c.serial)],
670
                        and_(self.versions.c.mtime < before,
671
                             self.versions.c.node == v.c.node))
672
        else:
673
            c1 = select([self.nodes.c.latest_version])
674
            c1 = c1.where(self.nodes.c.node == v.c.node)
675
        c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
676
        s = s.where(and_(v.c.serial == c1,
677
                         v.c.cluster != except_cluster,
678
                         v.c.node.in_(c2)))
679
        rp = self.conn.execute(s)
680
        r = rp.fetchone()
681
        rp.close()
682
        if not r:
683
            return None
684
        count = r[0]
685
        mtime = max(mtime, r[2])
686
        if count == 0:
687
            return (0, 0, mtime)
688

    
689
        # All children (get size and mtime).
690
        # This is why the full path is stored.
691
        if before != inf:
692
            s = select([func.count(v.c.serial),
693
                    func.sum(v.c.size),
694
                    func.max(v.c.mtime)])
695
            c1 = select([func.max(self.versions.c.serial)],
696
                        and_(self.versions.c.mtime < before,
697
                             self.versions.c.node == v.c.node))
698
        else:
699
            inner_join = \
700
                    self.versions.join(self.nodes, onclause=\
701
                    self.versions.c.serial == self.nodes.c.latest_version)
702
            s = select([func.count(self.versions.c.serial),
703
                    func.sum(self.versions.c.size),
704
                    func.max(self.versions.c.mtime)], from_obj=[inner_join])
705

    
706
        c2 = select([self.nodes.c.node],
707
                    self.nodes.c.path.like(self.escape_like(path) + '%',
708
                                           escape=ESCAPE_CHAR))
709
        if before != inf:
710
            s = s.where(and_(v.c.serial == c1,
711
                         v.c.cluster != except_cluster,
712
                         v.c.node.in_(c2)))
713
        else:
714
            s = s.where(and_(self.versions.c.cluster != except_cluster,
715
                        self.versions.c.node.in_(c2)))
716

    
717
        rp = self.conn.execute(s)
718
        r = rp.fetchone()
719
        rp.close()
720
        if not r:
721
            return None
722
        size = long(r[1] - props[SIZE])
723
        mtime = max(mtime, r[2])
724
        return (count, size, mtime)
725

    
726
    def nodes_set_latest_version(self, node, serial):
727
        s = self.nodes.update().where(self.nodes.c.node == node)
728
        s = s.values(latest_version=serial)
729
        self.conn.execute(s).close()
730

    
731
    def version_create(self, node, hash, size, type, source, muser, uuid,
732
                       checksum, cluster=0,
733
                       update_statistics_ancestors_depth=None):
734
        """Create a new version from the given properties.
735
           Return the (serial, mtime) of the new version.
736
        """
737

    
738
        mtime = time()
739
        s = self.versions.insert().values(
740
            node=node, hash=hash, size=size, type=type, source=source,
741
            mtime=mtime, muser=muser, uuid=uuid, checksum=checksum,
742
            cluster=cluster)
743
        serial = self.conn.execute(s).inserted_primary_key[0]
744
        self.statistics_update_ancestors(node, 1, size, mtime, cluster,
745
                                         update_statistics_ancestors_depth)
746

    
747
        self.nodes_set_latest_version(node, serial)
748

    
749
        return serial, mtime
750

    
751
    def version_lookup(self, node, before=inf, cluster=0, all_props=True):
752
        """Lookup the current version of the given node.
753
           Return a list with its properties:
754
           (serial, node, hash, size, type, source, mtime,
755
            muser, uuid, checksum, cluster)
756
           or None if the current version is not found in the given cluster.
757
        """
758

    
759
        v = self.versions.alias('v')
760
        if not all_props:
761
            s = select([v.c.serial])
762
        else:
763
            s = select([v.c.serial, v.c.node, v.c.hash,
764
                        v.c.size, v.c.type, v.c.source,
765
                        v.c.mtime, v.c.muser, v.c.uuid,
766
                        v.c.checksum, v.c.cluster])
767
        if before != inf:
768
            c = select([func.max(self.versions.c.serial)],
769
                       self.versions.c.node == node)
770
            c = c.where(self.versions.c.mtime < before)
771
        else:
772
            c = select([self.nodes.c.latest_version],
773
                       self.nodes.c.node == node)
774
        s = s.where(and_(v.c.serial == c,
775
                         v.c.cluster == cluster))
776
        r = self.conn.execute(s)
777
        props = r.fetchone()
778
        r.close()
779
        if props:
780
            return props
781
        return None
782

    
783
    def version_lookup_bulk(self, nodes, before=inf, cluster=0,
784
                            all_props=True, order_by_path=False):
785
        """Lookup the current versions of the given nodes.
786
           Return a list with their properties:
787
           (serial, node, hash, size, type, source, mtime, muser, uuid,
788
            checksum, cluster).
789
        """
790
        if not nodes:
791
            return ()
792
        v = self.versions.alias('v')
793
        n = self.nodes.alias('n')
794
        if not all_props:
795
            s = select([v.c.serial])
796
        else:
797
            s = select([v.c.serial, v.c.node, v.c.hash,
798
                        v.c.size, v.c.type, v.c.source,
799
                        v.c.mtime, v.c.muser, v.c.uuid,
800
                        v.c.checksum, v.c.cluster])
801
        if before != inf:
802
            c = select([func.max(self.versions.c.serial)],
803
                       self.versions.c.node.in_(nodes))
804
            c = c.where(self.versions.c.mtime < before)
805
            c = c.group_by(self.versions.c.node)
806
        else:
807
            c = select([self.nodes.c.latest_version],
808
                       self.nodes.c.node.in_(nodes))
809
        s = s.where(and_(v.c.serial.in_(c),
810
                         v.c.cluster == cluster))
811
        if order_by_path:
812
            s = s.where(v.c.node == n.c.node)
813
            s = s.order_by(n.c.path)
814
        else:
815
            s = s.order_by(v.c.node)
816
        r = self.conn.execute(s)
817
        rproxy = r.fetchall()
818
        r.close()
819
        return (tuple(row.values()) for row in rproxy)
820

    
821
    def version_get_properties(self, serial, keys=(), propnames=_propnames,
822
                               node=None):
823
        """Return a sequence of values for the properties of
824
           the version specified by serial and the keys, in the order given.
825
           If keys is empty, return all properties in the order
826
           (serial, node, hash, size, type, source, mtime, muser, uuid,
827
            checksum, cluster).
828
        """
829

    
830
        v = self.versions.alias()
831
        s = select([v.c.serial, v.c.node, v.c.hash,
832
                    v.c.size, v.c.type, v.c.source,
833
                    v.c.mtime, v.c.muser, v.c.uuid,
834
                    v.c.checksum, v.c.cluster], v.c.serial == serial)
835
        if node is not None:
836
            s = s.where(v.c.node == node)
837
        rp = self.conn.execute(s)
838
        r = rp.fetchone()
839
        rp.close()
840
        if r is None:
841
            return r
842

    
843
        if not keys:
844
            return r
845
        return [r[propnames[k]] for k in keys if k in propnames]
846

    
847
    def version_put_property(self, serial, key, value):
848
        """Set value for the property of version specified by key."""
849

    
850
        if key not in _propnames:
851
            return
852
        s = self.versions.update()
853
        s = s.where(self.versions.c.serial == serial)
854
        s = s.values(**{key: value})
855
        self.conn.execute(s).close()
856

    
857
    def version_recluster(self, serial, cluster,
858
                          update_statistics_ancestors_depth=None):
859
        """Move the version into another cluster."""
860

    
861
        props = self.version_get_properties(serial)
862
        if not props:
863
            return
864
        node = props[NODE]
865
        size = props[SIZE]
866
        oldcluster = props[CLUSTER]
867
        if cluster == oldcluster:
868
            return
869

    
870
        mtime = time()
871
        self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster,
872
                                         update_statistics_ancestors_depth)
873
        self.statistics_update_ancestors(node, 1, size, mtime, cluster,
874
                                         update_statistics_ancestors_depth)
875

    
876
        s = self.versions.update()
877
        s = s.where(self.versions.c.serial == serial)
878
        s = s.values(cluster=cluster)
879
        self.conn.execute(s).close()
880

    
881
    def version_remove(self, serial, update_statistics_ancestors_depth=None):
882
        """Remove the serial specified."""
883

    
884
        props = self.version_get_properties(serial)
885
        if not props:
886
            return
887
        node = props[NODE]
888
        hash = props[HASH]
889
        size = props[SIZE]
890
        cluster = props[CLUSTER]
891

    
892
        mtime = time()
893
        self.statistics_update_ancestors(node, -1, -size, mtime, cluster,
894
                                         update_statistics_ancestors_depth)
895

    
896
        s = self.versions.delete().where(self.versions.c.serial == serial)
897
        self.conn.execute(s).close()
898

    
899
        props = self.version_lookup(node, cluster=cluster, all_props=False)
900
        if props:
901
            self.nodes_set_latest_version(node, serial)
902

    
903
        return hash, size
904

    
905
    def attribute_get(self, serial, domain, keys=()):
906
        """Return a list of (key, value) pairs of the specific version.
907

908
        If keys is empty, return all attributes.
909
        Othwerise, return only those specified.
910
        """
911

    
912
        if keys:
913
            attrs = self.attributes.alias()
914
            s = select([attrs.c.key, attrs.c.value])
915
            s = s.where(and_(attrs.c.key.in_(keys),
916
                             attrs.c.serial == serial,
917
                             attrs.c.domain == domain))
918
        else:
919
            attrs = self.attributes.alias()
920
            s = select([attrs.c.key, attrs.c.value])
921
            s = s.where(and_(attrs.c.serial == serial,
922
                             attrs.c.domain == domain))
923
        r = self.conn.execute(s)
924
        l = r.fetchall()
925
        r.close()
926
        return l
927

    
928
    def attribute_set(self, serial, domain, node, items, is_latest=True):
929
        """Set the attributes of the version specified by serial.
930
           Receive attributes as an iterable of (key, value) pairs.
931
        """
932
        #insert or replace
933
        #TODO better upsert
934
        for k, v in items:
935
            s = self.attributes.update()
936
            s = s.where(and_(self.attributes.c.serial == serial,
937
                             self.attributes.c.domain == domain,
938
                             self.attributes.c.key == k))
939
            s = s.values(value=v)
940
            rp = self.conn.execute(s)
941
            rp.close()
942
            if rp.rowcount == 0:
943
                s = self.attributes.insert()
944
                s = s.values(serial=serial, domain=domain, node=node,
945
                             is_latest=is_latest, key=k, value=v)
946
                self.conn.execute(s).close()
947

    
948
    def attribute_del(self, serial, domain, keys=()):
949
        """Delete attributes of the version specified by serial.
950
           If keys is empty, delete all attributes.
951
           Otherwise delete those specified.
952
        """
953

    
954
        if keys:
955
            #TODO more efficient way to do this?
956
            for key in keys:
957
                s = self.attributes.delete()
958
                s = s.where(and_(self.attributes.c.serial == serial,
959
                                 self.attributes.c.domain == domain,
960
                                 self.attributes.c.key == key))
961
                self.conn.execute(s).close()
962
        else:
963
            s = self.attributes.delete()
964
            s = s.where(and_(self.attributes.c.serial == serial,
965
                             self.attributes.c.domain == domain))
966
            self.conn.execute(s).close()
967

    
968
    def attribute_copy(self, source, dest):
969
        s = select(
970
            [dest, self.attributes.c.domain, self.attributes.c.node,
971
             self.attributes.c.key, self.attributes.c.value],
972
            self.attributes.c.serial == source)
973
        rp = self.conn.execute(s)
974
        attributes = rp.fetchall()
975
        rp.close()
976
        for dest, domain, node, k, v in attributes:
977
            select_src_node = select(
978
                [self.versions.c.node],
979
                self.versions.c.serial == dest)
980
            # insert or replace
981
            s = self.attributes.update().where(and_(
982
                self.attributes.c.serial == dest,
983
                self.attributes.c.domain == domain,
984
                self.attributes.c.key == k))
985
            s = s.values(node=select_src_node, value=v)
986
            rp = self.conn.execute(s)
987
            rp.close()
988
            if rp.rowcount == 0:
989
                s = self.attributes.insert()
990
                s = s.values(serial=dest, domain=domain, node=select_src_node,
991
                             is_latest=True, key=k, value=v)
992
            self.conn.execute(s).close()
993

    
994
    def attribute_unset_is_latest(self, node, exclude):
995
        u = self.attributes.update().where(and_(
996
            self.attributes.c.node == node,
997
            self.attributes.c.serial != exclude)).values({'is_latest': False})
998
        self.conn.execute(u)
999

    
1000
    def latest_attribute_keys(self, parent, domain, before=inf,
1001
                              except_cluster=0, pathq=None):
1002
        """Return a list with all keys pairs defined
1003
           for all latest versions under parent that
1004
           do not belong to the cluster.
1005
        """
1006

    
1007
        pathq = pathq or []
1008

    
1009
        # TODO: Use another table to store before=inf results.
1010
        v = self.versions.alias('v')
1011
        s = select([self.attributes.c.key]).distinct()
1012
        if before != inf:
1013
            filtered = select([func.max(v.c.serial)],
1014
                              and_(v.c.mtime < before,
1015
                                   v.c.node == self.versions.c.node))
1016
        else:
1017
            filtered = select([self.nodes.c.latest_version])
1018
            filtered = filtered.where(self.nodes.c.node == \
1019
                            self.versions.c.node).correlate(self.versions)
1020
        s = s.where(self.versions.c.serial == filtered)
1021
        s = s.where(self.versions.c.cluster != except_cluster)
1022
        s = s.where(self.versions.c.node.in_(select([self.nodes.c.node],
1023
                                        self.nodes.c.parent == parent)))
1024
        s = s.where(self.attributes.c.serial == self.versions.c.serial)
1025
        s = s.where(self.attributes.c.domain == domain)
1026
        s = s.where(self.nodes.c.node == self.versions.c.node)
1027
        s = s.order_by(self.attributes.c.key)
1028
        conja = []
1029
        conjb = []
1030
        for path, match in pathq:
1031
            if match == MATCH_PREFIX:
1032
                conja.append(self.nodes.c.path.like(self.escape_like(path) + '%',
1033
                                          escape=ESCAPE_CHAR))
1034
            elif match == MATCH_EXACT:
1035
                conjb.append(path)
1036
        if conja or conjb:
1037
            s = s.where(or_(self.nodes.c.path.in_(conjb),*conja))
1038
        rp = self.conn.execute(s)
1039
        rows = rp.fetchall()
1040
        rp.close()
1041
        return [r[0] for r in rows]
1042

    
1043
    def latest_version_list(self, parent, prefix='', delimiter=None,
1044
                            start='', limit=10000, before=inf,
1045
                            except_cluster=0, pathq=[], domain=None,
1046
                            filterq=[], sizeq=None, all_props=False):
1047
        """Return a (list of (path, serial) tuples, list of common prefixes)
1048
           for the current versions of the paths with the given parent,
1049
           matching the following criteria.
1050

1051
           The property tuple for a version is returned if all
1052
           of these conditions are true:
1053

1054
                a. parent matches
1055

1056
                b. path > start
1057

1058
                c. path starts with prefix (and paths in pathq)
1059

1060
                d. version is the max up to before
1061

1062
                e. version is not in cluster
1063

1064
                f. the path does not have the delimiter occuring
1065
                   after the prefix, or ends with the delimiter
1066

1067
                g. serial matches the attribute filter query.
1068

1069
                   A filter query is a comma-separated list of
1070
                   terms in one of these three forms:
1071

1072
                   key
1073
                       an attribute with this key must exist
1074

1075
                   !key
1076
                       an attribute with this key must not exist
1077

1078
                   key ?op value
1079
                       the attribute with this key satisfies the value
1080
                       where ?op is one of ==, != <=, >=, <, >.
1081

1082
                h. the size is in the range set by sizeq
1083

1084
           The list of common prefixes includes the prefixes
1085
           matching up to the first delimiter after prefix,
1086
           and are reported only once, as "virtual directories".
1087
           The delimiter is included in the prefixes.
1088

1089
           If arguments are None, then the corresponding matching rule
1090
           will always match.
1091

1092
           Limit applies to the first list of tuples returned.
1093

1094
           If all_props is True, return all properties after path,
1095
           not just serial.
1096
        """
1097

    
1098
        if not start or start < prefix:
1099
            start = strprevling(prefix)
1100
        nextling = strnextling(prefix)
1101

    
1102
        v = self.versions.alias('v')
1103
        n = self.nodes.alias('n')
1104
        if before != inf:
1105
            filtered = select([func.max(v.c.serial)],
1106
                              and_(v.c.mtime < before,
1107
                                   v.c.node == self.versions.c.node))
1108
            inner_join = \
1109
                    self.nodes.join(self.versions,
1110
                            onclause=self.versions.c.serial==filtered)
1111
        else:
1112
            filtered = select([self.nodes.c.latest_version])
1113
            filtered = filtered.where(self.nodes.c.node == self.versions.c.node).correlate(self.versions)
1114
            inner_join = \
1115
                    self.nodes.join(self.versions,
1116
                            onclause=\
1117
                            self.versions.c.serial==filtered)
1118
        if not all_props:
1119
            s = select([self.nodes.c.path,
1120
                self.versions.c.serial],from_obj=[inner_join]).distinct()
1121
        else:
1122
            s = select([self.nodes.c.path,
1123
                        self.versions.c.serial, self.versions.c.node,
1124
                        self.versions.c.hash,
1125
                        self.versions.c.size, self.versions.c.type,
1126
                        self.versions.c.source,
1127
                        self.versions.c.mtime, self.versions.c.muser,
1128
                        self.versions.c.uuid,
1129
                        self.versions.c.checksum,
1130
                        self.versions.c.cluster],from_obj=[inner_join]).distinct()
1131

    
1132
        s = s.where(self.versions.c.cluster != except_cluster)
1133
        s = s.where(self.versions.c.node.in_(select([self.nodes.c.node],
1134
                                             self.nodes.c.parent == parent)))
1135

    
1136
        s = s.where(self.versions.c.node == self.nodes.c.node)
1137
        s = s.where(and_(self.nodes.c.path > bindparam('start'),
1138
                         self.nodes.c.path < nextling))
1139
        conja = []
1140
        conjb = []
1141
        for path, match in pathq:
1142
            if match == MATCH_PREFIX:
1143
                conja.append(self.nodes.c.path.like(self.escape_like(path) + '%',
1144
                             escape=ESCAPE_CHAR))
1145
            elif match == MATCH_EXACT:
1146
                conjb.append(path)
1147
        if conja or conjb:
1148
            s = s.where(or_(self.nodes.c.path.in_(conjb),*conja))
1149

    
1150
        if sizeq and len(sizeq) == 2:
1151
            if sizeq[0]:
1152
                s = s.where(self.versions.c.size >= sizeq[0])
1153
            if sizeq[1]:
1154
                s = s.where(self.versions.c.size < sizeq[1])
1155

    
1156
        if domain and filterq:
1157
            a = self.attributes.alias('a')
1158
            included, excluded, opers = parse_filters(filterq)
1159
            if included:
1160
                subs = select([1])
1161
                subs = subs.where(self.attributes.c.serial == self.versions.c.serial).correlate(self.versions)
1162
                subs = subs.where(self.attributes.c.domain == domain)
1163
                subs = subs.where(or_(*[self.attributes.c.key.op('=')(x) for x in included]))
1164
                s = s.where(exists(subs))
1165
            if excluded:
1166
                subs = select([1])
1167
                subs = subs.where(self.attributes.c.serial == self.versions.c.serial).correlate(self.versions)
1168
                subs = subs.where(self.attributes.c.domain == domain)
1169
                subs = subs.where(or_(*[self.attributes.c.key.op('=')(x) for x in excluded]))
1170
                s = s.where(not_(exists(subs)))
1171
            if opers:
1172
                for k, o, val in opers:
1173
                    subs = select([1])
1174
                    subs = subs.where(self.attributes.c.serial == self.versions.c.serial).correlate(self.versions)
1175
                    subs = subs.where(self.attributes.c.domain == domain)
1176
                    subs = subs.where(
1177
                        and_(self.attributes.c.key.op('=')(k), self.attributes.c.value.op(o)(val)))
1178
                    s = s.where(exists(subs))
1179

    
1180
        s = s.order_by(self.nodes.c.path)
1181

    
1182
        if not delimiter:
1183
            s = s.limit(limit)
1184
            rp = self.conn.execute(s, start=start)
1185
            r = rp.fetchall()
1186
            rp.close()
1187
            return r, ()
1188

    
1189
        pfz = len(prefix)
1190
        dz = len(delimiter)
1191
        count = 0
1192
        prefixes = []
1193
        pappend = prefixes.append
1194
        matches = []
1195
        mappend = matches.append
1196

    
1197
        rp = self.conn.execute(s, start=start)
1198
        while True:
1199
            props = rp.fetchone()
1200
            if props is None:
1201
                break
1202
            path = props[0]
1203
            idx = path.find(delimiter, pfz)
1204

    
1205
            if idx < 0:
1206
                mappend(props)
1207
                count += 1
1208
                if count >= limit:
1209
                    break
1210
                continue
1211

    
1212
            if idx + dz == len(path):
1213
                mappend(props)
1214
                count += 1
1215
                continue  # Get one more, in case there is a path.
1216
            pf = path[:idx + dz]
1217
            pappend(pf)
1218
            if count >= limit:
1219
                break
1220

    
1221
            rp = self.conn.execute(s, start=strnextling(pf))  # New start.
1222
        rp.close()
1223

    
1224
        return matches, prefixes
1225

    
1226
    def latest_uuid(self, uuid, cluster):
1227
        """Return the latest version of the given uuid and cluster.
1228

1229
        Return a (path, serial) tuple.
1230
        If cluster is None, all clusters are considered.
1231

1232
        """
1233

    
1234
        v = self.versions.alias('v')
1235
        n = self.nodes.alias('n')
1236
        s = select([n.c.path, v.c.serial])
1237
        filtered = select([func.max(self.versions.c.serial)])
1238
        filtered = filtered.where(self.versions.c.uuid == uuid)
1239
        if cluster is not None:
1240
            filtered = filtered.where(self.versions.c.cluster == cluster)
1241
        s = s.where(v.c.serial == filtered)
1242
        s = s.where(n.c.node == v.c.node)
1243

    
1244
        r = self.conn.execute(s)
1245
        l = r.fetchone()
1246
        r.close()
1247
        return l
1248

    
1249
    def domain_object_list(self, domain, paths, cluster=None):
1250
        """Return a list of (path, property list, attribute dictionary)
1251
           for the objects in the specific domain and cluster.
1252
        """
1253

    
1254
        v = self.versions.alias('v')
1255
        n = self.nodes.alias('n')
1256
        a = self.attributes.alias('a')
1257

    
1258
        s = select([n.c.path, v.c.serial, v.c.node, v.c.hash, v.c.size,
1259
                    v.c.type, v.c.source, v.c.mtime, v.c.muser, v.c.uuid,
1260
                    v.c.checksum, v.c.cluster, a.c.key, a.c.value])
1261
        if cluster:
1262
            s = s.where(v.c.cluster == cluster)
1263
        s = s.where(v.c.serial == a.c.serial)
1264
        s = s.where(a.c.domain == domain)
1265
        s = s.where(a.c.node == n.c.node)
1266
        s = s.where(a.c.is_latest == True)
1267
        if paths:
1268
            s = s.where(n.c.path.in_(paths))
1269

    
1270
        r = self.conn.execute(s)
1271
        rows = r.fetchall()
1272
        r.close()
1273

    
1274
        group_by = itemgetter(slice(12))
1275
        rows.sort(key=group_by)
1276
        groups = groupby(rows, group_by)
1277
        return [(k[0], k[1:], dict([i[12:] for i in data])) for
1278
                (k, data) in groups]
1279

    
1280
    def get_props(self, paths):
1281
        inner_join = \
1282
            self.nodes.join(self.versions,
1283
                onclause=self.versions.c.serial == self.nodes.c.latest_version)
1284
        cc = self.nodes.c.path.in_(paths)
1285
        s = select([self.nodes.c.path, self.versions.c.type],
1286
                    from_obj=[inner_join]).where(cc).distinct()
1287
        r = self.conn.execute(s)
1288
        rows = r.fetchall()
1289
        r.close()
1290
        if rows:
1291
            return rows
1292
        return None