Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / lib / sqlalchemy / node.py @ dc88754b

History | View | Annotate | Download (46.2 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from time import time
35
from operator import itemgetter
36
from itertools import groupby
37

    
38
from sqlalchemy import (Table, Integer, BigInteger, DECIMAL, Boolean,
39
                        Column, String, MetaData, ForeignKey)
40
from sqlalchemy.schema import Index
41
from sqlalchemy.sql import func, and_, or_, not_, select, bindparam, exists
42
from sqlalchemy.exc import NoSuchTableError
43

    
44
from dbworker import DBWorker, ESCAPE_CHAR
45

    
46
from pithos.backends.filter import parse_filters
47

    
48

    
49
ROOTNODE = 0
50

    
51
(SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM,
52
 CLUSTER) = range(11)
53

    
54
(MATCH_PREFIX, MATCH_EXACT) = range(2)
55

    
56
inf = float('inf')
57

    
58

    
59
def strnextling(prefix):
60
    """Return the first unicode string
61
       greater than but not starting with given prefix.
62
       strnextling('hello') -> 'hellp'
63
    """
64
    if not prefix:
65
        ## all strings start with the null string,
66
        ## therefore we have to approximate strnextling('')
67
        ## with the last unicode character supported by python
68
        ## 0x10ffff for wide (32-bit unicode) python builds
69
        ## 0x00ffff for narrow (16-bit unicode) python builds
70
        ## We will not autodetect. 0xffff is safe enough.
71
        return unichr(0xffff)
72
    s = prefix[:-1]
73
    c = ord(prefix[-1])
74
    if c >= 0xffff:
75
        raise RuntimeError
76
    s += unichr(c + 1)
77
    return s
78

    
79

    
80
def strprevling(prefix):
81
    """Return an approximation of the last unicode string
82
       less than but not starting with given prefix.
83
       strprevling(u'hello') -> u'helln\\xffff'
84
    """
85
    if not prefix:
86
        ## There is no prevling for the null string
87
        return prefix
88
    s = prefix[:-1]
89
    c = ord(prefix[-1])
90
    if c > 0:
91
        s += unichr(c - 1) + unichr(0xffff)
92
    return s
93

    
94
_propnames = {
95
    'serial': 0,
96
    'node': 1,
97
    'hash': 2,
98
    'size': 3,
99
    'type': 4,
100
    'source': 5,
101
    'mtime': 6,
102
    'muser': 7,
103
    'uuid': 8,
104
    'checksum': 9,
105
    'cluster': 10
106
}
107

    
108

    
109
def create_tables(engine):
110
    metadata = MetaData()
111

    
112
    #create nodes table
113
    columns = []
114
    columns.append(Column('node', Integer, primary_key=True))
115
    columns.append(Column('parent', Integer,
116
                          ForeignKey('nodes.node',
117
                                     ondelete='CASCADE',
118
                                     onupdate='CASCADE'),
119
                          autoincrement=False))
120
    columns.append(Column('latest_version', Integer))
121
    columns.append(Column('path', String(2048), default='', nullable=False))
122
    nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
123
    Index('idx_nodes_path', nodes.c.path, unique=True)
124
    Index('idx_nodes_parent', nodes.c.parent)
125

    
126
    #create policy table
127
    columns = []
128
    columns.append(Column('node', Integer,
129
                          ForeignKey('nodes.node',
130
                                     ondelete='CASCADE',
131
                                     onupdate='CASCADE'),
132
                          primary_key=True))
133
    columns.append(Column('key', String(128), primary_key=True))
134
    columns.append(Column('value', String(256)))
135
    Table('policy', metadata, *columns, mysql_engine='InnoDB')
136

    
137
    #create statistics table
138
    columns = []
139
    columns.append(Column('node', Integer,
140
                          ForeignKey('nodes.node',
141
                                     ondelete='CASCADE',
142
                                     onupdate='CASCADE'),
143
                          primary_key=True))
144
    columns.append(Column('population', Integer, nullable=False, default=0))
145
    columns.append(Column('size', BigInteger, nullable=False, default=0))
146
    columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
147
    columns.append(Column('cluster', Integer, nullable=False, default=0,
148
                          primary_key=True, autoincrement=False))
149
    Table('statistics', metadata, *columns, mysql_engine='InnoDB')
150

    
151
    #create versions table
152
    columns = []
153
    columns.append(Column('serial', Integer, primary_key=True))
154
    columns.append(Column('node', Integer,
155
                          ForeignKey('nodes.node',
156
                                     ondelete='CASCADE',
157
                                     onupdate='CASCADE')))
158
    columns.append(Column('hash', String(256)))
159
    columns.append(Column('size', BigInteger, nullable=False, default=0))
160
    columns.append(Column('type', String(256), nullable=False, default=''))
161
    columns.append(Column('source', Integer))
162
    columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
163
    columns.append(Column('muser', String(256), nullable=False, default=''))
164
    columns.append(Column('uuid', String(64), nullable=False, default=''))
165
    columns.append(Column('checksum', String(256), nullable=False, default=''))
166
    columns.append(Column('cluster', Integer, nullable=False, default=0))
167
    versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
168
    Index('idx_versions_node_mtime', versions.c.node, versions.c.mtime)
169
    Index('idx_versions_node_uuid', versions.c.uuid)
170

    
171
    #create attributes table
172
    columns = []
173
    columns.append(Column('serial', Integer,
174
                          ForeignKey('versions.serial',
175
                                     ondelete='CASCADE',
176
                                     onupdate='CASCADE'),
177
                          primary_key=True))
178
    columns.append(Column('domain', String(256), primary_key=True))
179
    columns.append(Column('key', String(128), primary_key=True))
180
    columns.append(Column('value', String(256)))
181
    columns.append(Column('node', Integer, nullable=False, default=0))
182
    columns.append(Column('is_latest', Boolean, nullable=False, default=True))
183
    attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')
184
    Index('idx_attributes_domain', attributes.c.domain)
185
    Index('idx_attributes_serial_node', attributes.c.serial, attributes.c.node)
186

    
187
    metadata.create_all(engine)
188
    return metadata.sorted_tables
189

    
190

    
191
class Node(DBWorker):
192
    """Nodes store path organization and have multiple versions.
193
       Versions store object history and have multiple attributes.
194
       Attributes store metadata.
195
    """
196

    
197
    # TODO: Provide an interface for included and excluded clusters.
198

    
199
    def __init__(self, **params):
200
        DBWorker.__init__(self, **params)
201
        try:
202
            metadata = MetaData(self.engine)
203
            self.nodes = Table('nodes', metadata, autoload=True)
204
            self.policy = Table('policy', metadata, autoload=True)
205
            self.statistics = Table('statistics', metadata, autoload=True)
206
            self.versions = Table('versions', metadata, autoload=True)
207
            self.attributes = Table('attributes', metadata, autoload=True)
208
        except NoSuchTableError:
209
            tables = create_tables(self.engine)
210
            map(lambda t: self.__setattr__(t.name, t), tables)
211

    
212
        s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
213
                                           self.nodes.c.parent == ROOTNODE))
