# Copyright 2011-2012 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from time import time
from operator import itemgetter
from itertools import groupby

from sqlalchemy import (Table, Integer, BigInteger, DECIMAL, Boolean,
                        Column, String, MetaData, ForeignKey)
from sqlalchemy.schema import Index
from sqlalchemy.sql import func, and_, or_, not_, select, bindparam, exists
from sqlalchemy.exc import NoSuchTableError

from dbworker import DBWorker, ESCAPE_CHAR

from pithos.backends.filter import parse_filters


ROOTNODE = 0

(SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM,
 CLUSTER, AVAILABLE, MAP_CHECK_TIMESTAMP) = range(13)

(MATCH_PREFIX, MATCH_EXACT) = range(2)

inf = float('inf')


def strnextling(prefix):
    """Return the first unicode string
       greater than but not starting with given prefix.
       strnextling('hello') -> 'hellp'
    """
    if not prefix:
        ## all strings start with the null string,
        ## therefore we have to approximate strnextling('')
        ## with the last unicode character supported by python
        ## 0x10ffff for wide (32-bit unicode) python builds
        ## 0x00ffff for narrow (16-bit unicode) python builds
        ## We will not autodetect. 0xffff is safe enough.
        return unichr(0xffff)
    s = prefix[:-1]
    c = ord(prefix[-1])
    if c >= 0xffff:
        raise RuntimeError
    s += unichr(c + 1)
    return s


def strprevling(prefix):
    """Return an approximation of the last unicode string
       less than but not starting with given prefix.
       strprevling(u'hello') -> u'helln\\xffff'
    """
    if not prefix:
        ## There is no prevling for the null string
        return prefix
    s = prefix[:-1]
    c = ord(prefix[-1])
    if c > 0:
        s += unichr(c - 1) + unichr(0xffff)
    return s
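
# Illustrative note (not part of the original module): strprevling/strnextling
# bound a "starts with prefix" range for plain string comparison, which is how
# latest_version_list builds its path window.  For a hypothetical prefix:
#
#     start = strprevling(u'photos/')    # sorts just below the prefix range
#     stop = strnextling(u'photos/')     # first string after the range
#     # ... WHERE path > start AND path < stop
#
# Every path that starts with u'photos/' then falls strictly between the two
# bounds.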

_propnames = {
    'serial': 0,
    'node': 1,
    'hash': 2,
    'size': 3,
    'type': 4,
    'source': 5,
    'mtime': 6,
    'muser': 7,
    'uuid': 8,
    'checksum': 9,
    'cluster': 10,
    'available': 11,
    'map_check_timestamp': 12
}


def create_tables(engine):
    metadata = MetaData()

    #create nodes table
    columns = []
    columns.append(Column('node', Integer, primary_key=True))
    columns.append(Column('parent', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          autoincrement=False))
    columns.append(Column('latest_version', Integer))
    columns.append(Column('path', String(2048), default='', nullable=False))
    nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
    Index('idx_nodes_path', nodes.c.path, unique=True)
    Index('idx_nodes_parent', nodes.c.parent)

    #create policy table
    columns = []
    columns.append(Column('node', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          primary_key=True))
    columns.append(Column('key', String(128), primary_key=True))
    columns.append(Column('value', String(256)))
    Table('policy', metadata, *columns, mysql_engine='InnoDB')

    #create statistics table
    columns = []
    columns.append(Column('node', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          primary_key=True))
    columns.append(Column('population', Integer, nullable=False, default=0))
    columns.append(Column('size', BigInteger, nullable=False, default=0))
    columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
    columns.append(Column('cluster', Integer, nullable=False, default=0,
                          primary_key=True, autoincrement=False))
    Table('statistics', metadata, *columns, mysql_engine='InnoDB')

    #create versions table
    columns = []
    columns.append(Column('serial', Integer, primary_key=True))
    columns.append(Column('node', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE')))
    columns.append(Column('hash', String(256)))
    columns.append(Column('size', BigInteger, nullable=False, default=0))
    columns.append(Column('type', String(256), nullable=False, default=''))
    columns.append(Column('source', Integer))
    columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
    columns.append(Column('muser', String(256), nullable=False, default=''))
    columns.append(Column('uuid', String(64), nullable=False, default=''))
    columns.append(Column('checksum', String(256), nullable=False, default=''))
    columns.append(Column('cluster', Integer, nullable=False, default=0))
    columns.append(Column('available', Boolean, nullable=False, default=True))
    columns.append(Column('map_check_timestamp', DECIMAL(precision=16,
                                                         scale=6)))
    versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
    Index('idx_versions_node_mtime', versions.c.node, versions.c.mtime)
    Index('idx_versions_node_uuid', versions.c.uuid)

    #create attributes table
    columns = []
    columns.append(Column('serial', Integer,
                          ForeignKey('versions.serial',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          primary_key=True))
    columns.append(Column('domain', String(256), primary_key=True))
    columns.append(Column('key', String(128), primary_key=True))
    columns.append(Column('value', String(256)))
    columns.append(Column('node', Integer, nullable=False, default=0))
    columns.append(Column('is_latest', Boolean, nullable=False, default=True))
    attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')
    Index('idx_attributes_domain', attributes.c.domain)
    Index('idx_attributes_serial_node', attributes.c.serial, attributes.c.node)

    metadata.create_all(engine)
    return metadata.sorted_tables
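
# Illustrative sketch (assumes a throwaway SQLAlchemy engine; not part of the
# original module): create_tables() can be exercised against an in-memory
# SQLite database to inspect the schema it builds.
#
#     from sqlalchemy import create_engine
#     engine = create_engine('sqlite://')
#     tables = create_tables(engine)
#     # tables now holds the Table objects for nodes, policy, statistics,
#     # versions and attributes, in dependency order.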


class Node(DBWorker):
    """Nodes store path organization and have multiple versions.
       Versions store object history and have multiple attributes.
       Attributes store metadata.
    """

    # TODO: Provide an interface for included and excluded clusters.

    def __init__(self, **params):
        DBWorker.__init__(self, **params)
        try:
            metadata = MetaData(self.engine)
            self.nodes = Table('nodes', metadata, autoload=True)
            self.policy = Table('policy', metadata, autoload=True)
            self.statistics = Table('statistics', metadata, autoload=True)
            self.versions = Table('versions', metadata, autoload=True)
            self.attributes = Table('attributes', metadata, autoload=True)
        except NoSuchTableError:
            tables = create_tables(self.engine)
            map(lambda t: self.__setattr__(t.name, t), tables)

        s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
                                           self.nodes.c.parent == ROOTNODE))
        wrapper = self.wrapper
        wrapper.execute()
        try:
            rp = self.conn.execute(s)
            r = rp.fetchone()
            rp.close()
            if not r:
                s = self.nodes.insert(
                ).values(node=ROOTNODE, parent=ROOTNODE, path='')
                self.conn.execute(s)
        finally:
            wrapper.commit()

    def node_create(self, parent, path):
        """Create a new node from the given properties.
           Return the node identifier of the new node.
        """
        #TODO catch IntegrityError?
        s = self.nodes.insert().values(parent=parent, path=path)
        r = self.conn.execute(s)
        inserted_primary_key = r.inserted_primary_key[0]
        r.close()
        return inserted_primary_key

    def node_lookup(self, path, for_update=False):
        """Lookup the current node of the given path.
           Return None if the path is not found.
        """

        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
        s = select([self.nodes.c.node],
                   self.nodes.c.path.like(self.escape_like(path),
                                          escape=ESCAPE_CHAR),
                   for_update=for_update)
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        if row:
            return row[0]
        return None

    def node_lookup_bulk(self, paths):
        """Lookup the current nodes for the given paths.
           Return () if the path is not found.
        """

        if not paths:
            return ()
        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
        s = select([self.nodes.c.node], self.nodes.c.path.in_(paths))
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        return [row[0] for row in rows]

    def node_get_properties(self, node):
        """Return the node's (parent, path).
           Return None if the node is not found.
        """

        s = select([self.nodes.c.parent, self.nodes.c.path])
        s = s.where(self.nodes.c.node == node)
        r = self.conn.execute(s)
        l = r.fetchone()
        r.close()
        return l

    def node_get_versions(self, node, keys=(), propnames=_propnames):
        """Return the properties of all versions at node.
           If keys is empty, return all properties in the order
           (serial, node, hash, size, type, source, mtime, muser, uuid,
            checksum, cluster, available, map_check_timestamp).
        """

        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.hash,
                    self.versions.c.size,
                    self.versions.c.type,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.uuid,
                    self.versions.c.checksum,
                    self.versions.c.cluster,
                    self.versions.c.available,
                    self.versions.c.map_check_timestamp],
                   self.versions.c.node == node)
        s = s.order_by(self.versions.c.serial)
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        if not rows:
            return rows

        if not keys:
            return rows

        return [[p[propnames[k]] for k in keys if k in propnames] for
                p in rows]

    def node_count_children(self, node):
        """Return node's child count."""

        s = select([func.count(self.nodes.c.node)])
        s = s.where(and_(self.nodes.c.parent == node,
                         self.nodes.c.node != ROOTNODE))
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        return row[0]
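
    # Illustrative sketch (hypothetical names, not part of the original
    # module): full paths are stored on nodes, so resolving an object and
    # stepping back to its parent takes a node_lookup plus node_get_properties
    # on a Node instance, here called `backend`:
    #
    #     obj = backend.node_lookup(u'account/container/object')
    #     parent, path = backend.node_get_properties(obj)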

    def node_purge_children(self, parent, before=inf, cluster=0,
                            update_statistics_ancestors_depth=None):
        """Delete all versions with the specified
           parent and cluster, and return
           the hashes, the total size and the serials of versions deleted.
           Clears out nodes with no remaining versions.
        """
        #update statistics
        c1 = select([self.nodes.c.node],
                    self.nodes.c.parent == parent)
        where_clause = and_(self.versions.c.node.in_(c1),
                            self.versions.c.cluster == cluster)
        if before != inf:
            where_clause = and_(where_clause,
                                self.versions.c.mtime <= before)
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        if not row:
            return (), 0, ()
        nr, size = row[0], row[1] if row[1] else 0
        mtime = time()
        self.statistics_update(parent, -nr, -size, mtime, cluster)
        self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster,
                                         update_statistics_ancestors_depth)

        s = select([self.versions.c.hash, self.versions.c.serial])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        hashes = []
        serials = []
        for row in r.fetchall():
            hashes += [row[0]]
            serials += [row[1]]
        r.close()

        #delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)
        r.close()

        #delete nodes
        s = select([self.nodes.c.node],
                   and_(self.nodes.c.parent == parent,
                        select([func.count(self.versions.c.serial)],
                               self.versions.c.node == self.nodes.c.node).
                        as_scalar() == 0))
        rp = self.conn.execute(s)
        nodes = [row[0] for row in rp.fetchall()]
        rp.close()
        if nodes:
            s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
            self.conn.execute(s).close()

        return hashes, size, serials

    def node_purge(self, node, before=inf, cluster=0,
                   update_statistics_ancestors_depth=None):
        """Delete all versions with the specified
           node and cluster, and return
           the hashes and size of versions deleted.
           Clears out the node if it has no remaining versions.
        """

        #update statistics
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        where_clause = and_(self.versions.c.node == node,
                            self.versions.c.cluster == cluster)
        if before != inf:
            where_clause = and_(where_clause,
                                self.versions.c.mtime <= before)
        s = s.where(where_clause)
        r = self.conn.execute(s)
        row = r.fetchone()
        nr, size = row[0], row[1]
        r.close()
        if not nr:
            return (), 0, ()
        mtime = time()
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster,
                                         update_statistics_ancestors_depth)

        s = select([self.versions.c.hash, self.versions.c.serial])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        hashes = []
        serials = []
        for row in r.fetchall():
            hashes += [row[0]]
            serials += [row[1]]
        r.close()

        #delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)
        r.close()

        #delete nodes
        s = select([self.nodes.c.node],
                   and_(self.nodes.c.node == node,
                        select([func.count(self.versions.c.serial)],
                               self.versions.c.node == self.nodes.c.node).
                        as_scalar() == 0))
        rp = self.conn.execute(s)
        nodes = [row[0] for row in rp.fetchall()]
        rp.close()
        if nodes:
            s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
            self.conn.execute(s).close()

        return hashes, size, serials

    def node_remove(self, node, update_statistics_ancestors_depth=None):
        """Remove the node specified.
           Return false if the node has children or is not found.
        """

        if self.node_count_children(node):
            return False

        mtime = time()
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size),
                    self.versions.c.cluster])
        s = s.where(self.versions.c.node == node)
        s = s.group_by(self.versions.c.cluster)
        r = self.conn.execute(s)
        for population, size, cluster in r.fetchall():
            self.statistics_update_ancestors(
                node, -population, -size, mtime, cluster,
                update_statistics_ancestors_depth)
        r.close()

        s = self.nodes.delete().where(self.nodes.c.node == node)
        self.conn.execute(s).close()
        return True

    def node_accounts(self, accounts=()):
        s = select([self.nodes.c.path, self.nodes.c.node])
        s = s.where(and_(self.nodes.c.node != 0,
                         self.nodes.c.parent == 0))
        if accounts:
            s = s.where(self.nodes.c.path.in_(accounts))
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        return rows

    def node_account_quotas(self):
        s = select([self.nodes.c.path, self.policy.c.value])
        s = s.where(and_(self.nodes.c.node != 0,
                         self.nodes.c.parent == 0))
        s = s.where(self.nodes.c.node == self.policy.c.node)
        s = s.where(self.policy.c.key == 'quota')
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        return dict(rows)

    def node_account_usage(self, account=None, cluster=0):
        """Return usage for a specific account.

        Keyword arguments:
        account -- (default None: list usage for all the accounts)
        cluster -- list current, history or deleted usage (default 0: normal)
        """

        n1 = self.nodes.alias('n1')
        n2 = self.nodes.alias('n2')
        n3 = self.nodes.alias('n3')

        s = select([n3.c.path, func.sum(self.versions.c.size)])
        s = s.where(n1.c.node == self.versions.c.node)
        s = s.where(self.versions.c.cluster == cluster)
        s = s.where(n1.c.parent == n2.c.node)
        s = s.where(n2.c.parent == n3.c.node)
        s = s.where(n3.c.parent == 0)
        s = s.where(n3.c.node != 0)
        if account:
            s = s.where(n3.c.path == account)
        s = s.group_by(n3.c.path)
        r = self.conn.execute(s)
        usage = r.fetchall()
        r.close()
        return dict(usage)
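
    # Illustrative sketch (hypothetical account name, not part of the original
    # module): node_account_usage returns a {path: total size} mapping, either
    # for every account or for a single one:
    #
    #     all_usage = backend.node_account_usage()
    #     one_usage = backend.node_account_usage(account=u'user@example.org')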

    def policy_get(self, node):
        s = select([self.policy.c.key, self.policy.c.value],
                   self.policy.c.node == node)
        r = self.conn.execute(s)
        d = dict(r.fetchall())
        r.close()
        return d

    def policy_set(self, node, policy):
        #insert or replace
        for k, v in policy.iteritems():
            s = self.policy.update().where(and_(self.policy.c.node == node,
                                                self.policy.c.key == k))
            s = s.values(value=v)
            rp = self.conn.execute(s)
            rp.close()
            if rp.rowcount == 0:
                s = self.policy.insert()
                values = {'node': node, 'key': k, 'value': v}
                r = self.conn.execute(s, values)
                r.close()

    def statistics_get(self, node, cluster=0):
        """Return population, total size and last mtime
           for all versions under node that belong to the cluster.
        """

        s = select([self.statistics.c.population,
                    self.statistics.c.size,
                    self.statistics.c.mtime])
        s = s.where(and_(self.statistics.c.node == node,
                         self.statistics.c.cluster == cluster))
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        return row

    def statistics_update(self, node, population, size, mtime, cluster=0):
        """Update the statistics of the given node.
           Statistics keep track of the population, the total
           size of objects and the mtime in the node's namespace.
           The population and size deltas may be zero, positive or negative.
        """
        s = select([self.statistics.c.population, self.statistics.c.size],
                   and_(self.statistics.c.node == node,
                        self.statistics.c.cluster == cluster))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            prepopulation, presize = (0, 0)
        else:
            prepopulation, presize = r
        population += prepopulation
        population = max(population, 0)
        size += presize

        #insert or replace
        #TODO better upsert
        u = self.statistics.update().where(and_(
            self.statistics.c.node == node,
            self.statistics.c.cluster == cluster))
        u = u.values(population=population, size=size, mtime=mtime)
        rp = self.conn.execute(u)
        rp.close()
        if rp.rowcount == 0:
            ins = self.statistics.insert()
            ins = ins.values(node=node, population=population, size=size,
                             mtime=mtime, cluster=cluster)
            self.conn.execute(ins).close()

    def statistics_update_ancestors(self, node, population, size, mtime,
                                    cluster=0, recursion_depth=None):
        """Update the statistics of the given node's parent.
           Then recursively update all parents up to the root
           or up to the ``recursion_depth`` (if not None).
           Population is not recursive.
        """

        i = 0
        while True:
            if node == ROOTNODE:
                break
            if recursion_depth and recursion_depth <= i:
                break
            props = self.node_get_properties(node)
            if props is None:
                break
            parent, path = props
            self.statistics_update(parent, population, size, mtime, cluster)
            node = parent
            population = 0  # Population isn't recursive
            i += 1
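
    # Illustrative sketch (hypothetical numbers, not part of the original
    # module): a new 1000-byte version under `obj_node` propagates the size
    # delta to every ancestor, while the population delta only reaches the
    # immediate parent (population is reset to 0 after the first step):
    #
    #     backend.statistics_update_ancestors(obj_node, 1, 1000, time(), 0)
    #     # parent container: population +1, size +1000
    #     # account and root:  population +0, size +1000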

    def statistics_latest(self, node, before=inf, except_cluster=0):
        """Return population, total size and last mtime
           for all latest versions under node that
           do not belong to the cluster.
        """

        # The node.
        props = self.node_get_properties(node)
        if props is None:
            return None
        parent, path = props

        # The latest version.
        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.hash,
                    self.versions.c.size,
                    self.versions.c.type,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.uuid,
                    self.versions.c.checksum,
                    self.versions.c.cluster])
        if before != inf:
            filtered = select([func.max(self.versions.c.serial)],
                              self.versions.c.node == node)
            filtered = filtered.where(self.versions.c.mtime < before)
        else:
            filtered = select([self.nodes.c.latest_version],
                              self.nodes.c.node == node)
        s = s.where(and_(self.versions.c.cluster != except_cluster,
                         self.versions.c.serial == filtered))
        r = self.conn.execute(s)
        props = r.fetchone()
        r.close()
        if not props:
            return None
        mtime = props[MTIME]

        # First level, just under node (get population).
        v = self.versions.alias('v')
        s = select([func.count(v.c.serial),
                    func.sum(v.c.size),
                    func.max(v.c.mtime)])
        if before != inf:
            c1 = select([func.max(self.versions.c.serial)],
                        and_(self.versions.c.mtime < before,
                             self.versions.c.node == v.c.node))
        else:
            c1 = select([self.nodes.c.latest_version])
            c1 = c1.where(self.nodes.c.node == v.c.node)
        c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
        s = s.where(and_(v.c.serial == c1,
                         v.c.cluster != except_cluster,
                         v.c.node.in_(c2)))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        count = r[0]
        mtime = max(mtime, r[2])
        if count == 0:
            return (0, 0, mtime)

        # All children (get size and mtime).
        # This is why the full path is stored.
        if before != inf:
            s = select([func.count(v.c.serial),
                       func.sum(v.c.size),
                       func.max(v.c.mtime)])
            c1 = select([func.max(self.versions.c.serial)],
                        and_(self.versions.c.mtime < before,
                             self.versions.c.node == v.c.node))
        else:
            inner_join = \
                self.versions.join(self.nodes, onclause=
                                   self.versions.c.serial ==
                                   self.nodes.c.latest_version)
            s = select([func.count(self.versions.c.serial),
                       func.sum(self.versions.c.size),
                       func.max(self.versions.c.mtime)],
                       from_obj=[inner_join])

        c2 = select([self.nodes.c.node],
                    self.nodes.c.path.like(self.escape_like(path) + '%',
                                           escape=ESCAPE_CHAR))
        if before != inf:
            s = s.where(and_(v.c.serial == c1,
                        v.c.cluster != except_cluster,
                        v.c.node.in_(c2)))
        else:
            s = s.where(and_(self.versions.c.cluster != except_cluster,
                        self.versions.c.node.in_(c2)))

        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        size = long(r[1] - props[SIZE])
        mtime = max(mtime, r[2])
        return (count, size, mtime)

    def nodes_set_latest_version(self, node, serial):
        s = self.nodes.update().where(self.nodes.c.node == node)
        s = s.values(latest_version=serial)
        self.conn.execute(s).close()

    def version_create(self, node, hash, size, type, source, muser, uuid,
                       checksum, cluster=0,
                       update_statistics_ancestors_depth=None,
                       available=True, map_check_timestamp=None):
        """Create a new version from the given properties.
           Return the (serial, mtime) of the new version.
        """

        mtime = time()
        s = self.versions.insert().values(
            node=node, hash=hash, size=size, type=type, source=source,
            mtime=mtime, muser=muser, uuid=uuid, checksum=checksum,
            cluster=cluster, available=available,
            map_check_timestamp=map_check_timestamp)
        serial = self.conn.execute(s).inserted_primary_key[0]
        self.statistics_update_ancestors(node, 1, size, mtime, cluster,
                                         update_statistics_ancestors_depth)

        self.nodes_set_latest_version(node, serial)

        return serial, mtime
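
    # Illustrative sketch (hypothetical identifiers, not part of the original
    # module): one plausible write path pairs node_create with version_create
    # and attribute_set:
    #
    #     node = backend.node_create(parent, u'account/container/object')
    #     serial, mtime = backend.version_create(node, hash, size, type,
    #                                            source, muser, uuid, checksum)
    #     backend.attribute_set(serial, domain, node, [('key', 'value')])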

    def version_lookup(self, node, before=inf, cluster=0, all_props=True):
        """Lookup the current version of the given node.
           Return a list with its properties:
           (serial, node, hash, size, type, source, mtime,
            muser, uuid, checksum, cluster, available, map_check_timestamp)
           or None if the current version is not found in the given cluster.
        """

        v = self.versions.alias('v')
        if not all_props:
            s = select([v.c.serial])
        else:
            s = select([v.c.serial, v.c.node, v.c.hash,
                        v.c.size, v.c.type, v.c.source,
                        v.c.mtime, v.c.muser, v.c.uuid,
                        v.c.checksum, v.c.cluster,
                        v.c.available, v.c.map_check_timestamp])
        if before != inf:
            c = select([func.max(self.versions.c.serial)],
                       self.versions.c.node == node)
            c = c.where(self.versions.c.mtime < before)
        else:
            c = select([self.nodes.c.latest_version],
                       self.nodes.c.node == node)
        s = s.where(and_(v.c.serial == c,
                         v.c.cluster == cluster))
        r = self.conn.execute(s)
        props = r.fetchone()
        r.close()
        if props:
            return props
        return None

    def version_lookup_bulk(self, nodes, before=inf, cluster=0,
                            all_props=True, order_by_path=False):
        """Lookup the current versions of the given nodes.
           Return a list with their properties:
           (serial, node, hash, size, type, source, mtime, muser, uuid,
            checksum, cluster, available, map_check_timestamp).
        """
        if not nodes:
            return ()
        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        if not all_props:
            s = select([v.c.serial])
        else:
            s = select([v.c.serial, v.c.node, v.c.hash,
                        v.c.size, v.c.type, v.c.source,
                        v.c.mtime, v.c.muser, v.c.uuid,
                        v.c.checksum, v.c.cluster,
                        v.c.available, v.c.map_check_timestamp])
        if before != inf:
            c = select([func.max(self.versions.c.serial)],
                       self.versions.c.node.in_(nodes))
            c = c.where(self.versions.c.mtime < before)
            c = c.group_by(self.versions.c.node)
        else:
            c = select([self.nodes.c.latest_version],
                       self.nodes.c.node.in_(nodes))
        s = s.where(and_(v.c.serial.in_(c),
                         v.c.cluster == cluster))
        if order_by_path:
            s = s.where(v.c.node == n.c.node)
            s = s.order_by(n.c.path)
        else:
            s = s.order_by(v.c.node)
        r = self.conn.execute(s)
        rproxy = r.fetchall()
        r.close()
        return (tuple(row.values()) for row in rproxy)

    def version_get_properties(self, serial, keys=(), propnames=_propnames,
                               node=None):
        """Return a sequence of values for the properties of
           the version specified by serial and the keys, in the order given.
           If keys is empty, return all properties in the order
           (serial, node, hash, size, type, source, mtime, muser, uuid,
            checksum, cluster, available, map_check_timestamp).
        """

        v = self.versions.alias()
        s = select([v.c.serial, v.c.node, v.c.hash,
                    v.c.size, v.c.type, v.c.source,
                    v.c.mtime, v.c.muser, v.c.uuid,
                    v.c.checksum, v.c.cluster,
                    v.c.available, v.c.map_check_timestamp],
                   v.c.serial == serial)
        if node is not None:
            s = s.where(v.c.node == node)
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if r is None:
            return r

        if not keys:
            return r
        return [r[propnames[k]] for k in keys if k in propnames]

    def version_put_property(self, serial, key, value):
        """Set value for the property of version specified by key."""

        if key not in _propnames:
            return
        s = self.versions.update()
        s = s.where(self.versions.c.serial == serial)
        s = s.values(**{key: value})
        self.conn.execute(s).close()

    def version_recluster(self, serial, cluster,
                          update_statistics_ancestors_depth=None):
        """Move the version into another cluster."""

        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        size = props[SIZE]
        oldcluster = props[CLUSTER]
        if cluster == oldcluster:
            return

        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster,
                                         update_statistics_ancestors_depth)
        self.statistics_update_ancestors(node, 1, size, mtime, cluster,
                                         update_statistics_ancestors_depth)

        s = self.versions.update()
        s = s.where(self.versions.c.serial == serial)
        s = s.values(cluster=cluster)
        self.conn.execute(s).close()

    def version_remove(self, serial, update_statistics_ancestors_depth=None):
        """Remove the serial specified."""

        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        hash = props[HASH]
        size = props[SIZE]
        cluster = props[CLUSTER]

        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, cluster,
                                         update_statistics_ancestors_depth)

        s = self.versions.delete().where(self.versions.c.serial == serial)
        self.conn.execute(s).close()

        props = self.version_lookup(node, cluster=cluster, all_props=False)
        if props:
            self.nodes_set_latest_version(node, serial)

        return hash, size

    def attribute_get(self, serial, domain, keys=()):
        """Return a list of (key, value) pairs of the specific version.

        If keys is empty, return all attributes.
        Otherwise, return only those specified.
        """

        if keys:
            attrs = self.attributes.alias()
            s = select([attrs.c.key, attrs.c.value])
            s = s.where(and_(attrs.c.key.in_(keys),
                             attrs.c.serial == serial,
                             attrs.c.domain == domain))
        else:
            attrs = self.attributes.alias()
            s = select([attrs.c.key, attrs.c.value])
            s = s.where(and_(attrs.c.serial == serial,
                             attrs.c.domain == domain))
        r = self.conn.execute(s)
        l = r.fetchall()
        r.close()
        return l

    def attribute_set(self, serial, domain, node, items, is_latest=True):
        """Set the attributes of the version specified by serial.
           Receive attributes as an iterable of (key, value) pairs.
        """
        #insert or replace
        #TODO better upsert
        for k, v in items:
            s = self.attributes.update()
            s = s.where(and_(self.attributes.c.serial == serial,
                             self.attributes.c.domain == domain,
                             self.attributes.c.key == k))
            s = s.values(value=v)
            rp = self.conn.execute(s)
            rp.close()
            if rp.rowcount == 0:
                s = self.attributes.insert()
                s = s.values(serial=serial, domain=domain, node=node,
                             is_latest=is_latest, key=k, value=v)
                self.conn.execute(s).close()

    def attribute_del(self, serial, domain, keys=()):
        """Delete attributes of the version specified by serial.
           If keys is empty, delete all attributes.
           Otherwise delete those specified.
        """

        if keys:
            #TODO more efficient way to do this?
            for key in keys:
                s = self.attributes.delete()
                s = s.where(and_(self.attributes.c.serial == serial,
                                 self.attributes.c.domain == domain,
                                 self.attributes.c.key == key))
                self.conn.execute(s).close()
        else:
            s = self.attributes.delete()
            s = s.where(and_(self.attributes.c.serial == serial,
                             self.attributes.c.domain == domain))
            self.conn.execute(s).close()

    def attribute_copy(self, source, dest):
        s = select(
            [dest, self.attributes.c.domain, self.attributes.c.node,
             self.attributes.c.key, self.attributes.c.value],
            self.attributes.c.serial == source)
        rp = self.conn.execute(s)
        attributes = rp.fetchall()
        rp.close()
        for dest, domain, node, k, v in attributes:
            select_src_node = select(
                [self.versions.c.node],
                self.versions.c.serial == dest)
            # insert or replace
            s = self.attributes.update().where(and_(
                self.attributes.c.serial == dest,
                self.attributes.c.domain == domain,
                self.attributes.c.key == k))
            s = s.values(node=select_src_node, value=v)
            rp = self.conn.execute(s)
            rp.close()
            if rp.rowcount == 0:
                s = self.attributes.insert()
                s = s.values(serial=dest, domain=domain, node=select_src_node,
                             is_latest=True, key=k, value=v)
            self.conn.execute(s).close()

    def attribute_unset_is_latest(self, node, exclude):
        u = self.attributes.update().where(and_(
            self.attributes.c.node == node,
            self.attributes.c.serial != exclude)).values({'is_latest': False})
        self.conn.execute(u)

    def latest_attribute_keys(self, parent, domain, before=inf,
                              except_cluster=0, pathq=None):
        """Return a list with all attribute keys defined
           for all latest versions under parent that
           do not belong to the cluster.
        """

        pathq = pathq or []

        # TODO: Use another table to store before=inf results.
        v = self.versions.alias('v')
        s = select([self.attributes.c.key]).distinct()
        if before != inf:
            filtered = select([func.max(v.c.serial)],
                              and_(v.c.mtime < before,
                                   v.c.node == self.versions.c.node))
        else:
            filtered = select([self.nodes.c.latest_version])
            filtered = filtered.where(self.nodes.c.node ==
                                      self.versions.c.node
                                      ).correlate(self.versions)
        s = s.where(self.versions.c.serial == filtered)
        s = s.where(self.versions.c.cluster != except_cluster)
        s = s.where(self.versions.c.node.in_(select([self.nodes.c.node],
                                             self.nodes.c.parent == parent)))
        s = s.where(self.attributes.c.serial == self.versions.c.serial)
        s = s.where(self.attributes.c.domain == domain)
        s = s.where(self.nodes.c.node == self.versions.c.node)
        s = s.order_by(self.attributes.c.key)
        conja = []
        conjb = []
        for path, match in pathq:
            if match == MATCH_PREFIX:
                conja.append(self.nodes.c.path.like(self.escape_like(path) +
                                                    '%', escape=ESCAPE_CHAR))
            elif match == MATCH_EXACT:
                conjb.append(path)
        if conja or conjb:
            s = s.where(or_(self.nodes.c.path.in_(conjb), *conja))
        rp = self.conn.execute(s)
        rows = rp.fetchall()
        rp.close()
        return [r[0] for r in rows]

    def latest_version_list(self, parent, prefix='', delimiter=None,
                            start='', limit=10000, before=inf,
                            except_cluster=0, pathq=[], domain=None,
                            filterq=[], sizeq=None, all_props=False):
        """Return a (list of (path, serial) tuples, list of common prefixes)
           for the current versions of the paths with the given parent,
           matching the following criteria.

           The property tuple for a version is returned if all
           of these conditions are true:

                a. parent matches

                b. path > start

                c. path starts with prefix (and paths in pathq)

                d. version is the max up to before

                e. version is not in cluster

                f. the path does not have the delimiter occurring
                   after the prefix, or ends with the delimiter

                g. serial matches the attribute filter query.

                   A filter query is a comma-separated list of
                   terms in one of these three forms:

                   key
                       an attribute with this key must exist

                   !key
                       an attribute with this key must not exist

                   key ?op value
                       the attribute with this key satisfies the value
                       where ?op is one of ==, !=, <=, >=, <, >.

                h. the size is in the range set by sizeq

           The list of common prefixes includes the prefixes
           matching up to the first delimiter after prefix,
           and these are reported only once, as "virtual directories".
           The delimiter is included in the prefixes.

           If arguments are None, then the corresponding matching rule
           will always match.

           Limit applies to the first list of tuples returned.

           If all_props is True, return all properties after path,
           not just serial.
        """

        if not start or start < prefix:
            start = strprevling(prefix)
        nextling = strnextling(prefix)

        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        if before != inf:
            filtered = select([func.max(v.c.serial)],
                              and_(v.c.mtime < before,
                                   v.c.node == self.versions.c.node))
            inner_join = \
                self.nodes.join(self.versions,
                                onclause=self.versions.c.serial == filtered)
        else:
            filtered = select([self.nodes.c.latest_version])
            filtered = filtered.where(self.nodes.c.node ==
                                      self.versions.c.node
                                      ).correlate(self.versions)
            inner_join = \
                self.nodes.join(self.versions,
                                onclause=
                                self.versions.c.serial == filtered)
        if not all_props:
            s = select([self.nodes.c.path,
                       self.versions.c.serial],
                       from_obj=[inner_join]).distinct()
        else:
            s = select([self.nodes.c.path,
                       self.versions.c.serial, self.versions.c.node,
                       self.versions.c.hash,
                       self.versions.c.size, self.versions.c.type,
                       self.versions.c.source,
                       self.versions.c.mtime, self.versions.c.muser,
                       self.versions.c.uuid,
                       self.versions.c.checksum,
                       self.versions.c.cluster],
                       from_obj=[inner_join]).distinct()

        s = s.where(self.versions.c.cluster != except_cluster)
        s = s.where(self.versions.c.node.in_(select([self.nodes.c.node],
                                             self.nodes.c.parent == parent)))

        s = s.where(self.versions.c.node == self.nodes.c.node)
        s = s.where(and_(self.nodes.c.path > bindparam('start'),
                    self.nodes.c.path < nextling))
        conja = []
        conjb = []
        for path, match in pathq:
            if match == MATCH_PREFIX:
                conja.append(self.nodes.c.path.like(self.escape_like(path) +
                             '%', escape=ESCAPE_CHAR))
            elif match == MATCH_EXACT:
                conjb.append(path)
        if conja or conjb:
            s = s.where(or_(self.nodes.c.path.in_(conjb), *conja))

        if sizeq and len(sizeq) == 2:
            if sizeq[0]:
                s = s.where(self.versions.c.size >= sizeq[0])
            if sizeq[1]:
                s = s.where(self.versions.c.size < sizeq[1])

        if domain and filterq:
            a = self.attributes.alias('a')
            included, excluded, opers = parse_filters(filterq)
            if included:
                subs = select([1])
                subs = subs.where(self.attributes.c.serial ==
                                  self.versions.c.serial
                                  ).correlate(self.versions)
                subs = subs.where(self.attributes.c.domain == domain)
                subs = subs.where(or_(*[self.attributes.c.key.op('=')(x)
                                  for x in included]))
                s = s.where(exists(subs))
            if excluded:
                subs = select([1])
                subs = subs.where(self.attributes.c.serial ==
                                  self.versions.c.serial
                                  ).correlate(self.versions)
                subs = subs.where(self.attributes.c.domain == domain)
                subs = subs.where(or_(*[self.attributes.c.key.op('=')(x)
                                  for x in excluded]))
                s = s.where(not_(exists(subs)))
            if opers:
                for k, o, val in opers:
                    subs = select([1])
                    subs = subs.where(self.attributes.c.serial ==
                                      self.versions.c.serial
                                      ).correlate(self.versions)
                    subs = subs.where(self.attributes.c.domain == domain)
                    subs = subs.where(
                        and_(self.attributes.c.key.op('=')(k),
                             self.attributes.c.value.op(o)(val)))
                    s = s.where(exists(subs))

        s = s.order_by(self.nodes.c.path)

        if not delimiter:
            s = s.limit(limit)
            rp = self.conn.execute(s, start=start)
            r = rp.fetchall()
            rp.close()
            return r, ()

        pfz = len(prefix)
        dz = len(delimiter)
        count = 0
        prefixes = []
        pappend = prefixes.append
        matches = []
        mappend = matches.append

        rp = self.conn.execute(s, start=start)
        while True:
            props = rp.fetchone()
            if props is None:
                break
            path = props[0]
            idx = path.find(delimiter, pfz)

            if idx < 0:
                mappend(props)
                count += 1
                if count >= limit:
                    break
                continue

            if idx + dz == len(path):
                mappend(props)
                count += 1
                continue  # Get one more, in case there is a path.
            pf = path[:idx + dz]
            pappend(pf)
            if count >= limit:
                break

            rp = self.conn.execute(s, start=strnextling(pf))  # New start.
        rp.close()

        return matches, prefixes
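
    # Illustrative sketch (hypothetical container node, not part of the
    # original module): a delimiter listing of the current objects under a
    # prefix could look like
    #
    #     matches, prefixes = backend.latest_version_list(
    #         container_node, prefix=u'photos/summer/', delimiter=u'/',
    #         limit=25)
    #
    # where `matches` holds (path, serial) tuples and `prefixes` the "virtual
    # directories" that end in the delimiter; filterq terms follow the
    # key / !key / "key ?op value" grammar described in the docstring above.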

    def latest_uuid(self, uuid, cluster):
        """Return the latest version of the given uuid and cluster.

        Return a (path, serial) tuple.
        If cluster is None, all clusters are considered.

        """

        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        s = select([n.c.path, v.c.serial])
        filtered = select([func.max(self.versions.c.serial)])
        filtered = filtered.where(self.versions.c.uuid == uuid)
        if cluster is not None:
            filtered = filtered.where(self.versions.c.cluster == cluster)
        s = s.where(v.c.serial == filtered)
        s = s.where(n.c.node == v.c.node)

        r = self.conn.execute(s)
        l = r.fetchone()
        r.close()
        return l

    def domain_object_list(self, domain, paths, cluster=None):
        """Return a list of (path, property list, attribute dictionary)
           for the objects in the specific domain and cluster.
        """

        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        a = self.attributes.alias('a')

        s = select([n.c.path, v.c.serial, v.c.node, v.c.hash, v.c.size,
                    v.c.type, v.c.source, v.c.mtime, v.c.muser, v.c.uuid,
                    v.c.checksum, v.c.cluster, a.c.key, a.c.value])
        if cluster:
            s = s.where(v.c.cluster == cluster)
        s = s.where(v.c.serial == a.c.serial)
        s = s.where(a.c.domain == domain)
        s = s.where(a.c.node == n.c.node)
        s = s.where(a.c.is_latest == True)
        if paths:
            s = s.where(n.c.path.in_(paths))

        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()

        group_by = itemgetter(slice(12))
        rows.sort(key=group_by)
        groups = groupby(rows, group_by)
        return [(k[0], k[1:], dict([i[12:] for i in data])) for
                (k, data) in groups]

    def get_props(self, paths):
        inner_join = \
            self.nodes.join(self.versions,
                            onclause=self.versions.c.serial ==
                            self.nodes.c.latest_version)
        cc = self.nodes.c.path.in_(paths)
        s = select([self.nodes.c.path, self.versions.c.type],
                   from_obj=[inner_join]).where(cc).distinct()
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        if rows:
            return rows
        return None
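
# Illustrative note (not part of the original module): domain_object_list
# groups the joined rows on the first 12 columns, so each returned item has
# the shape
#
#     (path, (serial, node, hash, size, type, source, mtime, muser, uuid,
#             checksum, cluster), {key: value, ...})
#
# i.e. one entry per object with its latest-version properties and the
# attributes registered in the requested domain.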