# Copyright 2011-2012 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from time import time
from operator import itemgetter
from itertools import groupby

from sqlalchemy import (Table, Integer, BigInteger, DECIMAL, Boolean,
                        Column, String, MetaData, ForeignKey)
from sqlalchemy.schema import Index
from sqlalchemy.sql import func, and_, or_, not_, select, bindparam, exists
from sqlalchemy.exc import NoSuchTableError

from dbworker import DBWorker, ESCAPE_CHAR

from pithos.backends.filter import parse_filters


ROOTNODE = 0

(SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM,
 CLUSTER) = range(11)
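# These constants index the property tuples returned by version_lookup,
# version_get_properties and node_get_versions; the _propnames dict below
# maps the same property names to these positions for keyed access.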

(MATCH_PREFIX, MATCH_EXACT) = range(2)

inf = float('inf')


def strnextling(prefix):
    """Return the first unicode string
       greater than but not starting with given prefix.
       strnextling('hello') -> 'hellp'
    """
    if not prefix:
        ## all strings start with the null string,
        ## therefore we have to approximate strnextling('')
        ## with the last unicode character supported by python
        ## 0x10ffff for wide (32-bit unicode) python builds
        ## 0x00ffff for narrow (16-bit unicode) python builds
        ## We will not autodetect. 0xffff is safe enough.
        return unichr(0xffff)
    s = prefix[:-1]
    c = ord(prefix[-1])
    if c >= 0xffff:
        raise RuntimeError
    s += unichr(c + 1)
    return s


def strprevling(prefix):
    """Return an approximation of the last unicode string
       less than but not starting with given prefix.
       strprevling(u'hello') -> u'helln\\xffff'
    """
    if not prefix:
        ## There is no prevling for the null string
        return prefix
    s = prefix[:-1]
    c = ord(prefix[-1])
    if c > 0:
        s += unichr(c - 1) + unichr(0xffff)
    return s
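# Used together, strprevling and strnextling bound a prefix scan without LIKE:
# every path starting with prefix p satisfies
#     strprevling(p) < path < strnextling(p)
# which is how latest_version_list builds its range condition on nodes.path.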

_propnames = {
    'serial': 0,
    'node': 1,
    'hash': 2,
    'size': 3,
    'type': 4,
    'source': 5,
    'mtime': 6,
    'muser': 7,
    'uuid': 8,
    'checksum': 9,
    'cluster': 10,
}


def create_tables(engine):
    metadata = MetaData()

    #create nodes table
    columns = []
    columns.append(Column('node', Integer, primary_key=True))
    columns.append(Column('parent', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          autoincrement=False))
    columns.append(Column('latest_version', Integer))
    columns.append(Column('path', String(2048), default='', nullable=False))
    nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
    Index('idx_nodes_path', nodes.c.path, unique=True)
    Index('idx_nodes_parent', nodes.c.parent)

    #create policy table
    columns = []
    columns.append(Column('node', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          primary_key=True))
    columns.append(Column('key', String(128), primary_key=True))
    columns.append(Column('value', String(256)))
    Table('policy', metadata, *columns, mysql_engine='InnoDB')

    #create statistics table
    columns = []
    columns.append(Column('node', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          primary_key=True))
    columns.append(Column('population', Integer, nullable=False, default=0))
    columns.append(Column('size', BigInteger, nullable=False, default=0))
    columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
    columns.append(Column('cluster', Integer, nullable=False, default=0,
                          primary_key=True, autoincrement=False))
    Table('statistics', metadata, *columns, mysql_engine='InnoDB')

    #create versions table
    columns = []
    columns.append(Column('serial', Integer, primary_key=True))
    columns.append(Column('node', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE')))
    columns.append(Column('hash', String(256)))
    columns.append(Column('size', BigInteger, nullable=False, default=0))
    columns.append(Column('type', String(256), nullable=False, default=''))
    columns.append(Column('source', Integer))
    columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
    columns.append(Column('muser', String(256), nullable=False, default=''))
    columns.append(Column('uuid', String(64), nullable=False, default=''))
    columns.append(Column('checksum', String(256), nullable=False, default=''))
    columns.append(Column('cluster', Integer, nullable=False, default=0))
    versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
    Index('idx_versions_node_mtime', versions.c.node, versions.c.mtime)
    Index('idx_versions_node_uuid', versions.c.uuid)

    #create attributes table
    columns = []
    columns.append(Column('serial', Integer,
                          ForeignKey('versions.serial',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          primary_key=True))
    columns.append(Column('domain', String(256), primary_key=True))
    columns.append(Column('key', String(128), primary_key=True))
    columns.append(Column('value', String(256)))
    columns.append(Column('node', Integer, nullable=False, default=0))
    columns.append(Column('is_latest', Boolean, nullable=False, default=True))
    attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')
    Index('idx_attributes_domain', attributes.c.domain)
    Index('idx_attributes_serial_node', attributes.c.serial, attributes.c.node)

    metadata.create_all(engine)
    return metadata.sorted_tables
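# Illustrative usage only (the Node class below calls this automatically
# when the tables are missing); any SQLAlchemy engine will do:
#
#     from sqlalchemy import create_engine
#     engine = create_engine('sqlite://')
#     create_tables(engine)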


class Node(DBWorker):
    """Nodes store path organization and have multiple versions.
       Versions store object history and have multiple attributes.
       Attributes store metadata.
    """

    # TODO: Provide an interface for included and excluded clusters.

    def __init__(self, **params):
        DBWorker.__init__(self, **params)
        try:
            metadata = MetaData(self.engine)
            self.nodes = Table('nodes', metadata, autoload=True)
            self.policy = Table('policy', metadata, autoload=True)
            self.statistics = Table('statistics', metadata, autoload=True)
            self.versions = Table('versions', metadata, autoload=True)
            self.attributes = Table('attributes', metadata, autoload=True)
        except NoSuchTableError:
            tables = create_tables(self.engine)
            map(lambda t: self.__setattr__(t.name, t), tables)

        s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
                                           self.nodes.c.parent == ROOTNODE))
        wrapper = self.wrapper
        wrapper.execute()
        try:
            rp = self.conn.execute(s)
            r = rp.fetchone()
            rp.close()
            if not r:
                s = self.nodes.insert(
                ).values(node=ROOTNODE, parent=ROOTNODE, path='')
                self.conn.execute(s)
        finally:
            wrapper.commit()

    def node_create(self, parent, path):
        """Create a new node from the given properties.
           Return the node identifier of the new node.
        """
        #TODO catch IntegrityError?
        s = self.nodes.insert().values(parent=parent, path=path)
        r = self.conn.execute(s)
        inserted_primary_key = r.inserted_primary_key[0]
        r.close()
        return inserted_primary_key

    def node_lookup(self, path, for_update=False):
        """Lookup the current node of the given path.
           Return None if the path is not found.
        """

        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
        s = select([self.nodes.c.node],
                   self.nodes.c.path.like(self.escape_like(path),
                                          escape=ESCAPE_CHAR),
                   for_update=for_update)
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        if row:
            return row[0]
        return None

    def node_lookup_bulk(self, paths):
        """Lookup the current nodes for the given paths.
           Return () if the path is not found.
        """

        if not paths:
            return ()
        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
        s = select([self.nodes.c.node], self.nodes.c.path.in_(paths))
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        return [row[0] for row in rows]

    def node_get_properties(self, node):
        """Return the node's (parent, path).
           Return None if the node is not found.
        """

        s = select([self.nodes.c.parent, self.nodes.c.path])
        s = s.where(self.nodes.c.node == node)
        r = self.conn.execute(s)
        l = r.fetchone()
        r.close()
        return l

    def node_get_versions(self, node, keys=(), propnames=_propnames):
        """Return the properties of all versions at node.
           If keys is empty, return all properties in the order
           (serial, node, hash, size, type, source, mtime, muser, uuid,
            checksum, cluster).
        """

        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.hash,
                    self.versions.c.size,
                    self.versions.c.type,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.uuid,
                    self.versions.c.checksum,
                    self.versions.c.cluster], self.versions.c.node == node)
        s = s.order_by(self.versions.c.serial)
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        if not rows:
            return rows

        if not keys:
            return rows

        return [[p[propnames[k]] for k in keys if k in propnames] for
                p in rows]

    def node_count_children(self, node):
        """Return node's child count."""

        s = select([func.count(self.nodes.c.node)])
        s = s.where(and_(self.nodes.c.parent == node,
                         self.nodes.c.node != ROOTNODE))
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        return row[0]

    def node_purge_children(self, parent, before=inf, cluster=0,
                            update_statistics_ancestors_depth=None):
        """Delete all versions with the specified
           parent and cluster, and return
           the hashes, the total size and the serials of versions deleted.
           Clears out nodes with no remaining versions.
        """
        #update statistics
        c1 = select([self.nodes.c.node],
                    self.nodes.c.parent == parent)
        where_clause = and_(self.versions.c.node.in_(c1),
                            self.versions.c.cluster == cluster)
        if before != inf:
            where_clause = and_(where_clause,
                                self.versions.c.mtime <= before)
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        if not row:
            return (), 0, ()
        nr, size = row[0], row[1] if row[1] else 0
        mtime = time()
        self.statistics_update(parent, -nr, -size, mtime, cluster)
        self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster,
                                         update_statistics_ancestors_depth)

        s = select([self.versions.c.hash, self.versions.c.serial])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        hashes = []
        serials = []
        for row in r.fetchall():
            hashes += [row[0]]
            serials += [row[1]]
        r.close()

        #delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)
        r.close()

        #delete nodes
        s = select([self.nodes.c.node],
                   and_(self.nodes.c.parent == parent,
                        select([func.count(self.versions.c.serial)],
                               self.versions.c.node == self.nodes.c.node).
                        as_scalar() == 0))
        rp = self.conn.execute(s)
        nodes = [row[0] for row in rp.fetchall()]
        rp.close()
        if nodes:
            s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
            self.conn.execute(s).close()

        return hashes, size, serials

    def node_purge(self, node, before=inf, cluster=0,
                   update_statistics_ancestors_depth=None):
        """Delete all versions with the specified
           node and cluster, and return
           the hashes, the total size and the serials of versions deleted.
           Clears out the node if it has no remaining versions.
        """

        #update statistics
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        where_clause = and_(self.versions.c.node == node,
                            self.versions.c.cluster == cluster)
        if before != inf:
            where_clause = and_(where_clause,
                                self.versions.c.mtime <= before)
        s = s.where(where_clause)
        r = self.conn.execute(s)
        row = r.fetchone()
        nr, size = row[0], row[1]
        r.close()
        if not nr:
            return (), 0, ()
        mtime = time()
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster,
                                         update_statistics_ancestors_depth)

        s = select([self.versions.c.hash, self.versions.c.serial])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        hashes = []
        serials = []
        for row in r.fetchall():
            hashes += [row[0]]
            serials += [row[1]]
        r.close()

        #delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)
        r.close()

        #delete nodes
        s = select([self.nodes.c.node],
                   and_(self.nodes.c.node == node,
                        select([func.count(self.versions.c.serial)],
                               self.versions.c.node == self.nodes.c.node).
                        as_scalar() == 0))
        rp = self.conn.execute(s)
        nodes = [row[0] for row in rp.fetchall()]
        rp.close()
        if nodes:
            s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
            self.conn.execute(s).close()

        return hashes, size, serials

    def node_remove(self, node, update_statistics_ancestors_depth=None):
        """Remove the node specified.
           Return false if the node has children or is not found.
        """

        if self.node_count_children(node):
            return False

        mtime = time()
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size),
                    self.versions.c.cluster])
        s = s.where(self.versions.c.node == node)
        s = s.group_by(self.versions.c.cluster)
        r = self.conn.execute(s)
        for population, size, cluster in r.fetchall():
            self.statistics_update_ancestors(
                node, -population, -size, mtime, cluster,
                update_statistics_ancestors_depth)
        r.close()

        s = self.nodes.delete().where(self.nodes.c.node == node)
        self.conn.execute(s).close()
        return True

    def node_accounts(self, accounts=()):
        s = select([self.nodes.c.path, self.nodes.c.node])
        s = s.where(and_(self.nodes.c.node != 0,
                         self.nodes.c.parent == 0))
        if accounts:
            s = s.where(self.nodes.c.path.in_(accounts))
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        return rows

    def node_account_quotas(self):
        s = select([self.nodes.c.path, self.policy.c.value])
        s = s.where(and_(self.nodes.c.node != 0,
                         self.nodes.c.parent == 0))
        s = s.where(self.nodes.c.node == self.policy.c.node)
        s = s.where(self.policy.c.key == 'quota')
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        return dict(rows)

    def node_account_usage(self, account=None, cluster=0):
        """Return usage for a specific account.

        Keyword arguments:
        account -- (default None: list usage for all the accounts)
        cluster -- list current, history or deleted usage (default 0: normal)
        """

        n1 = self.nodes.alias('n1')
        n2 = self.nodes.alias('n2')
        n3 = self.nodes.alias('n3')
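        # The self-join below climbs two levels from each version's node (n1)
        # through its parent (n2) to its grandparent (n3); n3.parent == 0
        # keeps only account-level nodes, so sizes are summed per account.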

        s = select([n3.c.path, func.sum(self.versions.c.size)])
        s = s.where(n1.c.node == self.versions.c.node)
        s = s.where(self.versions.c.cluster == cluster)
        s = s.where(n1.c.parent == n2.c.node)
        s = s.where(n2.c.parent == n3.c.node)
        s = s.where(n3.c.parent == 0)
        s = s.where(n3.c.node != 0)
        if account:
            s = s.where(n3.c.path == account)
        s = s.group_by(n3.c.path)
        r = self.conn.execute(s)
        usage = r.fetchall()
        r.close()
        return dict(usage)

    def policy_get(self, node):
        s = select([self.policy.c.key, self.policy.c.value],
                   self.policy.c.node == node)
        r = self.conn.execute(s)
        d = dict(r.fetchall())
        r.close()
        return d

    def policy_set(self, node, policy):
        #insert or replace
        for k, v in policy.iteritems():
            s = self.policy.update().where(and_(self.policy.c.node == node,
                                                self.policy.c.key == k))
            s = s.values(value=v)
            rp = self.conn.execute(s)
            rp.close()
            if rp.rowcount == 0:
                s = self.policy.insert()
                values = {'node': node, 'key': k, 'value': v}
                r = self.conn.execute(s, values)
                r.close()

    def statistics_get(self, node, cluster=0):
        """Return population, total size and last mtime
           for all versions under node that belong to the cluster.
        """

        s = select([self.statistics.c.population,
                    self.statistics.c.size,
                    self.statistics.c.mtime])
        s = s.where(and_(self.statistics.c.node == node,
                         self.statistics.c.cluster == cluster))
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        return row

    def statistics_update(self, node, population, size, mtime, cluster=0):
        """Update the statistics of the given node.
           Statistics keep track of the population, total
           size of objects and mtime in the node's namespace.
           May be zero or positive or negative numbers.
        """
        s = select([self.statistics.c.population, self.statistics.c.size],
                   and_(self.statistics.c.node == node,
                        self.statistics.c.cluster == cluster))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            prepopulation, presize = (0, 0)
        else:
            prepopulation, presize = r
        population += prepopulation
        population = max(population, 0)
        size += presize

        #insert or replace
        #TODO better upsert
        u = self.statistics.update().where(and_(
            self.statistics.c.node == node,
            self.statistics.c.cluster == cluster))
        u = u.values(population=population, size=size, mtime=mtime)
        rp = self.conn.execute(u)
        rp.close()
        if rp.rowcount == 0:
            ins = self.statistics.insert()
            ins = ins.values(node=node, population=population, size=size,
                             mtime=mtime, cluster=cluster)
            self.conn.execute(ins).close()

    def statistics_update_ancestors(self, node, population, size, mtime,
                                    cluster=0, recursion_depth=None):
        """Update the statistics of the given node's parent.
           Then recursively update all parents up to the root
           or up to the ``recursion_depth`` (if not None).
           Population is not recursive.
        """

        i = 0
        while True:
            if node == ROOTNODE:
                break
            if recursion_depth and recursion_depth <= i:
                break
            props = self.node_get_properties(node)
            if props is None:
                break
            parent, path = props
            self.statistics_update(parent, population, size, mtime, cluster)
            node = parent
            population = 0  # Population isn't recursive
            i += 1

    def statistics_latest(self, node, before=inf, except_cluster=0):
        """Return population, total size and last mtime
           for all latest versions under node that
           do not belong to the cluster.
        """

        # The node.
        props = self.node_get_properties(node)
        if props is None:
            return None
        parent, path = props

        # The latest version.
        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.hash,
                    self.versions.c.size,
                    self.versions.c.type,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.uuid,
                    self.versions.c.checksum,
                    self.versions.c.cluster])
        if before != inf:
            filtered = select([func.max(self.versions.c.serial)],
                              self.versions.c.node == node)
            filtered = filtered.where(self.versions.c.mtime < before)
        else:
            filtered = select([self.nodes.c.latest_version],
                              self.nodes.c.node == node)
        s = s.where(and_(self.versions.c.cluster != except_cluster,
                         self.versions.c.serial == filtered))
        r = self.conn.execute(s)
        props = r.fetchone()
        r.close()
        if not props:
            return None
        mtime = props[MTIME]

        # First level, just under node (get population).
        v = self.versions.alias('v')
        s = select([func.count(v.c.serial),
                    func.sum(v.c.size),
                    func.max(v.c.mtime)])
        if before != inf:
            c1 = select([func.max(self.versions.c.serial)],
                        and_(self.versions.c.mtime < before,
                             self.versions.c.node == v.c.node))
        else:
            c1 = select([self.nodes.c.latest_version])
            c1 = c1.where(self.nodes.c.node == v.c.node)
        c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
        s = s.where(and_(v.c.serial == c1,
                         v.c.cluster != except_cluster,
                         v.c.node.in_(c2)))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        count = r[0]
        mtime = max(mtime, r[2])
        if count == 0:
            return (0, 0, mtime)

        # All children (get size and mtime).
        # This is why the full path is stored.
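        # (Descendants are matched below with path LIKE '<node path>%'
        # instead of walking parent links level by level.)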
        if before != inf:
            s = select([func.count(v.c.serial),
                    func.sum(v.c.size),
                    func.max(v.c.mtime)])
            c1 = select([func.max(self.versions.c.serial)],
                        and_(self.versions.c.mtime < before,
                             self.versions.c.node == v.c.node))
        else:
            inner_join = \
                    self.versions.join(self.nodes, onclause=\
                    self.versions.c.serial == self.nodes.c.latest_version)
            s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size),
                    func.max(self.versions.c.mtime)], from_obj=[inner_join])

        c2 = select([self.nodes.c.node],
                    self.nodes.c.path.like(self.escape_like(path) + '%',
                                           escape=ESCAPE_CHAR))
        if before != inf:
            s = s.where(and_(v.c.serial == c1,
                         v.c.cluster != except_cluster,
                         v.c.node.in_(c2)))
        else:
            s = s.where(and_(self.versions.c.cluster != except_cluster,
                        self.versions.c.node.in_(c2)))

        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        size = long(r[1] - props[SIZE])
        mtime = max(mtime, r[2])
        return (count, size, mtime)

    def nodes_set_latest_version(self, node, serial):
        s = self.nodes.update().where(self.nodes.c.node == node)
        s = s.values(latest_version=serial)
        self.conn.execute(s).close()

    def version_create(self, node, hash, size, type, source, muser, uuid,
                       checksum, cluster=0,
                       update_statistics_ancestors_depth=None):
        """Create a new version from the given properties.
           Return the (serial, mtime) of the new version.
        """

        mtime = time()
        s = self.versions.insert().values(
            node=node, hash=hash, size=size, type=type, source=source,
            mtime=mtime, muser=muser, uuid=uuid, checksum=checksum,
            cluster=cluster)
        serial = self.conn.execute(s).inserted_primary_key[0]
        self.statistics_update_ancestors(node, 1, size, mtime, cluster,
                                         update_statistics_ancestors_depth)

        self.nodes_set_latest_version(node, serial)

        return serial, mtime

    def version_lookup(self, node, before=inf, cluster=0, all_props=True):
        """Lookup the current version of the given node.
           Return a list with its properties:
           (serial, node, hash, size, type, source, mtime,
            muser, uuid, checksum, cluster)
           or None if the current version is not found in the given cluster.
        """

        v = self.versions.alias('v')
        if not all_props:
            s = select([v.c.serial])
        else:
            s = select([v.c.serial, v.c.node, v.c.hash,
                        v.c.size, v.c.type, v.c.source,
                        v.c.mtime, v.c.muser, v.c.uuid,
                        v.c.checksum, v.c.cluster])
        if before != inf:
            c = select([func.max(self.versions.c.serial)],
                       self.versions.c.node == node)
            c = c.where(self.versions.c.mtime < before)
        else:
            c = select([self.nodes.c.latest_version],
                       self.nodes.c.node == node)
        s = s.where(and_(v.c.serial == c,
                         v.c.cluster == cluster))
        r = self.conn.execute(s)
        props = r.fetchone()
        r.close()
        if props:
            return props
        return None

    def version_lookup_bulk(self, nodes, before=inf, cluster=0,
                            all_props=True):
        """Lookup the current versions of the given nodes.
           Return a list with their properties:
           (serial, node, hash, size, type, source, mtime, muser, uuid,
            checksum, cluster).
        """
        if not nodes:
            return ()
        v = self.versions.alias('v')
        if not all_props:
            s = select([v.c.serial])
        else:
            s = select([v.c.serial, v.c.node, v.c.hash,
                        v.c.size, v.c.type, v.c.source,
                        v.c.mtime, v.c.muser, v.c.uuid,
                        v.c.checksum, v.c.cluster])
        if before != inf:
            c = select([func.max(self.versions.c.serial)],
                       self.versions.c.node.in_(nodes))
            c = c.where(self.versions.c.mtime < before)
            c = c.group_by(self.versions.c.node)
        else:
            c = select([self.nodes.c.latest_version],
                       self.nodes.c.node.in_(nodes))
        s = s.where(and_(v.c.serial.in_(c),
                         v.c.cluster == cluster))
        s = s.order_by(v.c.node)
        r = self.conn.execute(s)
        rproxy = r.fetchall()
        r.close()
        return (tuple(row.values()) for row in rproxy)

    def version_get_properties(self, serial, keys=(), propnames=_propnames,
                               node=None):
        """Return a sequence of values for the properties of
           the version specified by serial and the keys, in the order given.
           If keys is empty, return all properties in the order
           (serial, node, hash, size, type, source, mtime, muser, uuid,
            checksum, cluster).
        """

        v = self.versions.alias()
        s = select([v.c.serial, v.c.node, v.c.hash,
                    v.c.size, v.c.type, v.c.source,
                    v.c.mtime, v.c.muser, v.c.uuid,
                    v.c.checksum, v.c.cluster], v.c.serial == serial)
        if node is not None:
            s = s.where(v.c.node == node)
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if r is None:
            return r

        if not keys:
            return r
        return [r[propnames[k]] for k in keys if k in propnames]

    def version_put_property(self, serial, key, value):
        """Set value for the property of version specified by key."""

        if key not in _propnames:
            return
        s = self.versions.update()
        s = s.where(self.versions.c.serial == serial)
        s = s.values(**{key: value})
        self.conn.execute(s).close()

    def version_recluster(self, serial, cluster,
                          update_statistics_ancestors_depth=None):
        """Move the version into another cluster."""

        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        size = props[SIZE]
        oldcluster = props[CLUSTER]
        if cluster == oldcluster:
            return

        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster,
                                         update_statistics_ancestors_depth)
        self.statistics_update_ancestors(node, 1, size, mtime, cluster,
                                         update_statistics_ancestors_depth)

        s = self.versions.update()
        s = s.where(self.versions.c.serial == serial)
        s = s.values(cluster=cluster)
        self.conn.execute(s).close()

    def version_remove(self, serial, update_statistics_ancestors_depth=None):
        """Remove the serial specified."""

        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        hash = props[HASH]
        size = props[SIZE]
        cluster = props[CLUSTER]

        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, cluster,
                                         update_statistics_ancestors_depth)

        s = self.versions.delete().where(self.versions.c.serial == serial)
        self.conn.execute(s).close()

        props = self.version_lookup(node, cluster=cluster, all_props=False)
        if props:
            self.nodes_set_latest_version(node, props[0])

        return hash, size

    def attribute_get(self, serial, domain, keys=()):
        """Return a list of (key, value) pairs of the specific version.

        If keys is empty, return all attributes.
        Otherwise, return only those specified.
        """

        if keys:
            attrs = self.attributes.alias()
            s = select([attrs.c.key, attrs.c.value])
            s = s.where(and_(attrs.c.key.in_(keys),
                             attrs.c.serial == serial,
                             attrs.c.domain == domain))
        else:
            attrs = self.attributes.alias()
            s = select([attrs.c.key, attrs.c.value])
            s = s.where(and_(attrs.c.serial == serial,
                             attrs.c.domain == domain))
        r = self.conn.execute(s)
        l = r.fetchall()
        r.close()
        return l

    def attribute_set(self, serial, domain, node, items, is_latest=True):
        """Set the attributes of the version specified by serial.
           Receive attributes as an iterable of (key, value) pairs.
        """
        #insert or replace
        #TODO better upsert
        for k, v in items:
            s = self.attributes.update()
            s = s.where(and_(self.attributes.c.serial == serial,
                             self.attributes.c.domain == domain,
                             self.attributes.c.key == k))
            s = s.values(value=v)
            rp = self.conn.execute(s)
            rp.close()
            if rp.rowcount == 0:
                s = self.attributes.insert()
                s = s.values(serial=serial, domain=domain, node=node,
                             is_latest=is_latest, key=k, value=v)
                self.conn.execute(s).close()

    def attribute_del(self, serial, domain, keys=()):
        """Delete attributes of the version specified by serial.
           If keys is empty, delete all attributes.
           Otherwise delete those specified.
        """

        if keys:
            #TODO more efficient way to do this?
            for key in keys:
                s = self.attributes.delete()
                s = s.where(and_(self.attributes.c.serial == serial,
                                 self.attributes.c.domain == domain,
                                 self.attributes.c.key == key))
                self.conn.execute(s).close()
        else:
            s = self.attributes.delete()
            s = s.where(and_(self.attributes.c.serial == serial,
                             self.attributes.c.domain == domain))
            self.conn.execute(s).close()

    def attribute_copy(self, source, dest):
        s = select(
            [dest, self.attributes.c.domain, self.attributes.c.node,
             self.attributes.c.key, self.attributes.c.value],
            self.attributes.c.serial == source)
        rp = self.conn.execute(s)
        attributes = rp.fetchall()
        rp.close()
        for dest, domain, node, k, v in attributes:
            select_src_node = select(
                [self.versions.c.node],
                self.versions.c.serial == dest)
            # insert or replace
            s = self.attributes.update().where(and_(
                self.attributes.c.serial == dest,
                self.attributes.c.domain == domain,
                self.attributes.c.key == k))
            s = s.values(node=select_src_node, value=v)
            rp = self.conn.execute(s)
            rp.close()
            if rp.rowcount == 0:
                s = self.attributes.insert()
                s = s.values(serial=dest, domain=domain, node=select_src_node,
                             is_latest=True, key=k, value=v)
            self.conn.execute(s).close()

    def attribute_unset_is_latest(self, node, exclude):
        u = self.attributes.update().where(and_(
            self.attributes.c.node == node,
            self.attributes.c.serial != exclude)).values({'is_latest': False})
        self.conn.execute(u)

    def latest_attribute_keys(self, parent, domain, before=inf,
                              except_cluster=0, pathq=None):
        """Return a list with all keys defined
           for all latest versions under parent that
           do not belong to the cluster.
        """

        pathq = pathq or []

        # TODO: Use another table to store before=inf results.
        v = self.versions.alias('v')
        s = select([self.attributes.c.key]).distinct()
        if before != inf:
            filtered = select([func.max(v.c.serial)],
                              and_(v.c.mtime < before,
                                   v.c.node == self.versions.c.node))
        else:
            filtered = select([self.nodes.c.latest_version])
            filtered = filtered.where(self.nodes.c.node == \
                            self.versions.c.node).correlate(self.versions)
        s = s.where(self.versions.c.serial == filtered)
        s = s.where(self.versions.c.cluster != except_cluster)
        s = s.where(self.versions.c.node.in_(select([self.nodes.c.node],
                                        self.nodes.c.parent == parent)))
        s = s.where(self.attributes.c.serial == self.versions.c.serial)
        s = s.where(self.attributes.c.domain == domain)
        s = s.where(self.nodes.c.node == self.versions.c.node)
        s = s.order_by(self.attributes.c.key)
        conja = []
        conjb = []
        for path, match in pathq:
            if match == MATCH_PREFIX:
                conja.append(self.nodes.c.path.like(self.escape_like(path) + '%',
                                          escape=ESCAPE_CHAR))
            elif match == MATCH_EXACT:
                conjb.append(path)
        if conja or conjb:
            s = s.where(or_(self.nodes.c.path.in_(conjb),*conja))
        rp = self.conn.execute(s)
        rows = rp.fetchall()
        rp.close()
        return [r[0] for r in rows]

    def latest_version_list(self, parent, prefix='', delimiter=None,
                            start='', limit=10000, before=inf,
                            except_cluster=0, pathq=[], domain=None,
                            filterq=[], sizeq=None, all_props=False):
        """Return a (list of (path, serial) tuples, list of common prefixes)
           for the current versions of the paths with the given parent,
           matching the following criteria.

           The property tuple for a version is returned if all
           of these conditions are true:

                a. parent matches

                b. path > start

                c. path starts with prefix (and paths in pathq)

                d. version is the max up to before

                e. version is not in cluster

                f. the path does not have the delimiter occurring
                   after the prefix, or ends with the delimiter

                g. serial matches the attribute filter query.

                   A filter query is a comma-separated list of
                   terms in one of these three forms:

                   key
                       an attribute with this key must exist

                   !key
                       an attribute with this key must not exist

                   key ?op value
                       the attribute with this key satisfies the value
                       where ?op is one of ==, !=, <=, >=, <, >.

                h. the size is in the range set by sizeq

           The list of common prefixes includes the prefixes
           matching up to the first delimiter after prefix,
           and are reported only once, as "virtual directories".
           The delimiter is included in the prefixes.

           If arguments are None, then the corresponding matching rule
           will always match.

           Limit applies to the first list of tuples returned.

           If all_props is True, return all properties after path,
           not just serial.
        """

        if not start or start < prefix:
            start = strprevling(prefix)
        nextling = strnextling(prefix)

        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        if before != inf:
            filtered = select([func.max(v.c.serial)],
                              and_(v.c.mtime < before,
                                   v.c.node == self.versions.c.node))
            inner_join = \
                    self.nodes.join(self.versions,
                            onclause=self.versions.c.serial==filtered)
        else:
            filtered = select([self.nodes.c.latest_version])
            filtered = filtered.where(self.nodes.c.node == self.versions.c.node).correlate(self.versions)
            inner_join = \
                    self.nodes.join(self.versions,
                            onclause=\
                            self.versions.c.serial==filtered)
        if not all_props:
            s = select([self.nodes.c.path,
                self.versions.c.serial],from_obj=[inner_join]).distinct()
        else:
            s = select([self.nodes.c.path,
                        self.versions.c.serial, self.versions.c.node,
                        self.versions.c.hash,
                        self.versions.c.size, self.versions.c.type,
                        self.versions.c.source,
                        self.versions.c.mtime, self.versions.c.muser,
                        self.versions.c.uuid,
                        self.versions.c.checksum,
                        self.versions.c.cluster],from_obj=[inner_join]).distinct()

        s = s.where(self.versions.c.cluster != except_cluster)
        s = s.where(self.versions.c.node.in_(select([self.nodes.c.node],
                                             self.nodes.c.parent == parent)))

        s = s.where(self.versions.c.node == self.nodes.c.node)
        s = s.where(and_(self.nodes.c.path > bindparam('start'),
                         self.nodes.c.path < nextling))
        conja = []
        conjb = []
        for path, match in pathq:
            if match == MATCH_PREFIX:
                conja.append(self.nodes.c.path.like(self.escape_like(path) + '%',
                             escape=ESCAPE_CHAR))
            elif match == MATCH_EXACT:
                conjb.append(path)
        if conja or conjb:
            s = s.where(or_(self.nodes.c.path.in_(conjb),*conja))

        if sizeq and len(sizeq) == 2:
            if sizeq[0]:
                s = s.where(self.versions.c.size >= sizeq[0])
            if sizeq[1]:
                s = s.where(self.versions.c.size < sizeq[1])

        if domain and filterq:
            a = self.attributes.alias('a')
            included, excluded, opers = parse_filters(filterq)
            if included:
                subs = select([1])
                subs = subs.where(self.attributes.c.serial == self.versions.c.serial).correlate(self.versions)
                subs = subs.where(self.attributes.c.domain == domain)
                subs = subs.where(or_(*[self.attributes.c.key.op('=')(x) for x in included]))
                s = s.where(exists(subs))
            if excluded:
                subs = select([1])
                subs = subs.where(self.attributes.c.serial == self.versions.c.serial).correlate(self.versions)
                subs = subs.where(self.attributes.c.domain == domain)
                subs = subs.where(or_(*[self.attributes.c.key.op('=')(x) for x in excluded]))
                s = s.where(not_(exists(subs)))
            if opers:
                for k, o, val in opers:
                    subs = select([1])
                    subs = subs.where(self.attributes.c.serial == self.versions.c.serial).correlate(self.versions)
                    subs = subs.where(self.attributes.c.domain == domain)
                    subs = subs.where(
                        and_(self.attributes.c.key.op('=')(k), self.attributes.c.value.op(o)(val)))
                    s = s.where(exists(subs))

        s = s.order_by(self.nodes.c.path)

        if not delimiter:
            s = s.limit(limit)
            rp = self.conn.execute(s, start=start)
            r = rp.fetchall()
            rp.close()
            return r, ()

        pfz = len(prefix)
        dz = len(delimiter)
        count = 0
        prefixes = []
        pappend = prefixes.append
        matches = []
        mappend = matches.append

        rp = self.conn.execute(s, start=start)
        while True:
            props = rp.fetchone()
            if props is None:
                break
            path = props[0]
            idx = path.find(delimiter, pfz)

            if idx < 0:
                mappend(props)
                count += 1
                if count >= limit:
                    break
                continue

            if idx + dz == len(path):
                mappend(props)
                count += 1
                continue  # Get one more, in case there is a path.
            pf = path[:idx + dz]
            pappend(pf)
            if count >= limit:
                break

            rp = self.conn.execute(s, start=strnextling(pf))  # New start.
        rp.close()

        return matches, prefixes

    def latest_uuid(self, uuid, cluster):
        """Return the latest version of the given uuid and cluster.

        Return a (path, serial) tuple.
        If cluster is None, all clusters are considered.

        """

        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        s = select([n.c.path, v.c.serial])
        filtered = select([func.max(self.versions.c.serial)])
        filtered = filtered.where(self.versions.c.uuid == uuid)
        if cluster is not None:
            filtered = filtered.where(self.versions.c.cluster == cluster)
        s = s.where(v.c.serial == filtered)
        s = s.where(n.c.node == v.c.node)

        r = self.conn.execute(s)
        l = r.fetchone()
        r.close()
        return l

    def domain_object_list(self, domain, paths, cluster=None):
        """Return a list of (path, property list, attribute dictionary)
           for the objects in the specific domain and cluster.
        """

        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        a = self.attributes.alias('a')

        s = select([n.c.path, v.c.serial, v.c.node, v.c.hash, v.c.size,
                    v.c.type, v.c.source, v.c.mtime, v.c.muser, v.c.uuid,
                    v.c.checksum, v.c.cluster, a.c.key, a.c.value])
        if cluster:
            s = s.where(v.c.cluster == cluster)
        s = s.where(v.c.serial == a.c.serial)
        s = s.where(a.c.domain == domain)
        s = s.where(a.c.node == n.c.node)
        s = s.where(a.c.is_latest == True)
        if paths:
            s = s.where(n.c.path.in_(paths))

        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()

        group_by = itemgetter(slice(12))
        rows.sort(key=group_by)
        groups = groupby(rows, group_by)
        return [(k[0], k[1:], dict([i[12:] for i in data])) for
                (k, data) in groups]

    def get_props(self, paths):
        inner_join = \
            self.nodes.join(self.versions,
                onclause=self.versions.c.serial == self.nodes.c.latest_version)
        cc = self.nodes.c.path.in_(paths)
        s = select([self.nodes.c.path, self.versions.c.type],
                    from_obj=[inner_join]).where(cc).distinct()
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        if rows:
            return rows
        return None