214
        wrapper = self.wrapper
215
        wrapper.execute()
216
        try:
217
            rp = self.conn.execute(s)
218
            r = rp.fetchone()
219
            rp.close()
220
            if not r:
221
                s = self.nodes.insert(
222
                ).values(node=ROOTNODE, parent=ROOTNODE, path='')
223
                self.conn.execute(s)
224
        finally:
225
            wrapper.commit()
226

    
227
    def node_create(self, parent, path):
228
        """Create a new node from the given properties.
229
           Return the node identifier of the new node.
230
        """
231
        #TODO catch IntegrityError?
232
        s = self.nodes.insert().values(parent=parent, path=path)
233
        r = self.conn.execute(s)
234
        inserted_primary_key = r.inserted_primary_key[0]
235
        r.close()
236
        return inserted_primary_key
237

    
238
    def node_lookup(self, path, for_update=False):
239
        """Lookup the current node of the given path.
240
           Return None if the path is not found.
241
        """
242

    
243
        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
244
        s = select([self.nodes.c.node],
245
                   self.nodes.c.path.like(self.escape_like(path),
246
                                          escape=ESCAPE_CHAR),
247
                   for_update=for_update)
248
        r = self.conn.execute(s)
249
        row = r.fetchone()
250
        r.close()
251
        if row:
252
            return row[0]
253
        return None
254

    
255
    def node_lookup_bulk(self, paths):
256
        """Lookup the current nodes for the given paths.
257
           Return () if the path is not found.
258
        """
259

    
260
        if not paths:
261
            return ()
262
        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
263
        s = select([self.nodes.c.node], self.nodes.c.path.in_(paths))
264
        r = self.conn.execute(s)
265
        rows = r.fetchall()
266
        r.close()
267
        return [row[0] for row in rows]
268

    
269
    def node_get_properties(self, node):
270
        """Return the node's (parent, path).
271
           Return None if the node is not found.
272
        """
273

    
274
        s = select([self.nodes.c.parent, self.nodes.c.path])
275
        s = s.where(self.nodes.c.node == node)
276
        r = self.conn.execute(s)
277
        l = r.fetchone()
278
        r.close()
279
        return l
280

    
281
    def node_get_versions(self, node, keys=(), propnames=_propnames):
282
        """Return the properties of all versions at node.
283
           If keys is empty, return all properties in the order
284
           (serial, node, hash, size, type, source, mtime, muser, uuid,
285
            checksum, cluster).
286
        """
287

    
288
        s = select([self.versions.c.serial,
289
                    self.versions.c.node,
290
                    self.versions.c.hash,
291
                    self.versions.c.size,
292
                    self.versions.c.type,
293
                    self.versions.c.source,
294
                    self.versions.c.mtime,
295
                    self.versions.c.muser,
296
                    self.versions.c.uuid,
297
                    self.versions.c.checksum,
298
                    self.versions.c.cluster], self.versions.c.node == node)
299
        s = s.order_by(self.versions.c.serial)
300
        r = self.conn.execute(s)
301
        rows = r.fetchall()
302
        r.close()
303
        if not rows:
304
            return rows
305

    
306
        if not keys:
307
            return rows
308

    
309
        return [[p[propnames[k]] for k in keys if k in propnames] for
310
                p in rows]
311

    
312
    def node_count_children(self, node):
313
        """Return node's child count."""
314

    
315
        s = select([func.count(self.nodes.c.node)])
316
        s = s.where(and_(self.nodes.c.parent == node,
317
                         self.nodes.c.node != ROOTNODE))
318
        r = self.conn.execute(s)
319
        row = r.fetchone()
320
        r.close()
321
        return row[0]
322

    
323
    def node_purge_children(self, parent, before=inf, cluster=0,
324
                            update_statistics_ancestors_depth=None):
325
        """Delete all versions with the specified
326
           parent and cluster, and return
327
           the hashes, the total size and the serials of versions deleted.
328
           Clears out nodes with no remaining versions.
329
        """
330
        #update statistics
331
        c1 = select([self.nodes.c.node],
332
                    self.nodes.c.parent == parent)
333
        where_clause = and_(self.versions.c.node.in_(c1),
334
                            self.versions.c.cluster == cluster)
335
        if before != inf:
336
            where_clause = and_(where_clause,
337
                                self.versions.c.mtime <= before)
338
        s = select([func.count(self.versions.c.serial),
339
                    func.sum(self.versions.c.size)])
