# Copyright 2011-2012 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from time import time
from operator import itemgetter
from itertools import groupby

from sqlalchemy import (Table, Integer, BigInteger, DECIMAL, Boolean,
                        Column, String, MetaData, ForeignKey)
from sqlalchemy.schema import Index
from sqlalchemy.sql import func, and_, or_, not_, select, bindparam, exists
from sqlalchemy.exc import NoSuchTableError

from dbworker import DBWorker, ESCAPE_CHAR

from pithos.backends.filter import parse_filters


ROOTNODE = 0

(SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM,
 CLUSTER, AVAILABLE, MAP_CHECK_TIMESTAMP) = range(13)

(MATCH_PREFIX, MATCH_EXACT) = range(2)

inf = float('inf')


def strnextling(prefix):
    """Return the first unicode string
       greater than but not starting with given prefix.
       strnextling('hello') -> 'hellp'
    """
    if not prefix:
        ## all strings start with the null string,
        ## therefore we have to approximate strnextling('')
        ## with the last unicode character supported by python
        ## 0x10ffff for wide (32-bit unicode) python builds
        ## 0x00ffff for narrow (16-bit unicode) python builds
        ## We will not autodetect. 0xffff is safe enough.
        return unichr(0xffff)
    s = prefix[:-1]
    c = ord(prefix[-1])
    if c >= 0xffff:
        raise RuntimeError
    s += unichr(c + 1)
    return s


def strprevling(prefix):
    """Return an approximation of the last unicode string
       less than but not starting with given prefix.
       strprevling(u'hello') -> u'helln\\uffff'
    """
    if not prefix:
        ## There is no prevling for the null string
        return prefix
    s = prefix[:-1]
    c = ord(prefix[-1])
    if c > 0:
        s += unichr(c - 1) + unichr(0xffff)
    return s
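
# Illustration: the two helpers above turn a "starts with <prefix>" condition
# into plain lexicographic comparisons, which is how listings bound their
# range scans, e.g. (values shown for illustration only):
#
#   start = strprevling(u'photos/')    # ~ u'photos.\uffff'
#   stop = strnextling(u'photos/')     # u'photos0'
#   ... WHERE path > :start AND path < :stop
#
# latest_version_list() below uses this pattern.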

_propnames = {
    'serial': 0,
    'node': 1,
    'hash': 2,
    'size': 3,
    'type': 4,
    'source': 5,
    'mtime': 6,
    'muser': 7,
    'uuid': 8,
    'checksum': 9,
    'cluster': 10,
    'available': 11,
    'map_check_timestamp': 12
}

def create_tables(engine):
    metadata = MetaData()

    # create nodes table
    columns = []
    columns.append(Column('node', Integer, primary_key=True))
    columns.append(Column('parent', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          autoincrement=False))
    columns.append(Column('latest_version', Integer))
    columns.append(Column('path', String(2048), default='', nullable=False))
    nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
    Index('idx_nodes_path', nodes.c.path, unique=True)
    Index('idx_nodes_parent', nodes.c.parent)

    # create policy table
    columns = []
    columns.append(Column('node', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          primary_key=True))
    columns.append(Column('key', String(128), primary_key=True))
    columns.append(Column('value', String(256)))
    Table('policy', metadata, *columns, mysql_engine='InnoDB')

    # create statistics table
    columns = []
    columns.append(Column('node', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          primary_key=True))
    columns.append(Column('population', Integer, nullable=False, default=0))
    columns.append(Column('size', BigInteger, nullable=False, default=0))
    columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
    columns.append(Column('cluster', Integer, nullable=False, default=0,
                          primary_key=True, autoincrement=False))
    Table('statistics', metadata, *columns, mysql_engine='InnoDB')

    # create versions table
    columns = []
    columns.append(Column('serial', Integer, primary_key=True))
    columns.append(Column('node', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE')))
    columns.append(Column('hash', String(256)))
    columns.append(Column('size', BigInteger, nullable=False, default=0))
    columns.append(Column('type', String(256), nullable=False, default=''))
    columns.append(Column('source', Integer))
    columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
    columns.append(Column('muser', String(256), nullable=False, default=''))
    columns.append(Column('uuid', String(64), nullable=False, default=''))
    columns.append(Column('checksum', String(256), nullable=False, default=''))
    columns.append(Column('cluster', Integer, nullable=False, default=0))
    columns.append(Column('available', Boolean, nullable=False, default=True))
    columns.append(Column('map_check_timestamp', DECIMAL(precision=16,
                                                         scale=6)))
    versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
    Index('idx_versions_node_mtime', versions.c.node, versions.c.mtime)
    Index('idx_versions_node_uuid', versions.c.uuid)

    # create attributes table
    columns = []
    columns.append(Column('serial', Integer,
                          ForeignKey('versions.serial',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          primary_key=True))
    columns.append(Column('domain', String(256), primary_key=True))
    columns.append(Column('key', String(128), primary_key=True))
    columns.append(Column('value', String(256)))
    columns.append(Column('node', Integer, nullable=False, default=0))
    columns.append(Column('is_latest', Boolean, nullable=False, default=True))
    attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')
    Index('idx_attributes_domain', attributes.c.domain)
    Index('idx_attributes_serial_node', attributes.c.serial, attributes.c.node)

    metadata.create_all(engine)
    return metadata.sorted_tables

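# Minimal usage sketch (illustrative only; assumes a throwaway SQLite engine,
# not how the backend itself wires things up):
#
#   from sqlalchemy import create_engine
#   tables = create_tables(create_engine('sqlite://'))
#   # -> the five Table objects (nodes, policy, statistics, versions,
#   #    attributes) in dependency order
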
class Node(DBWorker):
    """Nodes store path organization and have multiple versions.
       Versions store object history and have multiple attributes.
       Attributes store metadata.
    """

    # TODO: Provide an interface for included and excluded clusters.

    def __init__(self, **params):
        DBWorker.__init__(self, **params)
        try:
            metadata = MetaData(self.engine)
            self.nodes = Table('nodes', metadata, autoload=True)
            self.policy = Table('policy', metadata, autoload=True)
            self.statistics = Table('statistics', metadata, autoload=True)
            self.versions = Table('versions', metadata, autoload=True)
            self.attributes = Table('attributes', metadata, autoload=True)
        except NoSuchTableError:
            tables = create_tables(self.engine)
            map(lambda t: self.__setattr__(t.name, t), tables)

        s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
                                           self.nodes.c.parent == ROOTNODE))
        wrapper = self.wrapper
        wrapper.execute()
        try:
            rp = self.conn.execute(s)
            r = rp.fetchone()
            rp.close()
            if not r:
                s = self.nodes.insert(
                ).values(node=ROOTNODE, parent=ROOTNODE, path='')
                self.conn.execute(s)
        finally:
            wrapper.commit()

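    # Note: the constructor above bootstraps the hierarchy by inserting the
    # root row (node == parent == ROOTNODE, path == '') on first use; account
    # nodes are then the direct children of ROOTNODE.
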
    def node_create(self, parent, path):
        """Create a new node from the given properties.
           Return the node identifier of the new node.
        """
        # TODO: catch IntegrityError?
        s = self.nodes.insert().values(parent=parent, path=path)
        r = self.conn.execute(s)
        inserted_primary_key = r.inserted_primary_key[0]
        r.close()
        return inserted_primary_key

    def node_lookup(self, path, for_update=False):
        """Lookup the current node of the given path.
           Return None if the path is not found.
        """

        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
        s = select([self.nodes.c.node],
                   self.nodes.c.path.like(self.escape_like(path),
                                          escape=ESCAPE_CHAR),
                   for_update=for_update)
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        if row:
            return row[0]
        return None

    def node_lookup_bulk(self, paths):
        """Lookup the current nodes for the given paths.
           Return () if no paths are found.
        """

        if not paths:
            return ()
        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
        s = select([self.nodes.c.node], self.nodes.c.path.in_(paths))
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        return [row[0] for row in rows]

    def node_get_properties(self, node):
        """Return the node's (parent, path).
           Return None if the node is not found.
        """

        s = select([self.nodes.c.parent, self.nodes.c.path])
        s = s.where(self.nodes.c.node == node)
        r = self.conn.execute(s)
        l = r.fetchone()
        r.close()
        return l

    def node_get_versions(self, node, keys=(), propnames=_propnames):
        """Return the properties of all versions at node.
           If keys is empty, return all properties in the order
           (serial, node, hash, size, type, source, mtime, muser, uuid,
            checksum, cluster, available, map_check_timestamp).
        """

        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.hash,
                    self.versions.c.size,
                    self.versions.c.type,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.uuid,
                    self.versions.c.checksum,
                    self.versions.c.cluster,
                    self.versions.c.available,
                    self.versions.c.map_check_timestamp],
                   self.versions.c.node == node)
        s = s.order_by(self.versions.c.serial)
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        if not rows:
            return rows

        if not keys:
            return rows

        return [[p[propnames[k]] for k in keys if k in propnames] for
                p in rows]

    def node_count_children(self, node):
        """Return node's child count."""

        s = select([func.count(self.nodes.c.node)])
        s = s.where(and_(self.nodes.c.parent == node,
                         self.nodes.c.node != ROOTNODE))
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        return row[0]

    def node_purge_children(self, parent, before=inf, cluster=0,
                            update_statistics_ancestors_depth=None):
        """Delete all versions with the specified
           parent and cluster, and return
           the hashes, the total size and the serials of versions deleted.
           Clears out nodes with no remaining versions.
        """
        # update statistics
        c1 = select([self.nodes.c.node],
                    self.nodes.c.parent == parent)
        where_clause = and_(self.versions.c.node.in_(c1),
                            self.versions.c.cluster == cluster)
        if before != inf:
            where_clause = and_(where_clause,
                                self.versions.c.mtime <= before)
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        if not row:
            return (), 0, ()
        nr, size = row[0], row[1] if row[1] else 0
        mtime = time()
        self.statistics_update(parent, -nr, -size, mtime, cluster)
        self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster,
                                         update_statistics_ancestors_depth)

        s = select([self.versions.c.hash, self.versions.c.serial])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        hashes = []
        serials = []
        for row in r.fetchall():
            hashes += [row[0]]
            serials += [row[1]]
        r.close()

        # delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)
        r.close()

        # delete nodes
        s = select([self.nodes.c.node],
                   and_(self.nodes.c.parent == parent,
                        select([func.count(self.versions.c.serial)],
                               self.versions.c.node == self.nodes.c.node).
                        as_scalar() == 0))
        rp = self.conn.execute(s)
        nodes = [row[0] for row in rp.fetchall()]
        rp.close()
        if nodes:
            s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
            self.conn.execute(s).close()

        return hashes, size, serials

    def node_purge(self, node, before=inf, cluster=0,
                   update_statistics_ancestors_depth=None):
        """Delete all versions with the specified
           node and cluster, and return
           the hashes and size of versions deleted.
           Clears out the node if it has no remaining versions.
        """

        # update statistics
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        where_clause = and_(self.versions.c.node == node,
                            self.versions.c.cluster == cluster)
        if before != inf:
            where_clause = and_(where_clause,
                                self.versions.c.mtime <= before)
        s = s.where(where_clause)
        r = self.conn.execute(s)
        row = r.fetchone()
        nr, size = row[0], row[1]
        r.close()
        if not nr:
            return (), 0, ()
        mtime = time()
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster,
                                         update_statistics_ancestors_depth)

        s = select([self.versions.c.hash, self.versions.c.serial])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        hashes = []
        serials = []
        for row in r.fetchall():
            hashes += [row[0]]
            serials += [row[1]]
        r.close()

        # delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)
        r.close()

        # delete nodes
        s = select([self.nodes.c.node],
                   and_(self.nodes.c.node == node,
                        select([func.count(self.versions.c.serial)],
                               self.versions.c.node == self.nodes.c.node).
                        as_scalar() == 0))
        rp = self.conn.execute(s)
        nodes = [row[0] for row in rp.fetchall()]
        rp.close()
        if nodes:
            s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
            self.conn.execute(s).close()

        return hashes, size, serials

    def node_remove(self, node, update_statistics_ancestors_depth=None):
        """Remove the node specified.
           Return False if the node has children or is not found.
        """

        if self.node_count_children(node):
            return False

        mtime = time()
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size),
                    self.versions.c.cluster])
        s = s.where(self.versions.c.node == node)
        s = s.group_by(self.versions.c.cluster)
        r = self.conn.execute(s)
        for population, size, cluster in r.fetchall():
            self.statistics_update_ancestors(
                node, -population, -size, mtime, cluster,
                update_statistics_ancestors_depth)
        r.close()

        s = self.nodes.delete().where(self.nodes.c.node == node)
        self.conn.execute(s).close()
        return True

    def node_accounts(self, accounts=()):
        s = select([self.nodes.c.path, self.nodes.c.node])
        s = s.where(and_(self.nodes.c.node != 0,
                         self.nodes.c.parent == 0))
        if accounts:
            s = s.where(self.nodes.c.path.in_(accounts))
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        return rows

    def node_account_quotas(self):
        s = select([self.nodes.c.path, self.policy.c.value])
        s = s.where(and_(self.nodes.c.node != 0,
                         self.nodes.c.parent == 0))
        s = s.where(self.nodes.c.node == self.policy.c.node)
        s = s.where(self.policy.c.key == 'quota')
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        return dict(rows)

    def node_account_usage(self, account=None, cluster=0):
        """Return usage for a specific account.

        Keyword arguments:
        account -- (default None: list usage for all the accounts)
        cluster -- list current, history or deleted usage (default 0: normal)
        """

        n1 = self.nodes.alias('n1')
        n2 = self.nodes.alias('n2')
        n3 = self.nodes.alias('n3')

        s = select([n3.c.path, func.sum(self.versions.c.size)])
        s = s.where(n1.c.node == self.versions.c.node)
        s = s.where(self.versions.c.cluster == cluster)
        s = s.where(n1.c.parent == n2.c.node)
        s = s.where(n2.c.parent == n3.c.node)
        s = s.where(n3.c.parent == 0)
        s = s.where(n3.c.node != 0)
        if account:
            s = s.where(n3.c.path == account)
        s = s.group_by(n3.c.path)
        r = self.conn.execute(s)
        usage = r.fetchall()
        r.close()
        return dict(usage)

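    # Note: node_account_usage() above relies on the fixed three-level
    # hierarchy (account/container/object): n3 is the account node (its
    # parent is ROOTNODE), n2 the container and n1 the object node, so
    # version sizes are summed and grouped per account path.
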
    def policy_get(self, node):
        s = select([self.policy.c.key, self.policy.c.value],
                   self.policy.c.node == node)
        r = self.conn.execute(s)
        d = dict(r.fetchall())
        r.close()
        return d

    def policy_set(self, node, policy):
        # insert or replace
        for k, v in policy.iteritems():
            s = self.policy.update().where(and_(self.policy.c.node == node,
                                                self.policy.c.key == k))
            s = s.values(value=v)
            rp = self.conn.execute(s)
            rp.close()
            if rp.rowcount == 0:
                s = self.policy.insert()
                values = {'node': node, 'key': k, 'value': v}
                r = self.conn.execute(s, values)
                r.close()

    def statistics_get(self, node, cluster=0):
        """Return population, total size and last mtime
           for all versions under node that belong to the cluster.
        """

        s = select([self.statistics.c.population,
                    self.statistics.c.size,
                    self.statistics.c.mtime])
        s = s.where(and_(self.statistics.c.node == node,
                         self.statistics.c.cluster == cluster))
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        return row

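    # Note: policy_set() above, and statistics_update() / attribute_set()
    # below, emulate an upsert with the same two-step pattern: issue an
    # UPDATE first and, if it matched no row (rowcount == 0), fall back to
    # an INSERT.
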
    def statistics_update(self, node, population, size, mtime, cluster=0):
        """Update the statistics of the given node.
           Statistics keep track of the population, the total
           size of objects and the mtime in the node's namespace.
           The population and size arguments may be zero,
           positive or negative numbers.
        """
        s = select([self.statistics.c.population, self.statistics.c.size],
                   and_(self.statistics.c.node == node,
                        self.statistics.c.cluster == cluster))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            prepopulation, presize = (0, 0)
        else:
            prepopulation, presize = r
        population += prepopulation
        population = max(population, 0)
        size += presize

        # insert or replace
        # TODO: better upsert
        u = self.statistics.update().where(and_(
            self.statistics.c.node == node,
            self.statistics.c.cluster == cluster))
        u = u.values(population=population, size=size, mtime=mtime)
        rp = self.conn.execute(u)
        rp.close()
        if rp.rowcount == 0:
            ins = self.statistics.insert()
            ins = ins.values(node=node, population=population, size=size,
                             mtime=mtime, cluster=cluster)
            self.conn.execute(ins).close()

    def statistics_update_ancestors(self, node, population, size, mtime,
                                    cluster=0, recursion_depth=None):
        """Update the statistics of the given node's parent.
           Then recursively update all parents up to the root
           or up to the ``recursion_depth`` (if not None).
           Population is not recursive.
        """

        i = 0
        while True:
            if node == ROOTNODE:
                break
            if recursion_depth and recursion_depth <= i:
                break
            props = self.node_get_properties(node)
            if props is None:
                break
            parent, path = props
            self.statistics_update(parent, population, size, mtime, cluster)
            node = parent
            population = 0  # Population isn't recursive
            i += 1

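    # Worked example (illustrative): adding one version of size 10 under
    # account/container/object and calling
    # statistics_update_ancestors(object_node, 1, 10, mtime) bumps the
    # container's population by 1 and size by 10, then only the size (and
    # mtime) of the account and the root, because population is zeroed
    # after the first hop.
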
    def statistics_latest(self, node, before=inf, except_cluster=0):
        """Return population, total size and last mtime
           for all latest versions under node that
           do not belong to the cluster.
        """

        # The node.
        props = self.node_get_properties(node)
        if props is None:
            return None
        parent, path = props

        # The latest version.
        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.hash,
                    self.versions.c.size,
                    self.versions.c.type,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.uuid,
                    self.versions.c.checksum,
                    self.versions.c.cluster,
                    self.versions.c.available,
                    self.versions.c.map_check_timestamp])
        if before != inf:
            filtered = select([func.max(self.versions.c.serial)],
                              self.versions.c.node == node)
            filtered = filtered.where(self.versions.c.mtime < before)
        else:
            filtered = select([self.nodes.c.latest_version],
                              self.nodes.c.node == node)
        s = s.where(and_(self.versions.c.cluster != except_cluster,
                         self.versions.c.serial == filtered))
        r = self.conn.execute(s)
        props = r.fetchone()
        r.close()
        if not props:
            return None
        mtime = props[MTIME]

        # First level, just under node (get population).
        v = self.versions.alias('v')
        s = select([func.count(v.c.serial),
                    func.sum(v.c.size),
                    func.max(v.c.mtime)])
        if before != inf:
            c1 = select([func.max(self.versions.c.serial)],
                        and_(self.versions.c.mtime < before,
                             self.versions.c.node == v.c.node))
        else:
            c1 = select([self.nodes.c.latest_version])
            c1 = c1.where(self.nodes.c.node == v.c.node)
        c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
        s = s.where(and_(v.c.serial == c1,
                         v.c.cluster != except_cluster,
                         v.c.node.in_(c2)))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        count = r[0]
        mtime = max(mtime, r[2])
        if count == 0:
            return (0, 0, mtime)

        # All children (get size and mtime).
        # This is why the full path is stored.
        if before != inf:
            s = select([func.count(v.c.serial),
                       func.sum(v.c.size),
                       func.max(v.c.mtime)])
            c1 = select([func.max(self.versions.c.serial)],
                        and_(self.versions.c.mtime < before,
                             self.versions.c.node == v.c.node))
        else:
            inner_join = \
                self.versions.join(self.nodes, onclause=
                                   self.versions.c.serial ==
                                   self.nodes.c.latest_version)
            s = select([func.count(self.versions.c.serial),
                       func.sum(self.versions.c.size),
                       func.max(self.versions.c.mtime)],
                       from_obj=[inner_join])

        c2 = select([self.nodes.c.node],
                    self.nodes.c.path.like(self.escape_like(path) + '%',
                                           escape=ESCAPE_CHAR))
        if before != inf:
            s = s.where(and_(v.c.serial == c1,
                        v.c.cluster != except_cluster,
                        v.c.node.in_(c2)))
        else:
            s = s.where(and_(self.versions.c.cluster != except_cluster,
                        self.versions.c.node.in_(c2)))

        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        size = long(r[1] - props[SIZE])
        mtime = max(mtime, r[2])
        return (count, size, mtime)

    def nodes_set_latest_version(self, node, serial):
        s = self.nodes.update().where(self.nodes.c.node == node)
        s = s.values(latest_version=serial)
        self.conn.execute(s).close()

    def version_create(self, node, hash, size, type, source, muser, uuid,
                       checksum, cluster=0,
                       update_statistics_ancestors_depth=None,
                       available=True, map_check_timestamp=None):
        """Create a new version from the given properties.
           Return the (serial, mtime) of the new version.
        """

        mtime = time()
        s = self.versions.insert().values(
            node=node, hash=hash, size=size, type=type, source=source,
            mtime=mtime, muser=muser, uuid=uuid, checksum=checksum,
            cluster=cluster, available=available,
            map_check_timestamp=map_check_timestamp)
        serial = self.conn.execute(s).inserted_primary_key[0]
        self.statistics_update_ancestors(node, 1, size, mtime, cluster,
                                         update_statistics_ancestors_depth)

        self.nodes_set_latest_version(node, serial)

        return serial, mtime

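    # Note: throughout this class ``before=inf`` means "the current state";
    # the lookups then use nodes.latest_version directly instead of scanning
    # versions for the greatest serial with mtime < before, as in
    # version_lookup() below.
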
    def version_lookup(self, node, before=inf, cluster=0, all_props=True):
        """Lookup the current version of the given node.
           Return a list with its properties:
           (serial, node, hash, size, type, source, mtime,
            muser, uuid, checksum, cluster, available, map_check_timestamp)
           or None if the current version is not found in the given cluster.
        """

        v = self.versions.alias('v')
        if not all_props:
            s = select([v.c.serial])
        else:
            s = select([v.c.serial, v.c.node, v.c.hash,
                        v.c.size, v.c.type, v.c.source,
                        v.c.mtime, v.c.muser, v.c.uuid,
                        v.c.checksum, v.c.cluster,
                        v.c.available, v.c.map_check_timestamp])
        if before != inf:
            c = select([func.max(self.versions.c.serial)],
                       self.versions.c.node == node)
            c = c.where(self.versions.c.mtime < before)
        else:
            c = select([self.nodes.c.latest_version],
                       self.nodes.c.node == node)
        s = s.where(and_(v.c.serial == c,
                         v.c.cluster == cluster))
        r = self.conn.execute(s)
        props = r.fetchone()
        r.close()
        if props:
            return props
        return None

    def version_lookup_bulk(self, nodes, before=inf, cluster=0,
                            all_props=True, order_by_path=False):
        """Lookup the current versions of the given nodes.
           Return a list with their properties:
           (serial, node, hash, size, type, source, mtime, muser, uuid,
            checksum, cluster, available, map_check_timestamp).
        """
        if not nodes:
            return ()
        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        if not all_props:
            s = select([v.c.serial])
        else:
            s = select([v.c.serial, v.c.node, v.c.hash,
                        v.c.size, v.c.type, v.c.source,
                        v.c.mtime, v.c.muser, v.c.uuid,
                        v.c.checksum, v.c.cluster,
                        v.c.available, v.c.map_check_timestamp])
        if before != inf:
            c = select([func.max(self.versions.c.serial)],
                       self.versions.c.node.in_(nodes))
            c = c.where(self.versions.c.mtime < before)
            c = c.group_by(self.versions.c.node)
        else:
            c = select([self.nodes.c.latest_version],
                       self.nodes.c.node.in_(nodes))
        s = s.where(and_(v.c.serial.in_(c),
                         v.c.cluster == cluster))
        if order_by_path:
            s = s.where(v.c.node == n.c.node)
            s = s.order_by(n.c.path)
        else:
            s = s.order_by(v.c.node)
        r = self.conn.execute(s)
        rproxy = r.fetchall()
        r.close()
        return (tuple(row.values()) for row in rproxy)

    def version_get_properties(self, serial, keys=(), propnames=_propnames,
                               node=None):
        """Return a sequence of values for the properties of
           the version specified by serial and the keys, in the order given.
           If keys is empty, return all properties in the order
           (serial, node, hash, size, type, source, mtime, muser, uuid,
            checksum, cluster, available, map_check_timestamp).
        """

        v = self.versions.alias()
        s = select([v.c.serial, v.c.node, v.c.hash,
                    v.c.size, v.c.type, v.c.source,
                    v.c.mtime, v.c.muser, v.c.uuid,
                    v.c.checksum, v.c.cluster,
                    v.c.available, v.c.map_check_timestamp],
                   v.c.serial == serial)
        if node is not None:
            s = s.where(v.c.node == node)
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if r is None:
            return r

        if not keys:
            return r
        return [r[propnames[k]] for k in keys if k in propnames]

    def version_put_property(self, serial, key, value):
        """Set value for the property of version specified by key."""

        if key not in _propnames:
            return
        s = self.versions.update()
        s = s.where(self.versions.c.serial == serial)
        s = s.values(**{key: value})
        self.conn.execute(s).close()

    def version_recluster(self, serial, cluster,
                          update_statistics_ancestors_depth=None):
        """Move the version into another cluster."""

        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        size = props[SIZE]
        oldcluster = props[CLUSTER]
        if cluster == oldcluster:
            return

        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster,
                                         update_statistics_ancestors_depth)
        self.statistics_update_ancestors(node, 1, size, mtime, cluster,
                                         update_statistics_ancestors_depth)

        s = self.versions.update()
        s = s.where(self.versions.c.serial == serial)
        s = s.values(cluster=cluster)
        self.conn.execute(s).close()

    def version_remove(self, serial, update_statistics_ancestors_depth=None):
        """Remove the serial specified."""

        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        hash = props[HASH]
        size = props[SIZE]
        cluster = props[CLUSTER]

        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, cluster,
                                         update_statistics_ancestors_depth)

        s = self.versions.delete().where(self.versions.c.serial == serial)
        self.conn.execute(s).close()

        props = self.version_lookup(node, cluster=cluster, all_props=False)
        if props:
            self.nodes_set_latest_version(node, serial)

        return hash, size

    def attribute_get(self, serial, domain, keys=()):
        """Return a list of (key, value) pairs of the specific version.

        If keys is empty, return all attributes.
        Otherwise, return only those specified.
        """

        if keys:
            attrs = self.attributes.alias()
            s = select([attrs.c.key, attrs.c.value])
            s = s.where(and_(attrs.c.key.in_(keys),
                             attrs.c.serial == serial,
                             attrs.c.domain == domain))
        else:
            attrs = self.attributes.alias()
            s = select([attrs.c.key, attrs.c.value])
            s = s.where(and_(attrs.c.serial == serial,
                             attrs.c.domain == domain))
        r = self.conn.execute(s)
        l = r.fetchall()
        r.close()
        return l

    def attribute_set(self, serial, domain, node, items, is_latest=True):
        """Set the attributes of the version specified by serial.
           Receive attributes as an iterable of (key, value) pairs.
        """
        # insert or replace
        # TODO: better upsert
        for k, v in items:
            s = self.attributes.update()
            s = s.where(and_(self.attributes.c.serial == serial,
                             self.attributes.c.domain == domain,
                             self.attributes.c.key == k))
            s = s.values(value=v)
            rp = self.conn.execute(s)
            rp.close()
            if rp.rowcount == 0:
                s = self.attributes.insert()
                s = s.values(serial=serial, domain=domain, node=node,
                             is_latest=is_latest, key=k, value=v)
                self.conn.execute(s).close()

    def attribute_del(self, serial, domain, keys=()):
        """Delete attributes of the version specified by serial.
           If keys is empty, delete all attributes.
           Otherwise delete those specified.
        """

        if keys:
            # TODO: more efficient way to do this?
            for key in keys:
                s = self.attributes.delete()
                s = s.where(and_(self.attributes.c.serial == serial,
                                 self.attributes.c.domain == domain,
                                 self.attributes.c.key == key))
                self.conn.execute(s).close()
        else:
            s = self.attributes.delete()
            s = s.where(and_(self.attributes.c.serial == serial,
                             self.attributes.c.domain == domain))
            self.conn.execute(s).close()

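    # Note: attribute_copy() below re-points the copied attributes to the
    # destination version's own node through the scalar subquery stored in
    # select_src_node, keeping attributes.node consistent with the version
    # the attributes are attached to.
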
    def attribute_copy(self, source, dest):
        s = select(
            [dest, self.attributes.c.domain, self.attributes.c.node,
             self.attributes.c.key, self.attributes.c.value],
            self.attributes.c.serial == source)
        rp = self.conn.execute(s)
        attributes = rp.fetchall()
        rp.close()
        for dest, domain, node, k, v in attributes:
            select_src_node = select(
                [self.versions.c.node],
                self.versions.c.serial == dest)
            # insert or replace
            s = self.attributes.update().where(and_(
                self.attributes.c.serial == dest,
                self.attributes.c.domain == domain,
                self.attributes.c.key == k))
            s = s.values(node=select_src_node, value=v)
            rp = self.conn.execute(s)
            rp.close()
            if rp.rowcount == 0:
                s = self.attributes.insert()
                s = s.values(serial=dest, domain=domain, node=select_src_node,
                             is_latest=True, key=k, value=v)
            self.conn.execute(s).close()

    def attribute_unset_is_latest(self, node, exclude):
        u = self.attributes.update().where(and_(
            self.attributes.c.node == node,
            self.attributes.c.serial != exclude)).values({'is_latest': False})
        self.conn.execute(u)

    def latest_attribute_keys(self, parent, domain, before=inf,
                              except_cluster=0, pathq=None):
        """Return a list with all attribute keys defined
           for all latest versions under parent that
           do not belong to the cluster.
        """

        pathq = pathq or []

        # TODO: Use another table to store before=inf results.
        v = self.versions.alias('v')
        s = select([self.attributes.c.key]).distinct()
        if before != inf:
            filtered = select([func.max(v.c.serial)],
                              and_(v.c.mtime < before,
                                   v.c.node == self.versions.c.node))
        else:
            filtered = select([self.nodes.c.latest_version])
            filtered = filtered.where(self.nodes.c.node ==
                                      self.versions.c.node
                                      ).correlate(self.versions)
        s = s.where(self.versions.c.serial == filtered)
        s = s.where(self.versions.c.cluster != except_cluster)
        s = s.where(self.versions.c.node.in_(select([self.nodes.c.node],
                                             self.nodes.c.parent == parent)))
        s = s.where(self.attributes.c.serial == self.versions.c.serial)
        s = s.where(self.attributes.c.domain == domain)
        s = s.where(self.nodes.c.node == self.versions.c.node)
        s = s.order_by(self.attributes.c.key)
        conja = []
        conjb = []
        for path, match in pathq:
            if match == MATCH_PREFIX:
                conja.append(self.nodes.c.path.like(self.escape_like(path) +
                                                    '%', escape=ESCAPE_CHAR))
            elif match == MATCH_EXACT:
                conjb.append(path)
        if conja or conjb:
            s = s.where(or_(self.nodes.c.path.in_(conjb), *conja))
        rp = self.conn.execute(s)
        rows = rp.fetchall()
        rp.close()
        return [r[0] for r in rows]

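    # Added example (illustrative values): filterq terms follow the grammar
    # documented below, e.g. 'project==atlas' (attribute equals value),
    # '!draft' (attribute must not exist) and 'status' (attribute must
    # exist), all evaluated against attributes stored in ``domain``.
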
    def latest_version_list(self, parent, prefix='', delimiter=None,
                            start='', limit=10000, before=inf,
                            except_cluster=0, pathq=[], domain=None,
                            filterq=[], sizeq=None, all_props=False):
        """Return a (list of (path, serial) tuples, list of common prefixes)
           for the current versions of the paths with the given parent,
           matching the following criteria.

           The property tuple for a version is returned if all
           of these conditions are true:

                a. parent matches

                b. path > start

                c. path starts with prefix (and paths in pathq)

                d. version is the max up to before

                e. version is not in cluster

                f. the path does not have the delimiter occurring
                   after the prefix, or ends with the delimiter

                g. serial matches the attribute filter query.

                   A filter query is a comma-separated list of
                   terms in one of these three forms:

                   key
                       an attribute with this key must exist

                   !key
                       an attribute with this key must not exist

                   key ?op value
                       the attribute with this key satisfies the value
                       where ?op is one of ==, !=, <=, >=, <, >.

                h. the size is in the range set by sizeq

           The list of common prefixes includes the prefixes
           matching up to the first delimiter after prefix,
           and are reported only once, as "virtual directories".
           The delimiter is included in the prefixes.

           If arguments are None, then the corresponding matching rule
           will always match.

           Limit applies to the first list of tuples returned.

           If all_props is True, return all properties after path,
           not just serial.
        """

        if not start or start < prefix:
            start = strprevling(prefix)
        nextling = strnextling(prefix)

        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        if before != inf:
            filtered = select([func.max(v.c.serial)],
                              and_(v.c.mtime < before,
                                   v.c.node == self.versions.c.node))
            inner_join = \
                self.nodes.join(self.versions,
                                onclause=self.versions.c.serial == filtered)
        else:
            filtered = select([self.nodes.c.latest_version])
            filtered = filtered.where(self.nodes.c.node ==
                                      self.versions.c.node
                                      ).correlate(self.versions)
            inner_join = \
                self.nodes.join(self.versions,
                                onclause=
                                self.versions.c.serial == filtered)
        if not all_props:
            s = select([self.nodes.c.path,
                       self.versions.c.serial],
                       from_obj=[inner_join]).distinct()
        else:
            s = select([self.nodes.c.path,
                       self.versions.c.serial, self.versions.c.node,
                       self.versions.c.hash,
                       self.versions.c.size, self.versions.c.type,
                       self.versions.c.source,
                       self.versions.c.mtime, self.versions.c.muser,
                       self.versions.c.uuid,
                       self.versions.c.checksum,
                       self.versions.c.cluster,
                       self.versions.c.available,
                       self.versions.c.map_check_timestamp],
                       from_obj=[inner_join]).distinct()

        s = s.where(self.versions.c.cluster != except_cluster)
        s = s.where(self.versions.c.node.in_(select([self.nodes.c.node],
                                             self.nodes.c.parent == parent)))

        s = s.where(self.versions.c.node == self.nodes.c.node)
        s = s.where(and_(self.nodes.c.path > bindparam('start'),
                    self.nodes.c.path < nextling))
        conja = []
        conjb = []
        for path, match in pathq:
            if match == MATCH_PREFIX:
                conja.append(self.nodes.c.path.like(self.escape_like(path) +
                             '%', escape=ESCAPE_CHAR))
            elif match == MATCH_EXACT:
                conjb.append(path)
        if conja or conjb:
            s = s.where(or_(self.nodes.c.path.in_(conjb), *conja))

        if sizeq and len(sizeq) == 2:
            if sizeq[0]:
                s = s.where(self.versions.c.size >= sizeq[0])
            if sizeq[1]:
                s = s.where(self.versions.c.size < sizeq[1])

        if domain and filterq:
            a = self.attributes.alias('a')
            included, excluded, opers = parse_filters(filterq)
            if included:
                subs = select([1])
                subs = subs.where(self.attributes.c.serial ==
                                  self.versions.c.serial
                                  ).correlate(self.versions)
                subs = subs.where(self.attributes.c.domain == domain)
                subs = subs.where(or_(*[self.attributes.c.key.op('=')(x)
                                  for x in included]))
                s = s.where(exists(subs))
            if excluded:
                subs = select([1])
                subs = subs.where(self.attributes.c.serial ==
                                  self.versions.c.serial
                                  ).correlate(self.versions)
                subs = subs.where(self.attributes.c.domain == domain)
                subs = subs.where(or_(*[self.attributes.c.key.op('=')(x)
                                  for x in excluded]))
                s = s.where(not_(exists(subs)))
            if opers:
                for k, o, val in opers:
                    subs = select([1])
                    subs = subs.where(self.attributes.c.serial ==
                                      self.versions.c.serial
                                      ).correlate(self.versions)
                    subs = subs.where(self.attributes.c.domain == domain)
                    subs = subs.where(
                        and_(self.attributes.c.key.op('=')(k),
                             self.attributes.c.value.op(o)(val)))
                    s = s.where(exists(subs))

        s = s.order_by(self.nodes.c.path)

        if not delimiter:
            s = s.limit(limit)
            rp = self.conn.execute(s, start=start)
            r = rp.fetchall()
            rp.close()
            return r, ()

        pfz = len(prefix)
        dz = len(delimiter)
        count = 0
        prefixes = []
        pappend = prefixes.append
        matches = []
        mappend = matches.append

        rp = self.conn.execute(s, start=start)
        while True:
            props = rp.fetchone()
            if props is None:
                break
            path = props[0]
            idx = path.find(delimiter, pfz)

            if idx < 0:
                mappend(props)
                count += 1
                if count >= limit:
                    break
                continue

            if idx + dz == len(path):
                mappend(props)
                count += 1
                continue  # Get one more, in case there is a path.
            pf = path[:idx + dz]
            pappend(pf)
            if count >= limit:
                break

            rp = self.conn.execute(s, start=strnextling(pf))  # New start.
        rp.close()

        return matches, prefixes

    def latest_uuid(self, uuid, cluster):
        """Return the latest version of the given uuid and cluster.

        Return a (path, serial) tuple.
        If cluster is None, all clusters are considered.

        """

        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        s = select([n.c.path, v.c.serial])
        filtered = select([func.max(self.versions.c.serial)])
        filtered = filtered.where(self.versions.c.uuid == uuid)
        if cluster is not None:
            filtered = filtered.where(self.versions.c.cluster == cluster)
        s = s.where(v.c.serial == filtered)
        s = s.where(n.c.node == v.c.node)

        r = self.conn.execute(s)
        l = r.fetchone()
        r.close()
        return l

    def domain_object_list(self, domain, paths, cluster=None):
        """Return a list of (path, property list, attribute dictionary)
           for the objects in the specific domain and cluster.
        """

        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        a = self.attributes.alias('a')

        s = select([n.c.path, v.c.serial, v.c.node, v.c.hash, v.c.size,
                    v.c.type, v.c.source, v.c.mtime, v.c.muser, v.c.uuid,
                    v.c.checksum, v.c.cluster, a.c.key, a.c.value])
        if cluster:
            s = s.where(v.c.cluster == cluster)
        s = s.where(v.c.serial == a.c.serial)
        s = s.where(a.c.domain == domain)
        s = s.where(a.c.node == n.c.node)
        s = s.where(a.c.is_latest == True)
        if paths:
            s = s.where(n.c.path.in_(paths))

        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()

        group_by = itemgetter(slice(12))
        rows.sort(key=group_by)
        groups = groupby(rows, group_by)
        return [(k[0], k[1:], dict([i[12:] for i in data])) for
                (k, data) in groups]

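    # Note: domain_object_list() above groups the joined rows on their first
    # twelve columns (path plus the version properties); each group then
    # collects that version's (key, value) attribute pairs into the returned
    # dictionary.
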
    def get_props(self, paths):
        inner_join = \
            self.nodes.join(self.versions,
                            onclause=self.versions.c.serial ==
                            self.nodes.c.latest_version)
        cc = self.nodes.c.path.in_(paths)
        s = select([self.nodes.c.path, self.versions.c.type],
                   from_obj=[inner_join]).where(cc).distinct()
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        if rows:
            return rows
        return None