Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / lib / sqlalchemy / node.py @ 96090728

History | View | Annotate | Download (45.3 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from time import time
35
from operator import itemgetter
36
from itertools import groupby
37

    
38
from sqlalchemy import (Table, Integer, BigInteger, DECIMAL, Boolean,
39
                        Column, String, MetaData, ForeignKey)
40
from sqlalchemy.schema import Index
41
from sqlalchemy.sql import func, and_, or_, not_, select, bindparam, exists
42
from sqlalchemy.exc import NoSuchTableError
43

    
44
from dbworker import DBWorker, ESCAPE_CHAR
45

    
46
from pithos.backends.filter import parse_filters
47

    
48

    
49
# Identifier of the root node; it is its own parent (the row is created
# on demand in Node.__init__).
ROOTNODE = 0

# Column offsets into a version property row, matching the column order
# used by the version/node property queries below.
(SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM,
 CLUSTER) = range(11)

# Path matching modes.
(MATCH_PREFIX, MATCH_EXACT) = range(2)

# Sentinel meaning "no upper bound" for 'before' timestamp arguments.
inf = float('inf')
57

    
58

    
59
def strnextling(prefix):
    """Return the first unicode string that sorts after every string
    starting with the given prefix.

    strnextling('hello') -> 'hellp'
    """
    if not prefix:
        ## Every string starts with the null string, so approximate
        ## strnextling('') with the highest code point that is safe on
        ## both narrow (16-bit) and wide (32-bit) unicode builds.
        ## We will not autodetect; 0xffff is safe enough.
        return unichr(0xffff)
    last = ord(prefix[-1])
    if last >= 0xffff:
        raise RuntimeError
    return prefix[:-1] + unichr(last + 1)
78

    
79

    
80
def strprevling(prefix):
    """Return an approximation of the last unicode string that sorts
    before every string starting with the given prefix.

    strprevling(u'hello') -> u'helln\\xffff'
    """
    if not prefix:
        ## There is no prevling for the null string.
        return prefix
    head, last = prefix[:-1], ord(prefix[-1])
    if last > 0:
        head += unichr(last - 1) + unichr(0xffff)
    return head
93

    
94
_propnames = {
95
    'serial': 0,
96
    'node': 1,
97
    'hash': 2,
98
    'size': 3,
99
    'type': 4,
100
    'source': 5,
101
    'mtime': 6,
102
    'muser': 7,
103
    'uuid': 8,
104
    'checksum': 9,
105
    'cluster': 10
106
}
107

    
108

    
109
def create_tables(engine):
    """Create (if missing) the nodes, policy, statistics, versions and
    attributes tables plus their indexes on the given engine, and
    return the tables in dependency-sorted order.
    """
    metadata = MetaData()

    # Nodes: path organization; each node references its parent.
    nodes = Table(
        'nodes', metadata,
        Column('node', Integer, primary_key=True),
        Column('parent', Integer,
               ForeignKey('nodes.node',
                          ondelete='CASCADE',
                          onupdate='CASCADE'),
               autoincrement=False),
        Column('latest_version', Integer),
        Column('path', String(2048), default='', nullable=False),
        mysql_engine='InnoDB')
    Index('idx_nodes_path', nodes.c.path, unique=True)
    Index('idx_nodes_parent', nodes.c.parent)

    # Policy: per-node key/value settings.
    Table(
        'policy', metadata,
        Column('node', Integer,
               ForeignKey('nodes.node',
                          ondelete='CASCADE',
                          onupdate='CASCADE'),
               primary_key=True),
        Column('key', String(128), primary_key=True),
        Column('value', String(256)),
        mysql_engine='InnoDB')

    # Statistics: per-(node, cluster) population/size/mtime aggregates.
    Table(
        'statistics', metadata,
        Column('node', Integer,
               ForeignKey('nodes.node',
                          ondelete='CASCADE',
                          onupdate='CASCADE'),
               primary_key=True),
        Column('population', Integer, nullable=False, default=0),
        Column('size', BigInteger, nullable=False, default=0),
        Column('mtime', DECIMAL(precision=16, scale=6)),
        Column('cluster', Integer, nullable=False, default=0,
               primary_key=True, autoincrement=False),
        mysql_engine='InnoDB')

    # Versions: object history, one row per version of a node.
    versions = Table(
        'versions', metadata,
        Column('serial', Integer, primary_key=True),
        Column('node', Integer,
               ForeignKey('nodes.node',
                          ondelete='CASCADE',
                          onupdate='CASCADE')),
        Column('hash', String(256)),
        Column('size', BigInteger, nullable=False, default=0),
        Column('type', String(256), nullable=False, default=''),
        Column('source', Integer),
        Column('mtime', DECIMAL(precision=16, scale=6)),
        Column('muser', String(256), nullable=False, default=''),
        Column('uuid', String(64), nullable=False, default=''),
        Column('checksum', String(256), nullable=False, default=''),
        Column('cluster', Integer, nullable=False, default=0),
        mysql_engine='InnoDB')
    Index('idx_versions_node_mtime', versions.c.node, versions.c.mtime)
    Index('idx_versions_node_uuid', versions.c.uuid)

    # Attributes: per-version metadata, namespaced by (domain, key).
    attributes = Table(
        'attributes', metadata,
        Column('serial', Integer,
               ForeignKey('versions.serial',
                          ondelete='CASCADE',
                          onupdate='CASCADE'),
               primary_key=True),
        Column('domain', String(256), primary_key=True),
        Column('key', String(128), primary_key=True),
        Column('value', String(256)),
        Column('node', Integer, nullable=False, default=0),
        Column('is_latest', Boolean, nullable=False, default=True),
        mysql_engine='InnoDB')
    Index('idx_attributes_domain', attributes.c.domain)
    Index('idx_attributes_serial_node', attributes.c.serial, attributes.c.node)

    metadata.create_all(engine)
    return metadata.sorted_tables
189

    
190

    
191
class Node(DBWorker):
192
    """Nodes store path organization and have multiple versions.
193
       Versions store object history and have multiple attributes.
194
       Attributes store metadata.
195
    """
196

    
197
    # TODO: Provide an interface for included and excluded clusters.
198

    
199
    def __init__(self, **params):
        """Reflect the backend tables (creating them if missing) and
        make sure the root node exists.
        """
        DBWorker.__init__(self, **params)
        try:
            metadata = MetaData(self.engine)
            self.nodes = Table('nodes', metadata, autoload=True)
            self.policy = Table('policy', metadata, autoload=True)
            self.statistics = Table('statistics', metadata, autoload=True)
            self.versions = Table('versions', metadata, autoload=True)
            self.attributes = Table('attributes', metadata, autoload=True)
        except NoSuchTableError:
            tables = create_tables(self.engine)
            # Plain loop instead of map(): map was used only for its
            # side effect, which would silently do nothing under
            # Python 3 where map is lazy.
            for t in tables:
                setattr(self, t.name, t)

        # Ensure the root node (its own parent, empty path) exists.
        s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
                                           self.nodes.c.parent == ROOTNODE))
        wrapper = self.wrapper
        wrapper.execute()
        try:
            rp = self.conn.execute(s)
            r = rp.fetchone()
            rp.close()
            if not r:
                s = self.nodes.insert(
                ).values(node=ROOTNODE, parent=ROOTNODE, path='')
                self.conn.execute(s)
        finally:
            wrapper.commit()