340
        s = s.where(where_clause)
341
        r = self.conn.execute(s)
342
        row = r.fetchone()
343
        r.close()
344
        if not row:
345
            return (), 0, ()
346
        nr, size = row[0], row[1] if row[1] else 0
347
        mtime = time()
348
        self.statistics_update(parent, -nr, -size, mtime, cluster)
349
        self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster,
350
                                         update_statistics_ancestors_depth)
351

    
352
        s = select([self.versions.c.hash, self.versions.c.serial])
353
        s = s.where(where_clause)
354
        r = self.conn.execute(s)
355
        hashes = []
356
        serials = []
357
        for row in r.fetchall():
358
            hashes += [row[0]]
359
            serials += [row[1]]
360
        r.close()
361

    
362
        #delete versions
363
        s = self.versions.delete().where(where_clause)
364
        r = self.conn.execute(s)
365
        r.close()
366

    
367
        #delete nodes
368
        s = select([self.nodes.c.node],
369
                   and_(self.nodes.c.parent == parent,
370
                        select([func.count(self.versions.c.serial)],
371
                               self.versions.c.node == self.nodes.c.node).
372
                        as_scalar() == 0))
373
        rp = self.conn.execute(s)
374
        nodes = [row[0] for row in rp.fetchall()]
375
        rp.close()
376
        if nodes:
377
            s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
378
            self.conn.execute(s).close()
379

    
380
        return hashes, size, serials
381

    
382
    def node_purge(self, node, before=inf, cluster=0,
383
                   update_statistics_ancestors_depth=None):
384
        """Delete all versions with the specified
385
           node and cluster, and return
386
           the hashes and size of versions deleted.
387
           Clears out the node if it has no remaining versions.
388
        """
389

    
390
        #update statistics
391
        s = select([func.count(self.versions.c.serial),
392
                    func.sum(self.versions.c.size)])
393
        where_clause = and_(self.versions.c.node == node,
394
                            self.versions.c.cluster == cluster)
395
        if before != inf:
396
            where_clause = and_(where_clause,
397
                                self.versions.c.mtime <= before)
398
        s = s.where(where_clause)
399
        r = self.conn.execute(s)
400
        row = r.fetchone()
401
        nr, size = row[0], row[1]
402
        r.close()
403
        if not nr:
404
            return (), 0, ()
405
        mtime = time()
406
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster,
407
                                         update_statistics_ancestors_depth)
408

    
409
        s = select([self.versions.c.hash, self.versions.c.serial])
410
        s = s.where(where_clause)
411
        r = self.conn.execute(s)
412
        hashes = []
413
        serials = []
414
        for row in r.fetchall():
415
            hashes += [row[0]]
416
            serials += [row[1]]
417
        r.close()
418

    
419
        #delete versions
420
        s = self.versions.delete().where(where_clause)
421
        r = self.conn.execute(s)
422
        r.close()
423

    
424
        #delete nodes
425
        s = select([self.nodes.c.node],
426
                   and_(self.nodes.c.node == node,
427
                        select([func.count(self.versions.c.serial)],
428
                               self.versions.c.node == self.nodes.c.node).
429
                        as_scalar() == 0))
430
        rp = self.conn.execute(s)
431
        nodes = [row[0] for row in rp.fetchall()]
432
        rp.close()
433
        if nodes:
434
            s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
435
            self.conn.execute(s).close()
436

    
437
        return hashes, size, serials
438

    
439
    def node_remove(self, node, update_statistics_ancestors_depth=None):
440
        """Remove the node specified.
441
           Return false if the node has children or is not found.
442
        """
443

    
444
        if self.node_count_children(node):
445
            return False
446

    
447
        mtime = time()
448
        s = select([func.count(self.versions.c.serial),
449
                    func.sum(self.versions.c.size),
450
                    self.versions.c.cluster])
451
        s = s.where(self.versions.c.node == node)
452
        s = s.group_by(self.versions.c.cluster)
453
        r = self.conn.execute(s)
454
        for population, size, cluster in r.fetchall():
455
            self.statistics_update_ancestors(
456
                node, -population, -size, mtime, cluster,
457
                update_statistics_ancestors_depth)
458
        r.close()
459

    
460
        s = self.nodes.delete().where(self.nodes.c.node == node)
461
        self.conn.execute(s).close()
462
        return True
463

    
464
    def node_accounts(self, accounts=()):
465
        s = select([self.nodes.c.path, self.nodes.c.node])
466
        s = s.where(and_(self.nodes.c.node != 0,
467
                         self.nodes.c.parent == 0))
468
        if accounts:
469
            s = s.where(self.nodes.c.path.in_(accounts))
470
        r = self.conn.execute(s)
471
        rows = r.fetchall()
472
        r.close()
473
        return rows
474

    
475
    def node_account_quotas(self):
476
        s = select([self.nodes.c.path, self.policy.c.value])
477
        s = s.where(and_(self.nodes.c.node != 0,
478
                         self.nodes.c.parent == 0))
479
        s = s.where(self.nodes.c.node == self.policy.c.node)
480
        s = s.where(self.policy.c.key == 'quota')
481
        r = self.conn.execute(s)
482
        rows = r.fetchall()
483
        r.close()
484
        return dict(rows)
485

    
486
    def node_account_usage(self, account=None, cluster=0):
487
        """Return usage for a specific account.
488

489
        Keyword arguments:
490
        account -- (default None: list usage for all the accounts)
491
        cluster -- list current, history or deleted usage (default 0: normal)
492
        """
493

    
494
        n1 = self.nodes.alias('n1')
495
        n2 = self.nodes.alias('n2')
496
        n3 = self.nodes.alias('n3')
497

    
498
        s = select([n3.c.path, func.sum(self.versions.c.size)])