226

    
227
    def node_create(self, parent, path):
        """Insert a new node with the given parent and path and
        return its node identifier.
        """
        #TODO catch IntegrityError?
        rp = self.conn.execute(
            self.nodes.insert().values(parent=parent, path=path))
        node = rp.inserted_primary_key[0]
        rp.close()
        return node
237

    
238
    def node_lookup(self, path, for_update=False):
        """Return the node identifier for the given path,
        or None if the path is not found.
        """
        # LIKE (with escaped wildcards) instead of '=' sidesteps MySQL's
        # trailing-space comparison semantics.
        stmt = select([self.nodes.c.node],
                      self.nodes.c.path.like(self.escape_like(path),
                                             escape=ESCAPE_CHAR),
                      for_update=for_update)
        rp = self.conn.execute(stmt)
        row = rp.fetchone()
        rp.close()
        return row[0] if row else None
254

    
255
    def node_lookup_bulk(self, paths):
        """Return the node identifiers for the given paths;
        () if no paths are given.
        """
        if not paths:
            return ()
        rp = self.conn.execute(
            select([self.nodes.c.node], self.nodes.c.path.in_(paths)))
        rows = rp.fetchall()
        rp.close()
        return [row[0] for row in rows]
268

    
269
    def node_get_properties(self, node):
        """Return the (parent, path) pair of the given node,
        or None if the node is not found.
        """
        stmt = select([self.nodes.c.parent, self.nodes.c.path]).where(
            self.nodes.c.node == node)
        rp = self.conn.execute(stmt)
        props = rp.fetchone()
        rp.close()
        return props
280

    
281
    def node_get_versions(self, node, keys=(), propnames=_propnames):
        """Return the properties of all versions at node, ordered by
           serial.  If keys is empty, return all properties in the order
           (serial, node, hash, size, type, source, mtime, muser, uuid,
            checksum, cluster).
        """
        c = self.versions.c
        stmt = select([c.serial, c.node, c.hash, c.size, c.type, c.source,
                       c.mtime, c.muser, c.uuid, c.checksum, c.cluster],
                      c.node == node).order_by(c.serial)
        rp = self.conn.execute(stmt)
        rows = rp.fetchall()
        rp.close()
        if not rows or not keys:
            return rows

        # Project each row down to the requested (known) keys.
        return [[row[propnames[k]] for k in keys if k in propnames]
                for row in rows]
311

    
312
    def node_count_children(self, node):
        """Return the number of children of the given node."""
        stmt = select([func.count(self.nodes.c.node)]).where(
            and_(self.nodes.c.parent == node,
                 self.nodes.c.node != ROOTNODE))
        rp = self.conn.execute(stmt)
        count = rp.fetchone()[0]
        rp.close()
        return count
322

    
323
    def node_purge_children(self, parent, before=inf, cluster=0,
                            update_statistics_ancestors_depth=None):
        """Delete all versions with the specified
           parent and cluster, and return
           the hashes, the total size and the serials of versions deleted.
           Clears out nodes with no remaining versions.
        """
        # Predicate selecting the doomed versions.
        children = select([self.nodes.c.node],
                          self.nodes.c.parent == parent)
        where_clause = and_(self.versions.c.node.in_(children),
                            self.versions.c.cluster == cluster)
        if before != inf:
            where_clause = and_(where_clause,
                                self.versions.c.mtime <= before)

        # Aggregate count/size first so statistics can be adjusted.
        stmt = select([func.count(self.versions.c.serial),
                       func.sum(self.versions.c.size)]).where(where_clause)
        rp = self.conn.execute(stmt)
        row = rp.fetchone()
        rp.close()
        if not row:
            return (), 0, ()
        nr = row[0]
        size = row[1] if row[1] else 0  # SUM() is NULL when no rows match
        mtime = time()
        self.statistics_update(parent, -nr, -size, mtime, cluster)
        self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster,
                                         update_statistics_ancestors_depth)

        # Remember what is about to be deleted.
        stmt = select([self.versions.c.hash,
                       self.versions.c.serial]).where(where_clause)
        rp = self.conn.execute(stmt)
        pairs = rp.fetchall()
        rp.close()
        hashes = [row[0] for row in pairs]
        serials = [row[1] for row in pairs]

        # Delete the versions themselves.
        self.conn.execute(self.versions.delete().where(where_clause)).close()

        # Delete child nodes left without any version.
        stmt = select([self.nodes.c.node],
                      and_(self.nodes.c.parent == parent,
                           select([func.count(self.versions.c.serial)],
                                  self.versions.c.node == self.nodes.c.node).
                           as_scalar() == 0))
        rp = self.conn.execute(stmt)
        orphans = [row[0] for row in rp.fetchall()]
        rp.close()
        if orphans:
            stmt = self.nodes.delete().where(self.nodes.c.node.in_(orphans))
            self.conn.execute(stmt).close()

        return hashes, size, serials
381

    
382
    def node_purge(self, node, before=inf, cluster=0,
                   update_statistics_ancestors_depth=None):
        """Delete all versions with the specified
           node and cluster, and return
           the hashes and size of versions deleted.
           Clears out the node if it has no remaining versions.
        """
        # Predicate selecting the doomed versions.
        where_clause = and_(self.versions.c.node == node,
                            self.versions.c.cluster == cluster)
        if before != inf:
            where_clause = and_(where_clause,
                                self.versions.c.mtime <= before)

        # Aggregate count/size first so statistics can be adjusted.
        stmt = select([func.count(self.versions.c.serial),
                       func.sum(self.versions.c.size)]).where(where_clause)
        rp = self.conn.execute(stmt)
        nr, size = rp.fetchone()
        rp.close()
        if not nr:
            return (), 0, ()
        mtime = time()
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster,
                                         update_statistics_ancestors_depth)

        # Remember what is about to be deleted.
        stmt = select([self.versions.c.hash,
                       self.versions.c.serial]).where(where_clause)
        rp = self.conn.execute(stmt)
        pairs = rp.fetchall()
        rp.close()
        hashes = [row[0] for row in pairs]
        serials = [row[1] for row in pairs]

        # Delete the versions themselves.
        self.conn.execute(self.versions.delete().where(where_clause)).close()

        # Delete the node if it has no remaining versions.
        stmt = select([self.nodes.c.node],
                      and_(self.nodes.c.node == node,
                           select([func.count(self.versions.c.serial)],
                                  self.versions.c.node == self.nodes.c.node).
                           as_scalar() == 0))
        rp = self.conn.execute(stmt)
        orphans = [row[0] for row in rp.fetchall()]
        rp.close()
        if orphans:
            stmt = self.nodes.delete().where(self.nodes.c.node.in_(orphans))
            self.conn.execute(stmt).close()

        return hashes, size, serials
438

    
439
    def node_remove(self, node, update_statistics_ancestors_depth=None):
        """Remove the node specified.
           Return False if the node has children; otherwise subtract the
           node's per-cluster statistics from its ancestors, delete the
           node row and return True.
        """
        if self.node_count_children(node):
            return False

        mtime = time()
        stmt = select([func.count(self.versions.c.serial),
                       func.sum(self.versions.c.size),
                       self.versions.c.cluster])
        stmt = stmt.where(self.versions.c.node == node)
        stmt = stmt.group_by(self.versions.c.cluster)
        rp = self.conn.execute(stmt)
        for population, size, cluster in rp.fetchall():
            # Subtract this node's contribution from every ancestor.
            self.statistics_update_ancestors(
                node, -population, -size, mtime, cluster,
                update_statistics_ancestors_depth)
        rp.close()

        self.conn.execute(
            self.nodes.delete().where(self.nodes.c.node == node)).close()
        return True
463

    
464
    def node_accounts(self, accounts=()):
        """Return (path, node) rows for all top-level (account) nodes,
        optionally restricted to the given account paths.
        """
        stmt = select([self.nodes.c.path, self.nodes.c.node]).where(
            and_(self.nodes.c.node != 0,
                 self.nodes.c.parent == 0))
        if accounts:
            stmt = stmt.where(self.nodes.c.path.in_(accounts))
        rp = self.conn.execute(stmt)
        rows = rp.fetchall()
        rp.close()
        return rows
474

    
475
    def node_account_quotas(self):
        """Return a dict mapping account path to its 'quota' policy value."""
        stmt = select([self.nodes.c.path, self.policy.c.value])
        stmt = stmt.where(and_(self.nodes.c.node != 0,
                               self.nodes.c.parent == 0,
                               self.nodes.c.node == self.policy.c.node,
                               self.policy.c.key == 'quota'))
        rp = self.conn.execute(stmt)
        quotas = dict(rp.fetchall())
        rp.close()
        return quotas