499
        s = s.where(n1.c.node == self.versions.c.node)
500
        s = s.where(self.versions.c.cluster == cluster)
501
        s = s.where(n1.c.parent == n2.c.node)
502
        s = s.where(n2.c.parent == n3.c.node)
503
        s = s.where(n3.c.parent == 0)
504
        s = s.where(n3.c.node != 0)
505
        if account:
506
            s = s.where(n3.c.path == account)
507
        s = s.group_by(n3.c.path)
508
        r = self.conn.execute(s)
509
        usage = r.fetchall()
510
        r.close()
511
        return dict(usage)
512

    
513
    def policy_get(self, node):
514
        s = select([self.policy.c.key, self.policy.c.value],
515
                   self.policy.c.node == node)
516
        r = self.conn.execute(s)
517
        d = dict(r.fetchall())
518
        r.close()
519
        return d
520

    
521
    def policy_set(self, node, policy):
522
        #insert or replace
523
        for k, v in policy.iteritems():
524
            s = self.policy.update().where(and_(self.policy.c.node == node,
525
                                                self.policy.c.key == k))
526
            s = s.values(value=v)
527
            rp = self.conn.execute(s)
528
            rp.close()
529
            if rp.rowcount == 0:
530
                s = self.policy.insert()
531
                values = {'node': node, 'key': k, 'value': v}
532
                r = self.conn.execute(s, values)
533
                r.close()
534

    
535
    def statistics_get(self, node, cluster=0):
536
        """Return population, total size and last mtime
537
           for all versions under node that belong to the cluster.
538
        """
539

    
540
        s = select([self.statistics.c.population,
541
                    self.statistics.c.size,
542
                    self.statistics.c.mtime])
543
        s = s.where(and_(self.statistics.c.node == node,
544
                         self.statistics.c.cluster == cluster))
545
        r = self.conn.execute(s)
546
        row = r.fetchone()
547
        r.close()
548
        return row
549

    
550
    def statistics_update(self, node, population, size, mtime, cluster=0):
551
        """Update the statistics of the given node.
552
           Statistics keep track the population, total
553
           size of objects and mtime in the node's namespace.
554
           May be zero or positive or negative numbers.
555
        """
556
        s = select([self.statistics.c.population, self.statistics.c.size],
557
                   and_(self.statistics.c.node == node,
558
                        self.statistics.c.cluster == cluster))
559
        rp = self.conn.execute(s)
560
        r = rp.fetchone()
561
        rp.close()
562
        if not r:
563
            prepopulation, presize = (0, 0)
564
        else:
565
            prepopulation, presize = r
566
        population += prepopulation
567
        population = max(population, 0)
568
        size += presize
569

    
570
        #insert or replace
571
        #TODO better upsert
572
        u = self.statistics.update().where(and_(
573
            self.statistics.c.node == node,
574
            self.statistics.c.cluster == cluster))
575
        u = u.values(population=population, size=size, mtime=mtime)
576
        rp = self.conn.execute(u)
577
        rp.close()
578
        if rp.rowcount == 0:
579
            ins = self.statistics.insert()
580
            ins = ins.values(node=node, population=population, size=size,
581
                             mtime=mtime, cluster=cluster)
582
            self.conn.execute(ins).close()
583

    
584
    def statistics_update_ancestors(self, node, population, size, mtime,
585
                                    cluster=0, recursion_depth=None):
586
        """Update the statistics of the given node's parent.
587
           Then recursively update all parents up to the root
588
           or up to the ``recursion_depth`` (if not None).
589
           Population is not recursive.
590
        """
591

    
592
        i = 0
593
        while True:
594
            if node == ROOTNODE:
595
                break
596
            if recursion_depth and recursion_depth <= i:
597
                break
598
            props = self.node_get_properties(node)
599
            if props is None:
600
                break
601
            parent, path = props
602
            self.statistics_update(parent, population, size, mtime, cluster)
603
            node = parent
604
            population = 0  # Population isn't recursive
605
            i += 1
606

    
607
    def statistics_latest(self, node, before=inf, except_cluster=0):
608
        """Return population, total size and last mtime
609
           for all latest versions under node that
610
           do not belong to the cluster.
611
        """
612

    
613
        # The node.
614
        props = self.node_get_properties(node)
615
        if props is None:
616
            return None
617
        parent, path = props
618

    
619
        # The latest version.
620
        s = select([self.versions.c.serial,
621
                    self.versions.c.node,
622
                    self.versions.c.hash,
623
                    self.versions.c.size,
624
                    self.versions.c.type,
625
                    self.versions.c.source,
626
                    self.versions.c.mtime,
627
                    self.versions.c.muser,
628
                    self.versions.c.uuid,
629
                    self.versions.c.checksum,
630
                    self.versions.c.cluster])
631
        if before != inf:
632
            filtered = select([func.max(self.versions.c.serial)],
633
                              self.versions.c.node == node)
634
            filtered = filtered.where(self.versions.c.mtime < before)
635
        else:
636
            filtered = select([self.nodes.c.latest_version],
637
                              self.nodes.c.node == node)
638
        s = s.where(and_(self.versions.c.cluster != except_cluster,
639
                         self.versions.c.serial == filtered))
640
        r = self.conn.execute(s)
641
        props = r.fetchone()
642
        r.close()
643
        if not props:
644
            return None
645
        mtime = props[MTIME]
646

    
647
        # First level, just under node (get population).
648
        v = self.versions.alias('v')
649
        s = select([func.count(v.c.serial),
650
                    func.sum(v.c.size),
651
                    func.max(v.c.mtime)])
652
        if before != inf:
653
            c1 = select([func.max(self.versions.c.serial)])
654
            c1 = c1.where(self.versions.c.mtime < before)
655
            c1.where(self.versions.c.node == v.c.node)
656
        else:
657
            c1 = select([self.nodes.c.latest_version])
658
            c1 = c1.where(self.nodes.c.node == v.c.node)
659
        c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
660
        s = s.where(and_(v.c.serial == c1,
661
                         v.c.cluster != except_cluster,
662
                         v.c.node.in_(c2)))
663
        rp = self.conn.execute(s)
664
        r = rp.fetchone()
665
        rp.close()
666
        if not r:
667
            return None
668
        count = r[0]
669
        mtime = max(mtime, r[2])
670
        if count == 0:
671
            return (0, 0, mtime)
672

    
673
        # All children (get size and mtime).
674
        # This is why the full path is stored.
675
        s = select([func.count(v.c.serial),
676
                    func.sum(v.c.size),
677
                    func.max(v.c.mtime)])
678
        if before != inf:
679
            c1 = select([func.max(self.versions.c.serial)],
680
                        self.versions.c.node == v.c.node)
681
            c1 = c1.where(self.versions.c.mtime < before)
682
        else:
683
            c1 = select([self.nodes.c.latest_version],
684
                        self.nodes.c.node == v.c.node)
685
        c2 = select([self.nodes.c.node],
686
                    self.nodes.c.path.like(self.escape_like(path) + '%',
687
                                           escape=ESCAPE_CHAR))
688
        s = s.where(and_(v.c.serial == c1,
689
                         v.c.cluster != except_cluster,
690
                         v.c.node.in_(c2)))
691
        rp = self.conn.execute(s)
692
        r = rp.fetchone()
693
        rp.close()
694
        if not r:
695
            return None
696
        size = r[1] - props[SIZE]
697
        mtime = max(mtime, r[2])
698
        return (count, size, mtime)
699

    
700
    def nodes_set_latest_version(self, node, serial):
701
        s = self.nodes.update().where(self.nodes.c.node == node)
702
        s = s.values(latest_version=serial)
703
        self.conn.execute(s).close()
704

    
705
    def version_create(self, node, hash, size, type, source, muser, uuid,
706
                       checksum, cluster=0,
707
                       update_statistics_ancestors_depth=None):
708
        """Create a new version from the given properties.
709
           Return the (serial, mtime) of the new version.
710
        """
711

    
712
        mtime = time()
713
        s = self.versions.insert().values(
714
            node=node, hash=hash, size=size, type=type, source=source,
715
            mtime=mtime, muser=muser, uuid=uuid, checksum=checksum,
716
            cluster=cluster)
717
        serial = self.conn.execute(s).inserted_primary_key[0]
718
        self.statistics_update_ancestors(node, 1, size, mtime, cluster,
719
                                         update_statistics_ancestors_depth)
720

    
721
        self.nodes_set_latest_version(node, serial)
722

    
723
        return serial, mtime
724

    
725
    def version_lookup(self, node, before=inf, cluster=0, all_props=True):
726
        """Lookup the current version of the given node.
727
           Return a list with its properties:
728
           (serial, node, hash, size, type, source, mtime,
729
            muser, uuid, checksum, cluster)
730
           or None if the current version is not found in the given cluster.
731
        """
732

    
733
        v = self.versions.alias('v')
734
        if not all_props:
735
            s = select([v.c.serial])
736
        else:
737
            s = select([v.c.serial, v.c.node, v.c.hash,
738
                        v.c.size, v.c.type, v.c.source,
739
                        v.c.mtime, v.c.muser, v.c.uuid,
740
                        v.c.checksum, v.c.cluster])
741
        if before != inf:
742
            c = select([func.max(self.versions.c.serial)],
743
                       self.versions.c.node == node)
744
            c = c.where(self.versions.c.mtime < before)
745
        else:
746
            c = select([self.nodes.c.latest_version],
747
                       self.nodes.c.node == node)
748
        s = s.where(and_(v.c.serial == c,
749
                         v.c.cluster == cluster))
750
        r = self.conn.execute(s)
751
        props = r.fetchone()
752
        r.close()
753
        if props:
754
            return props
755
        return None
756

    
757
    def version_lookup_bulk(self, nodes, before=inf, cluster=0,
758
                            all_props=True):
759
        """Lookup the current versions of the given nodes.
760
           Return a list with their properties:
761
           (serial, node, hash, size, type, source, mtime, muser, uuid,
762
            checksum, cluster).
763
        """
764
        if not nodes:
765
            return ()
766
        v = self.versions.alias('v')
767
        if not all_props:
768
            s = select([v.c.serial])
769
        else:
770
            s = select([v.c.serial, v.c.node, v.c.hash,
771
                        v.c.size, v.c.type, v.c.source,
772
                        v.c.mtime, v.c.muser, v.c.uuid,
773
                        v.c.checksum, v.c.cluster])
774
        if before != inf:
775
            c = select([func.max(self.versions.c.serial)],
776
                       self.versions.c.node.in_(nodes))
777
            c = c.where(self.versions.c.mtime < before)
778
            c = c.group_by(self.versions.c.node)
779
        else:
780
            c = select([self.nodes.c.latest_version],
781
                       self.nodes.c.node.in_(nodes))
782
        s = s.where(and_(v.c.serial.in_(c),
783
                         v.c.cluster == cluster))
784
        s = s.order_by(v.c.node)
785
        r = self.conn.execute(s)
786
        rproxy = r.fetchall()
787
        r.close()
788
        return (tuple(row.values()) for row in rproxy)
789

    
790
    def version_get_properties(self, serial, keys=(), propnames=_propnames,
791
                               node=None):
792
        """Return a sequence of values for the properties of
793
           the version specified by serial and the keys, in the order given.
794
           If keys is empty, return all properties in the order
795
           (serial, node, hash, size, type, source, mtime, muser, uuid,
796
            checksum, cluster).
797
        """