485

    
486
    def node_account_usage(self, account_node, cluster):
        """Return the summed size of all versions in the given cluster
        under the account node (its children and grandchildren).
        """
        # Direct children of the account ...
        children = select([self.nodes.c.node]).where(
            self.nodes.c.parent == account_node)
        # ... together with their own children.
        descendants = select([self.nodes.c.node]).where(
            or_(self.nodes.c.parent.in_(children),
                self.nodes.c.node.in_(children)))
        stmt = select([func.sum(self.versions.c.size)]).where(
            and_(self.nodes.c.node == self.versions.c.node,
                 self.nodes.c.node.in_(descendants),
                 self.versions.c.cluster == cluster))
        rp = self.conn.execute(stmt)
        usage = rp.fetchone()[0]
        rp.close()
        return usage
500

    
501
    def policy_get(self, node):
        """Return the node's policy as a {key: value} dict."""
        stmt = select([self.policy.c.key, self.policy.c.value],
                      self.policy.c.node == node)
        rp = self.conn.execute(stmt)
        policy = dict(rp.fetchall())
        rp.close()
        return policy
508

    
509
    def policy_set(self, node, policy):
        """Insert or update the given {key: value} policy pairs for the
        node (poor man's upsert: try UPDATE, INSERT on zero rows).
        """
        for k, v in policy.iteritems():
            s = self.policy.update().where(and_(self.policy.c.node == node,
                                                self.policy.c.key == k))
            s = s.values(value=v)
            rp = self.conn.execute(s)
            # Read rowcount BEFORE releasing the result: some DBAPI
            # drivers do not guarantee it stays valid after close().
            updated = rp.rowcount
            rp.close()
            if updated == 0:
                s = self.policy.insert()
                values = {'node': node, 'key': k, 'value': v}
                r = self.conn.execute(s, values)
                r.close()
522

    
523
    def statistics_get(self, node, cluster=0):
        """Return the (population, size, mtime) statistics row for the
        given node and cluster, or None if none is recorded.
        """
        stmt = select([self.statistics.c.population,
                       self.statistics.c.size,
                       self.statistics.c.mtime]).where(
            and_(self.statistics.c.node == node,
                 self.statistics.c.cluster == cluster))
        rp = self.conn.execute(stmt)
        row = rp.fetchone()
        rp.close()
        return row
537

    
538
    def statistics_update(self, node, population, size, mtime, cluster=0):
        """Update the statistics of the given node.
           Statistics keep track of the population, total
           size of objects and mtime in the node's namespace.
           The deltas may be zero, positive or negative numbers;
           population is clamped at zero.
        """
        s = select([self.statistics.c.population, self.statistics.c.size],
                   and_(self.statistics.c.node == node,
                        self.statistics.c.cluster == cluster))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            prepopulation, presize = (0, 0)
        else:
            prepopulation, presize = r
        population += prepopulation
        population = max(population, 0)  # never go negative on deletions
        size += presize

        #insert or replace
        #TODO better upsert
        u = self.statistics.update().where(and_(
            self.statistics.c.node == node,
            self.statistics.c.cluster == cluster))
        u = u.values(population=population, size=size, mtime=mtime)
        rp = self.conn.execute(u)
        # Read rowcount BEFORE releasing the result: some DBAPI drivers
        # do not guarantee it stays valid after close().
        updated = rp.rowcount
        rp.close()
        if updated == 0:
            ins = self.statistics.insert()
            ins = ins.values(node=node, population=population, size=size,
                             mtime=mtime, cluster=cluster)
            self.conn.execute(ins).close()
571

    
572
    def statistics_update_ancestors(self, node, population, size, mtime,
                                    cluster=0, recursion_depth=None):
        """Apply the statistics deltas to the given node's parent, then
           to each further ancestor up to the root or up to
           ``recursion_depth`` levels (when not None).
           Only the first step carries the population delta.
        """
        level = 0
        while node != ROOTNODE:
            if recursion_depth and recursion_depth <= level:
                break
            props = self.node_get_properties(node)
            if props is None:
                break
            parent, _ = props
            self.statistics_update(parent, population, size, mtime, cluster)
            node = parent
            population = 0  # Population isn't recursive
            level += 1
594

    
595
    def statistics_latest(self, node, before=inf, except_cluster=0):
        """Return population, total size and last mtime
           for all latest versions under node that
           do not belong to the cluster.
           Return None if the node or its latest version is missing.
        """

        # The node.
        props = self.node_get_properties(node)
        if props is None:
            return None
        parent, path = props

        # The latest version of the node itself.
        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.hash,
                    self.versions.c.size,
                    self.versions.c.type,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.uuid,
                    self.versions.c.checksum,
                    self.versions.c.cluster])
        if before != inf:
            filtered = select([func.max(self.versions.c.serial)],
                              self.versions.c.node == node)
            filtered = filtered.where(self.versions.c.mtime < before)
        else:
            filtered = select([self.nodes.c.latest_version],
                              self.nodes.c.node == node)
        s = s.where(and_(self.versions.c.cluster != except_cluster,
                         self.versions.c.serial == filtered))
        r = self.conn.execute(s)
        props = r.fetchone()
        r.close()
        if not props:
            return None
        mtime = props[MTIME]

        # First level, just under node (get population).
        v = self.versions.alias('v')
        s = select([func.count(v.c.serial),
                    func.sum(v.c.size),
                    func.max(v.c.mtime)])
        if before != inf:
            c1 = select([func.max(self.versions.c.serial)])
            c1 = c1.where(self.versions.c.mtime < before)
            # BUG FIX: where() returns a new select and does not mutate;
            # the original discarded this clause, losing the per-node
            # correlation of the subquery.
            c1 = c1.where(self.versions.c.node == v.c.node)
        else:
            c1 = select([self.nodes.c.latest_version])
            c1 = c1.where(self.nodes.c.node == v.c.node)
        c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
        s = s.where(and_(v.c.serial == c1,
                         v.c.cluster != except_cluster,
                         v.c.node.in_(c2)))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        count = r[0]
        mtime = max(mtime, r[2])
        if count == 0:
            return (0, 0, mtime)

        # All children (get size and mtime).
        # This is why the full path is stored.
        s = select([func.count(v.c.serial),
                    func.sum(v.c.size),
                    func.max(v.c.mtime)])
        if before != inf:
            c1 = select([func.max(self.versions.c.serial)],
                        self.versions.c.node == v.c.node)
            c1 = c1.where(self.versions.c.mtime < before)
        else:
            c1 = select([self.nodes.c.latest_version],
                        self.nodes.c.node == v.c.node)
        c2 = select([self.nodes.c.node],
                    self.nodes.c.path.like(self.escape_like(path) + '%',
                                           escape=ESCAPE_CHAR))
        s = s.where(and_(v.c.serial == c1,
                         v.c.cluster != except_cluster,
                         v.c.node.in_(c2)))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        # Subtract the node's own latest version size (children only).
        size = r[1] - props[SIZE]
        mtime = max(mtime, r[2])
        return (count, size, mtime)
687

    
688
    def nodes_set_latest_version(self, node, serial):
        """Record serial as the node's latest version."""
        upd = (self.nodes.update()
               .where(self.nodes.c.node == node)
               .values(latest_version=serial))
        self.conn.execute(upd).close()
692

    
693
    def version_create(self, node, hash, size, type, source, muser, uuid,
                       checksum, cluster=0,
                       update_statistics_ancestors_depth=None):
        """Insert a new version row with the given properties and
           return its (serial, mtime).
        """
        mtime = time()
        ins = self.versions.insert().values(
            node=node, hash=hash, size=size, type=type, source=source,
            mtime=mtime, muser=muser, uuid=uuid, checksum=checksum,
            cluster=cluster)
        serial = self.conn.execute(ins).inserted_primary_key[0]
        # Account for the new version up the node hierarchy.
        self.statistics_update_ancestors(node, 1, size, mtime, cluster,
                                         update_statistics_ancestors_depth)

        # The newly created version becomes the node's latest.
        self.nodes_set_latest_version(node, serial)

        return serial, mtime
712

    
713
    def version_lookup(self, node, before=inf, cluster=0, all_props=True):
        """Return the current version of the given node in the given
           cluster as a row of properties
           (serial, node, hash, size, type, source, mtime,
            muser, uuid, checksum, cluster),
           or only (serial,) when all_props is false.
           Return None if no such version exists.
        """
        v = self.versions.alias('v')
        if all_props:
            stmt = select([v.c.serial, v.c.node, v.c.hash,
                           v.c.size, v.c.type, v.c.source,
                           v.c.mtime, v.c.muser, v.c.uuid,
                           v.c.checksum, v.c.cluster])
        else:
            stmt = select([v.c.serial])
        if before != inf:
            # Latest serial strictly before the cutoff.
            current = select([func.max(self.versions.c.serial)],
                             self.versions.c.node == node)
            current = current.where(self.versions.c.mtime < before)
        else:
            # Use the denormalized pointer kept on the node row.
            current = select([self.nodes.c.latest_version],
                             self.nodes.c.node == node)
        stmt = stmt.where(and_(v.c.serial == current,
                               v.c.cluster == cluster))
        rp = self.conn.execute(stmt)
        props = rp.fetchone()
        rp.close()
        return props if props else None
744

    
745
    def version_lookup_bulk(self, nodes, before=inf, cluster=0,
                            all_props=True):
        """Return the properties of the current versions of the given
           nodes, ordered by node:
           (serial, node, hash, size, type, source, mtime, muser, uuid,
            checksum, cluster) — or only the serials when all_props is
           false.  Returns a generator of tuples; () for empty input.
        """
        if not nodes:
            return ()
        v = self.versions.alias('v')
        if all_props:
            stmt = select([v.c.serial, v.c.node, v.c.hash,
                           v.c.size, v.c.type, v.c.source,
                           v.c.mtime, v.c.muser, v.c.uuid,
                           v.c.checksum, v.c.cluster])
        else:
            stmt = select([v.c.serial])
        if before != inf:
            # Latest serial per node, strictly before the cutoff.
            current = select([func.max(self.versions.c.serial)],
                             self.versions.c.node.in_(nodes))
            current = current.where(self.versions.c.mtime < before)
            current = current.group_by(self.versions.c.node)
        else:
            # Use the denormalized pointers kept on the node rows.
            current = select([self.nodes.c.latest_version],
                             self.nodes.c.node.in_(nodes))
        stmt = stmt.where(and_(v.c.serial.in_(current),
                               v.c.cluster == cluster))
        stmt = stmt.order_by(v.c.node)
        rp = self.conn.execute(stmt)
        rows = rp.fetchall()
        rp.close()
        return (tuple(row.values()) for row in rows)