798

    
799
        v = self.versions.alias()
800
        s = select([v.c.serial, v.c.node, v.c.hash,
801
                    v.c.size, v.c.type, v.c.source,
802
                    v.c.mtime, v.c.muser, v.c.uuid,
803
                    v.c.checksum, v.c.cluster], v.c.serial == serial)
804
        if node is not None:
805
            s = s.where(v.c.node == node)
806
        rp = self.conn.execute(s)
807
        r = rp.fetchone()
808
        rp.close()
809
        if r is None:
810
            return r
811

    
812
        if not keys:
813
            return r
814
        return [r[propnames[k]] for k in keys if k in propnames]
815

    
816
    def version_put_property(self, serial, key, value):
817
        """Set value for the property of version specified by key."""
818

    
819
        if key not in _propnames:
820
            return
821
        s = self.versions.update()
822
        s = s.where(self.versions.c.serial == serial)
823
        s = s.values(**{key: value})
824
        self.conn.execute(s).close()
825

    
826
    def version_recluster(self, serial, cluster,
827
                          update_statistics_ancestors_depth=None):
828
        """Move the version into another cluster."""
829

    
830
        props = self.version_get_properties(serial)
831
        if not props:
832
            return
833
        node = props[NODE]
834
        size = props[SIZE]
835
        oldcluster = props[CLUSTER]
836
        if cluster == oldcluster:
837
            return
838

    
839
        mtime = time()
840
        self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster,
841
                                         update_statistics_ancestors_depth)
842
        self.statistics_update_ancestors(node, 1, size, mtime, cluster,
843
                                         update_statistics_ancestors_depth)
844

    
845
        s = self.versions.update()
846
        s = s.where(self.versions.c.serial == serial)
847
        s = s.values(cluster=cluster)
848
        self.conn.execute(s).close()
849

    
850
    def version_remove(self, serial, update_statistics_ancestors_depth=None):
851
        """Remove the serial specified."""
852

    
853
        props = self.version_get_properties(serial)
854
        if not props:
855
            return
856
        node = props[NODE]
857
        hash = props[HASH]
858
        size = props[SIZE]
859
        cluster = props[CLUSTER]
860

    
861
        mtime = time()
862
        self.statistics_update_ancestors(node, -1, -size, mtime, cluster,
863
                                         update_statistics_ancestors_depth)
864

    
865
        s = self.versions.delete().where(self.versions.c.serial == serial)
866
        self.conn.execute(s).close()
867

    
868
        props = self.version_lookup(node, cluster=cluster, all_props=False)
869
        if props:
870
            self.nodes_set_latest_version(node, serial)
871

    
872
        return hash, size
873

    
874
    def attribute_get(self, serial, domain, keys=()):
875
        """Return a list of (key, value) pairs of the specific version.
876

877
        If keys is empty, return all attributes.
878
        Othwerise, return only those specified.
879
        """
880

    
881
        if keys:
882
            attrs = self.attributes.alias()
883
            s = select([attrs.c.key, attrs.c.value])
884
            s = s.where(and_(attrs.c.key.in_(keys),
885
                             attrs.c.serial == serial,
886
                             attrs.c.domain == domain))
887
        else:
888
            attrs = self.attributes.alias()
889
            s = select([attrs.c.key, attrs.c.value])
890
            s = s.where(and_(attrs.c.serial == serial,
891
                             attrs.c.domain == domain))
892
        r = self.conn.execute(s)
893
        l = r.fetchall()
894
        r.close()
895
        return l
896

    
897
    def attribute_set(self, serial, domain, node, items, is_latest=True):
898
        """Set the attributes of the version specified by serial.
899
           Receive attributes as an iterable of (key, value) pairs.
900
        """
901
        #insert or replace
902
        #TODO better upsert
903
        for k, v in items:
904
            s = self.attributes.update()
905
            s = s.where(and_(self.attributes.c.serial == serial,
906
                             self.attributes.c.domain == domain,
907
                             self.attributes.c.key == k))
908
            s = s.values(value=v)
909
            rp = self.conn.execute(s)
910
            rp.close()
911
            if rp.rowcount == 0:
912
                s = self.attributes.insert()
913
                s = s.values(serial=serial, domain=domain, node=node,
914
                             is_latest=is_latest, key=k, value=v)
915
                self.conn.execute(s).close()
916

    
917
    def attribute_del(self, serial, domain, keys=()):
918
        """Delete attributes of the version specified by serial.
919
           If keys is empty, delete all attributes.
920
           Otherwise delete those specified.
921
        """
922

    
923
        if keys:
924
            #TODO more efficient way to do this?
925
            for key in keys:
926
                s = self.attributes.delete()
927
                s = s.where(and_(self.attributes.c.serial == serial,
928
                                 self.attributes.c.domain == domain,
929
                                 self.attributes.c.key == key))
930
                self.conn.execute(s).close()
931
        else:
932
            s = self.attributes.delete()
933
            s = s.where(and_(self.attributes.c.serial == serial,
934
                             self.attributes.c.domain == domain))
935
            self.conn.execute(s).close()
936

    
937
    def attribute_copy(self, source, dest):
938
        s = select(
939
            [dest, self.attributes.c.domain, self.attributes.c.node,
940
             self.attributes.c.key, self.attributes.c.value],
941
            self.attributes.c.serial == source)
942
        rp = self.conn.execute(s)
943
        attributes = rp.fetchall()
944
        rp.close()
945
        for dest, domain, node, k, v in attributes:
946
            select_src_node = select(
947
                [self.versions.c.node],
948
                self.versions.c.serial == dest)
949
            # insert or replace
950
            s = self.attributes.update().where(and_(
951
                self.attributes.c.serial == dest,
952
                self.attributes.c.domain == domain,
953
                self.attributes.c.key == k))