777

    
778
    def version_get_properties(self, serial, keys=(), propnames=_propnames):
        """Return a sequence of values for the properties of
        the version specified by serial and the keys, in the order given.

        If keys is empty, return all properties in the order
        (serial, node, hash, size, type, source, mtime, muser, uuid,
         checksum, cluster).
        """
        v = self.versions.alias()
        s = select([v.c.serial, v.c.node, v.c.hash,
                    v.c.size, v.c.type, v.c.source,
                    v.c.mtime, v.c.muser, v.c.uuid,
                    v.c.checksum, v.c.cluster], v.c.serial == serial)
        rp = self.conn.execute(s)
        row = rp.fetchone()
        rp.close()
        # Missing serial (None) and "give me everything" both return
        # the row object itself.
        if row is None or not keys:
            return row
        # Map the requested keys to column positions; unknown keys are
        # silently skipped.
        return [row[propnames[k]] for k in keys if k in propnames]
    def version_put_property(self, serial, key, value):
        """Set value for the property of version specified by key."""
        # Unknown property names are silently ignored.
        if key not in _propnames:
            return
        u = (self.versions.update()
             .where(self.versions.c.serial == serial)
             .values(**{key: value}))
        self.conn.execute(u).close()
    def version_recluster(self, serial, cluster,
                          update_statistics_ancestors_depth=None):
        """Move the version into another cluster."""
        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        size = props[SIZE]
        oldcluster = props[CLUSTER]
        if cluster == oldcluster:
            # Nothing to move.
            return

        # Shift the (count, size) statistics of all ancestors from the
        # old cluster to the new one.
        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster,
                                         update_statistics_ancestors_depth)
        self.statistics_update_ancestors(node, 1, size, mtime, cluster,
                                         update_statistics_ancestors_depth)

        u = (self.versions.update()
             .where(self.versions.c.serial == serial)
             .values(cluster=cluster))
        self.conn.execute(u).close()
    def version_remove(self, serial, update_statistics_ancestors_depth=None):
        """Remove the version specified by serial.

        Update ancestor statistics and refresh the node's cached latest
        version.  Return (hash, size) of the removed version, or None if
        the serial does not exist.
        """
        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        hash = props[HASH]
        size = props[SIZE]
        cluster = props[CLUSTER]

        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, cluster,
                                         update_statistics_ancestors_depth)

        s = self.versions.delete().where(self.versions.c.serial == serial)
        self.conn.execute(s).close()

        props = self.version_lookup(node, cluster=cluster, all_props=False)
        if props:
            # BUGFIX: point the node at the serial that remains current
            # after the delete (props[0]); previously the just-deleted
            # 'serial' was stored as the latest version.
            self.nodes_set_latest_version(node, props[0])

        return hash, size
    def attribute_get(self, serial, domain, keys=()):
        """Return a list of (key, value) pairs of the specific version.

        If keys is empty, return all attributes.
        Otherwise, return only those specified.
        """
        # Build the common query once instead of duplicating it per branch.
        attrs = self.attributes.alias()
        s = select([attrs.c.key, attrs.c.value])
        s = s.where(and_(attrs.c.serial == serial,
                         attrs.c.domain == domain))
        if keys:
            # Restrict to the requested keys only.
            s = s.where(attrs.c.key.in_(keys))
        r = self.conn.execute(s)
        l = r.fetchall()
        r.close()
        return l
    def attribute_set(self, serial, domain, node, items, is_latest=True):
        """Set the attributes of the version specified by serial.

        Receive attributes as an iterable of (key, value) pairs.
        """
        # Emulate an upsert: try an UPDATE first and fall back to an
        # INSERT when no row was touched.  TODO: use a real upsert.
        for key, value in items:
            u = (self.attributes.update()
                 .where(and_(self.attributes.c.serial == serial,
                             self.attributes.c.domain == domain,
                             self.attributes.c.key == key))
                 .values(value=value))
            rp = self.conn.execute(u)
            rp.close()
            if rp.rowcount == 0:
                ins = self.attributes.insert().values(
                    serial=serial, domain=domain, node=node,
                    is_latest=is_latest, key=key, value=value)
                self.conn.execute(ins).close()
    def attribute_del(self, serial, domain, keys=()):
        """Delete attributes of the version specified by serial.

        If keys is empty, delete all attributes.
        Otherwise delete those specified.
        """
        s = self.attributes.delete()
        s = s.where(and_(self.attributes.c.serial == serial,
                         self.attributes.c.domain == domain))
        if keys:
            # Delete all requested keys with a single statement instead
            # of issuing one DELETE per key (resolves the old TODO).
            s = s.where(self.attributes.c.key.in_(keys))
        self.conn.execute(s).close()
    def attribute_copy(self, source, dest):
        """Copy the attributes of version 'source' onto version 'dest'.

        Existing attributes on dest with the same (domain, key) are
        overwritten; missing ones are inserted.  The node column is
        refreshed from the destination version's row.
        """
        s = select(
            [dest, self.attributes.c.domain, self.attributes.c.node,
             self.attributes.c.key, self.attributes.c.value],
            self.attributes.c.serial == source)
        rp = self.conn.execute(s)
        attributes = rp.fetchall()
        rp.close()
        for dest, domain, node, k, v in attributes:
            select_src_node = select(
                [self.versions.c.node],
                self.versions.c.serial == dest)
            # insert or replace
            s = self.attributes.update().where(and_(
                self.attributes.c.serial == dest,
                self.attributes.c.domain == domain,
                self.attributes.c.key == k))
            s = s.values(node=select_src_node, value=v)
            rp = self.conn.execute(s)
            rp.close()
            if rp.rowcount == 0:
                s = self.attributes.insert()
                s = s.values(serial=dest, domain=domain, node=select_src_node,
                             is_latest=True, key=k, value=v)
                # BUGFIX: execute the INSERT only in the no-row-updated
                # branch; previously this statement sat outside the guard,
                # so a successful UPDATE was executed a second time.
                self.conn.execute(s).close()
    def attribute_unset_is_latest(self, node, exclude):
        """Clear the is_latest flag on all attributes of the given node,
        except those belonging to the 'exclude' serial.
        """
        u = self.attributes.update().where(and_(
            self.attributes.c.node == node,
            self.attributes.c.serial != exclude)).values({'is_latest': False})
        # Close the result proxy, consistent with every other mutator here.
        self.conn.execute(u).close()
    def latest_attribute_keys(self, parent, domain, before=inf,
                              except_cluster=0, pathq=None):
        """Return a list with all keys pairs defined
           for all latest versions under parent that
           do not belong to the cluster.
        """

        pathq = pathq or []

        # TODO: Use another table to store before=inf results.
        a = self.attributes.alias('a')
        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        s = select([a.c.key]).distinct()
        # Correlated subquery picking the "current" serial for each node:
        # either the max serial with mtime < before, or the cached
        # latest_version on the node row.
        if before != inf:
            filtered = select([func.max(self.versions.c.serial)])
            filtered = filtered.where(self.versions.c.mtime < before)
            filtered = filtered.where(self.versions.c.node == v.c.node)
        else:
            filtered = select([self.nodes.c.latest_version])
            filtered = filtered.where(self.nodes.c.node == v.c.node)
        s = s.where(v.c.serial == filtered)
        s = s.where(v.c.cluster != except_cluster)
        # Only versions of direct children of 'parent'.
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
                                        self.nodes.c.parent == parent)))
        s = s.where(a.c.serial == v.c.serial)
        s = s.where(a.c.domain == domain)
        s = s.where(n.c.node == v.c.node)
        # Optional path filters: prefix matches use an escaped LIKE,
        # exact matches compare the whole path; terms are OR-ed together.
        conj = []
        for path, match in pathq:
            if match == MATCH_PREFIX:
                conj.append(n.c.path.like(self.escape_like(path) + '%',
                                          escape=ESCAPE_CHAR))
            elif match == MATCH_EXACT:
                conj.append(n.c.path == path)
        if conj:
            s = s.where(or_(*conj))
        rp = self.conn.execute(s)
        rows = rp.fetchall()
        rp.close()
        # Each row is a single-column (key,) tuple.
        return [r[0] for r in rows]
    def latest_version_list(self, parent, prefix='', delimiter=None,
                            start='', limit=10000, before=inf,
                            except_cluster=0, pathq=[], domain=None,
                            filterq=[], sizeq=None, all_props=False):
        """Return a (list of (path, serial) tuples, list of common prefixes)
           for the current versions of the paths with the given parent,
           matching the following criteria.

           The property tuple for a version is returned if all
           of these conditions are true:

                a. parent matches

                b. path > start

                c. path starts with prefix (and paths in pathq)

                d. version is the max up to before

                e. version is not in cluster

                f. the path does not have the delimiter occuring
                   after the prefix, or ends with the delimiter

                g. serial matches the attribute filter query.

                   A filter query is a comma-separated list of
                   terms in one of these three forms:

                   key
                       an attribute with this key must exist

                   !key
                       an attribute with this key must not exist

                   key ?op value
                       the attribute with this key satisfies the value
                       where ?op is one of ==, != <=, >=, <, >.

                h. the size is in the range set by sizeq

           The list of common prefixes includes the prefixes
           matching up to the first delimiter after prefix,
           and are reported only once, as "virtual directories".
           The delimiter is included in the prefixes.

           If arguments are None, then the corresponding matching rule
           will always match.

           Limit applies to the first list of tuples returned.

           If all_props is True, return all properties after path,
           not just serial.
        """

        # NOTE(review): pathq/filterq use mutable default arguments; they
        # are never mutated here, but None defaults would be safer.
        if not start or start < prefix:
            start = strprevling(prefix)
        nextling = strnextling(prefix)

        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        if not all_props:
            s = select([n.c.path, v.c.serial]).distinct()
        else:
            s = select([n.c.path,
                        v.c.serial, v.c.node, v.c.hash,
                        v.c.size, v.c.type, v.c.source,
                        v.c.mtime, v.c.muser, v.c.uuid,
                        v.c.checksum, v.c.cluster]).distinct()
        # Correlated subquery selecting the "current" serial per node:
        # either the max serial with mtime < before, or the cached latest.
        if before != inf:
            filtered = select([func.max(self.versions.c.serial)])
            filtered = filtered.where(self.versions.c.mtime < before)
        else:
            filtered = select([self.nodes.c.latest_version])
        s = s.where(
            v.c.serial == filtered.where(self.nodes.c.node == v.c.node))
        s = s.where(v.c.cluster != except_cluster)
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
                                        self.nodes.c.parent == parent)))

        s = s.where(n.c.node == v.c.node)
        # 'start' is a bind parameter so the same statement can be
        # re-executed with a new starting path when skipping a prefix.
        s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
        conj = []
        for path, match in pathq:
            if match == MATCH_PREFIX:
                conj.append(n.c.path.like(self.escape_like(path) + '%',
                                          escape=ESCAPE_CHAR))
            elif match == MATCH_EXACT:
                conj.append(n.c.path == path)
        if conj:
            s = s.where(or_(*conj))

        # Half-open size range [sizeq[0], sizeq[1]); falsy bounds disable
        # the corresponding limit.
        if sizeq and len(sizeq) == 2:
            if sizeq[0]:
                s = s.where(v.c.size >= sizeq[0])
            if sizeq[1]:
                s = s.where(v.c.size < sizeq[1])

        if domain and filterq:
            a = self.attributes.alias('a')
            included, excluded, opers = parse_filters(filterq)
            # Each filter group becomes an EXISTS / NOT EXISTS subquery
            # correlated on the version serial.
            if included:
                subs = select([1])
                subs = subs.where(a.c.serial == v.c.serial).correlate(v)
                subs = subs.where(a.c.domain == domain)
                subs = subs.where(or_(*[a.c.key.op('=')(x) for x in included]))
                s = s.where(exists(subs))
            if excluded:
                subs = select([1])
                subs = subs.where(a.c.serial == v.c.serial).correlate(v)
                subs = subs.where(a.c.domain == domain)
                subs = subs.where(or_(*[a.c.key.op('=')(x) for x in excluded]))
                s = s.where(not_(exists(subs)))
            if opers:
                for k, o, val in opers:
                    subs = select([1])
                    subs = subs.where(a.c.serial == v.c.serial).correlate(v)
                    subs = subs.where(a.c.domain == domain)
                    subs = subs.where(
                        and_(a.c.key.op('=')(k), a.c.value.op(o)(val)))
                    s = s.where(exists(subs))

        s = s.order_by(n.c.path)

        if not delimiter:
            # No virtual directories: one limited query is enough.
            s = s.limit(limit)
            rp = self.conn.execute(s, start=start)
            r = rp.fetchall()
            rp.close()
            return r, ()

        pfz = len(prefix)
        dz = len(delimiter)
        count = 0
        prefixes = []
        pappend = prefixes.append
        matches = []
        mappend = matches.append

        rp = self.conn.execute(s, start=start)
        while True:
            props = rp.fetchone()
            if props is None:
                break
            path = props[0]
            idx = path.find(delimiter, pfz)

            # No delimiter after the prefix: a plain object match.
            if idx < 0:
                mappend(props)
                count += 1
                if count >= limit:
                    break
                continue

            # Path ends exactly with the delimiter: still a match.
            if idx + dz == len(path):
                mappend(props)
                count += 1
                continue  # Get one more, in case there is a path.
            # Otherwise record a virtual directory prefix once and skip
            # past the whole prefix by restarting the query.
            pf = path[:idx + dz]
            pappend(pf)
            if count >= limit:
                break

            rp = self.conn.execute(s, start=strnextling(pf))  # New start.
        rp.close()

        return matches, prefixes
    def latest_uuid(self, uuid, cluster):
        """Return the latest version of the given uuid and cluster.

        Return a (path, serial) tuple.
        If cluster is None, all clusters are considered.

        """
        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        # The latest version of a uuid is the one with the maximum serial.
        inner = select([func.max(self.versions.c.serial)])
        inner = inner.where(self.versions.c.uuid == uuid)
        if cluster is not None:
            inner = inner.where(self.versions.c.cluster == cluster)
        s = (select([n.c.path, v.c.serial])
             .where(v.c.serial == inner)
             .where(n.c.node == v.c.node))
        rp = self.conn.execute(s)
        row = rp.fetchone()
        rp.close()
        return row
    def domain_object_list(self, domain, paths, cluster=None):
        """Return a list of (path, property list, attribute dictionary)
        for the objects in the specific domain and cluster.
        """
        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        a = self.attributes.alias('a')

        s = select([n.c.path, v.c.serial, v.c.node, v.c.hash, v.c.size,
                    v.c.type, v.c.source, v.c.mtime, v.c.muser, v.c.uuid,
                    v.c.checksum, v.c.cluster, a.c.key, a.c.value])
        if cluster:
            s = s.where(v.c.cluster == cluster)
        # Join versions to their latest attributes in the given domain.
        s = s.where(v.c.serial == a.c.serial)
        s = s.where(a.c.domain == domain)
        s = s.where(a.c.node == n.c.node)
        s = s.where(a.c.is_latest == True)
        if paths:
            s = s.where(n.c.path.in_(paths))

        rp = self.conn.execute(s)
        rows = rp.fetchall()
        rp.close()

        # The first 12 columns identify the object; the trailing
        # (key, value) pair varies per attribute row.  Sort and group on
        # the object part, folding the attribute pairs into a dict.
        keyfunc = itemgetter(slice(12))
        rows.sort(key=keyfunc)
        return [(ident[0], ident[1:], dict(row[12:] for row in grp))
                for ident, grp in groupby(rows, keyfunc)]