954
            s = s.values(node=select_src_node, value=v)
955
            rp = self.conn.execute(s)
956
            rp.close()
957
            if rp.rowcount == 0:
958
                s = self.attributes.insert()
959
                s = s.values(serial=dest, domain=domain, node=select_src_node,
960
                             is_latest=True, key=k, value=v)
961
            self.conn.execute(s).close()
962

    
963
    def attribute_unset_is_latest(self, node, exclude):
964
        u = self.attributes.update().where(and_(
965
            self.attributes.c.node == node,
966
            self.attributes.c.serial != exclude)).values({'is_latest': False})
967
        self.conn.execute(u)
968

    
969
    def latest_attribute_keys(self, parent, domain, before=inf,
970
                              except_cluster=0, pathq=None):
971
        """Return a list with all keys pairs defined
972
           for all latest versions under parent that
973
           do not belong to the cluster.
974
        """
975

    
976
        pathq = pathq or []
977

    
978
        # TODO: Use another table to store before=inf results.
979
        a = self.attributes.alias('a')
980
        v = self.versions.alias('v')
981
        n = self.nodes.alias('n')
982
        s = select([a.c.key]).distinct()
983
        if before != inf:
984
            filtered = select([func.max(self.versions.c.serial)])
985
            filtered = filtered.where(self.versions.c.mtime < before)
986
            filtered = filtered.where(self.versions.c.node == v.c.node)
987
        else:
988
            filtered = select([self.nodes.c.latest_version])
989
            filtered = filtered.where(self.nodes.c.node == v.c.node)
990
        s = s.where(v.c.serial == filtered)
991
        s = s.where(v.c.cluster != except_cluster)
992
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
993
                                        self.nodes.c.parent == parent)))
994
        s = s.where(a.c.serial == v.c.serial)
995
        s = s.where(a.c.domain == domain)
996
        s = s.where(n.c.node == v.c.node)
997
        conj = []
998
        for path, match in pathq:
999
            if match == MATCH_PREFIX:
1000
                conj.append(n.c.path.like(self.escape_like(path) + '%',
1001
                                          escape=ESCAPE_CHAR))
1002
            elif match == MATCH_EXACT:
1003
                conj.append(n.c.path == path)
1004
        if conj:
1005
            s = s.where(or_(*conj))
1006
        rp = self.conn.execute(s)
1007
        rows = rp.fetchall()
1008
        rp.close()
1009
        return [r[0] for r in rows]
1010

    
1011
    def latest_version_list(self, parent, prefix='', delimiter=None,
1012
                            start='', limit=10000, before=inf,
1013
                            except_cluster=0, pathq=[], domain=None,
1014
                            filterq=[], sizeq=None, all_props=False):
1015
        """Return a (list of (path, serial) tuples, list of common prefixes)
1016
           for the current versions of the paths with the given parent,
1017
           matching the following criteria.
1018

1019
           The property tuple for a version is returned if all
1020
           of these conditions are true:
1021

1022
                a. parent matches
1023

1024
                b. path > start
1025

1026
                c. path starts with prefix (and paths in pathq)
1027

1028
                d. version is the max up to before
1029

1030
                e. version is not in cluster
1031

1032
                f. the path does not have the delimiter occuring
1033
                   after the prefix, or ends with the delimiter
1034

1035
                g. serial matches the attribute filter query.
1036

1037
                   A filter query is a comma-separated list of
1038
                   terms in one of these three forms:
1039

1040
                   key
1041
                       an attribute with this key must exist
1042

1043
                   !key
1044
                       an attribute with this key must not exist
1045

1046
                   key ?op value
1047
                       the attribute with this key satisfies the value
1048
                       where ?op is one of ==, != <=, >=, <, >.
1049

1050
                h. the size is in the range set by sizeq
1051

1052
           The list of common prefixes includes the prefixes
1053
           matching up to the first delimiter after prefix,
1054
           and are reported only once, as "virtual directories".
1055
           The delimiter is included in the prefixes.
1056

1057
           If arguments are None, then the corresponding matching rule
1058
           will always match.
1059

1060
           Limit applies to the first list of tuples returned.
1061

1062
           If all_props is True, return all properties after path,
1063
           not just serial.
1064
        """
1065

    
1066
        if not start or start < prefix:
1067
            start = strprevling(prefix)
1068
        nextling = strnextling(prefix)
1069

    
1070
        v = self.versions.alias('v')
1071
        n = self.nodes.alias('n')
1072
        if not all_props:
1073
            s = select([n.c.path, v.c.serial]).distinct()
1074
        else:
1075
            s = select([n.c.path,
1076
                        v.c.serial, v.c.node, v.c.hash,
1077
                        v.c.size, v.c.type, v.c.source,
1078
                        v.c.mtime, v.c.muser, v.c.uuid,
1079
                        v.c.checksum, v.c.cluster]).distinct()
1080
        if before != inf:
1081
            filtered = select([func.max(self.versions.c.serial)])
1082
            filtered = filtered.where(self.versions.c.mtime < before)
1083
        else:
1084
            filtered = select([self.nodes.c.latest_version])
1085
        s = s.where(
1086
            v.c.serial == filtered.where(self.nodes.c.node == v.c.node))
1087
        s = s.where(v.c.cluster != except_cluster)
1088
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
1089
                                        self.nodes.c.parent == parent)))
1090

    
1091
        s = s.where(n.c.node == v.c.node)
1092
        s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
1093
        conj = []
1094
        for path, match in pathq:
1095
            if match == MATCH_PREFIX:
1096
                conj.append(n.c.path.like(self.escape_like(path) + '%',
1097
                                          escape=ESCAPE_CHAR))
1098
            elif match == MATCH_EXACT:
1099
                conj.append(n.c.path == path)
1100
        if conj:
1101
            s = s.where(or_(*conj))
1102

    
1103
        if sizeq and len(sizeq) == 2:
1104
            if sizeq[0]:
1105
                s = s.where(v.c.size >= sizeq[0])
1106
            if sizeq[1]:
1107
                s = s.where(v.c.size < sizeq[1])
1108

    
1109
        if domain and filterq:
1110
            a = self.attributes.alias('a')
1111
            included, excluded, opers = parse_filters(filterq)
1112
            if included:
1113
                subs = select([1])
1114
                subs = subs.where(a.c.serial == v.c.serial).correlate(v)
1115
                subs = subs.where(a.c.domain == domain)
1116
                subs = subs.where(or_(*[a.c.key.op('=')(x) for x in included]))
1117
                s = s.where(exists(subs))
1118
            if excluded:
1119
                subs = select([1])
1120
                subs = subs.where(a.c.serial == v.c.serial).correlate(v)
1121
                subs = subs.where(a.c.domain == domain)
1122
                subs = subs.where(or_(*[a.c.key.op('=')(x) for x in excluded]))
1123
                s = s.where(not_(exists(subs)))
1124
            if opers:
1125
                for k, o, val in opers:
1126
                    subs = select([1])
1127
                    subs = subs.where(a.c.serial == v.c.serial).correlate(v)
1128
                    subs = subs.where(a.c.domain == domain)
1129
                    subs = subs.where(
1130
                        and_(a.c.key.op('=')(k), a.c.value.op(o)(val)))
1131
                    s = s.where(exists(subs))
1132

    
1133
        s = s.order_by(n.c.path)
1134

    
1135
        if not delimiter:
1136
            s = s.limit(limit)
1137
            rp = self.conn.execute(s, start=start)
1138
            r = rp.fetchall()
1139
            rp.close()
1140
            return r, ()
1141

    
1142
        pfz = len(prefix)
1143
        dz = len(delimiter)
1144
        count = 0
1145
        prefixes = []
1146
        pappend = prefixes.append
1147
        matches = []
1148
        mappend = matches.append
1149

    
1150
        rp = self.conn.execute(s, start=start)
1151
        while True:
1152
            props = rp.fetchone()
1153
            if props is None:
1154
                break
1155
            path = props[0]
1156
            idx = path.find(delimiter, pfz)
1157

    
1158
            if idx < 0:
1159
                mappend(props)
1160
                count += 1
1161
                if count >= limit:
1162
                    break
1163
                continue
1164

    
1165
            if idx + dz == len(path):
1166
                mappend(props)
1167
                count += 1
1168
                continue  # Get one more, in case there is a path.
1169
            pf = path[:idx + dz]
1170
            pappend(pf)
1171
            if count >= limit:
1172
                break
1173

    
1174
            rp = self.conn.execute(s, start=strnextling(pf))  # New start.
1175
        rp.close()
1176

    
1177
        return matches, prefixes
1178

    
1179
    def latest_uuid(self, uuid, cluster):
1180
        """Return the latest version of the given uuid and cluster.
1181

1182
        Return a (path, serial) tuple.
1183
        If cluster is None, all clusters are considered.
1184

1185
        """
1186

    
1187
        v = self.versions.alias('v')
1188
        n = self.nodes.alias('n')
1189
        s = select([n.c.path, v.c.serial])
1190
        filtered = select([func.max(self.versions.c.serial)])
1191
        filtered = filtered.where(self.versions.c.uuid == uuid)
1192
        if cluster is not None:
1193
            filtered = filtered.where(self.versions.c.cluster == cluster)
1194
        s = s.where(v.c.serial == filtered)
1195
        s = s.where(n.c.node == v.c.node)
1196

    
1197
        r = self.conn.execute(s)
1198
        l = r.fetchone()
1199
        r.close()
1200
        return l
1201

    
1202
    def domain_object_list(self, domain, paths, cluster=None):
1203
        """Return a list of (path, property list, attribute dictionary)
1204
           for the objects in the specific domain and cluster.
1205
        """
1206

    
1207
        v = self.versions.alias('v')
1208
        n = self.nodes.alias('n')
1209
        a = self.attributes.alias('a')
1210

    
1211
        s = select([n.c.path, v.c.serial, v.c.node, v.c.hash, v.c.size,
1212
                    v.c.type, v.c.source, v.c.mtime, v.c.muser, v.c.uuid,
1213
                    v.c.checksum, v.c.cluster, a.c.key, a.c.value])
1214
        if cluster:
1215
            s = s.where(v.c.cluster == cluster)
1216
        s = s.where(v.c.serial == a.c.serial)
1217
        s = s.where(a.c.domain == domain)
1218
        s = s.where(a.c.node == n.c.node)
1219
        s = s.where(a.c.is_latest == True)
1220
        if paths:
1221
            s = s.where(n.c.path.in_(paths))
1222

    
1223
        r = self.conn.execute(s)
1224
        rows = r.fetchall()
1225
        r.close()
1226

    
1227
        group_by = itemgetter(slice(12))
1228
        rows.sort(key=group_by)
1229
        groups = groupby(rows, group_by)
1230
        return [(k[0], k[1:], dict([i[12:] for i in data])) for
1231
                (k, data) in groups]
1232

    
1233
    def get_props(self, paths):
1234
        inner_join = \
1235
            self.nodes.join(self.versions,
1236
                onclause=self.versions.c.serial==self.nodes.c.latest_version)
1237
        cc = self.nodes.c.path.in_(paths)
1238
        s = select([self.nodes.c.path,self.versions.c.type],
1239
                    from_obj=[inner_join]).where(cc).distinct()
1240
        r = self.conn.execute(s)
1241
        rows = r.fetchall()
1242
        r.close()
1243
        if rows:
1244
            return rows
1245
        return None