# Copyright 2011-2012 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from time import time
from operator import itemgetter
from itertools import groupby

from sqlalchemy import (Table, Integer, BigInteger, DECIMAL, Boolean,
                        Column, String, MetaData, ForeignKey)
from sqlalchemy.schema import Index
from sqlalchemy.sql import func, and_, or_, not_, select, bindparam, exists
from sqlalchemy.exc import NoSuchTableError

from dbworker import DBWorker, ESCAPE_CHAR

from pithos.backends.filter import parse_filters


ROOTNODE = 0

(SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM,
 CLUSTER) = range(11)

(MATCH_PREFIX, MATCH_EXACT) = range(2)

inf = float('inf')


def strnextling(prefix):
    """Return the first unicode string
       greater than but not starting with given prefix.
       strnextling('hello') -> 'hellp'
    """
    if not prefix:
        ## all strings start with the null string,
        ## therefore we have to approximate strnextling('')
        ## with the last unicode character supported by python
        ## 0x10ffff for wide (32-bit unicode) python builds
        ## 0x00ffff for narrow (16-bit unicode) python builds
        ## We will not autodetect. 0xffff is safe enough.
        return unichr(0xffff)
    s = prefix[:-1]
    c = ord(prefix[-1])
    if c >= 0xffff:
        raise RuntimeError
    s += unichr(c + 1)
    return s
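
# Illustrative examples (assuming a Python 2 interpreter, as the unichr()
# calls above do; the values follow directly from the code):
#
#     strnextling(u'hello')  -> u'hellp'
#     strnextling(u'')       -> u'\uffff'
#
# so "all strings starting with p" can be expressed as the half-open range
# p <= s < strnextling(p), which is how path listings below bound their scans.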


def strprevling(prefix):
    """Return an approximation of the last unicode string
       less than but not starting with given prefix.
       strprevling(u'hello') -> u'helln\\xffff'
    """
    if not prefix:
        ## There is no prevling for the null string
        return prefix
    s = prefix[:-1]
    c = ord(prefix[-1])
    if c > 0:
        s += unichr(c - 1) + unichr(0xffff)
    return s
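
# Illustrative examples (values follow from the code above):
#
#     strprevling(u'hello')  -> u'helln\uffff'
#     strprevling(u'')       -> u''
#
# i.e. a string that sorts just before every string starting with the prefix;
# it seeds the ``start`` marker used by the listing queries below.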

_propnames = {
    'serial': 0,
    'node': 1,
    'hash': 2,
    'size': 3,
    'type': 4,
    'source': 5,
    'mtime': 6,
    'muser': 7,
    'uuid': 8,
    'checksum': 9,
    'cluster': 10
}


def create_tables(engine):
    metadata = MetaData()

    #create nodes table
    columns = []
    columns.append(Column('node', Integer, primary_key=True))
    columns.append(Column('parent', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          autoincrement=False))
    columns.append(Column('latest_version', Integer))
    columns.append(Column('path', String(2048), default='', nullable=False))
    nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
    Index('idx_nodes_path', nodes.c.path, unique=True)
    Index('idx_nodes_parent', nodes.c.parent)

    #create policy table
    columns = []
    columns.append(Column('node', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          primary_key=True))
    columns.append(Column('key', String(128), primary_key=True))
    columns.append(Column('value', String(256)))
    Table('policy', metadata, *columns, mysql_engine='InnoDB')

    #create statistics table
    columns = []
    columns.append(Column('node', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          primary_key=True))
    columns.append(Column('population', Integer, nullable=False, default=0))
    columns.append(Column('size', BigInteger, nullable=False, default=0))
    columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
    columns.append(Column('cluster', Integer, nullable=False, default=0,
                          primary_key=True, autoincrement=False))
    Table('statistics', metadata, *columns, mysql_engine='InnoDB')

    #create versions table
    columns = []
    columns.append(Column('serial', Integer, primary_key=True))
    columns.append(Column('node', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE')))
    columns.append(Column('hash', String(256)))
    columns.append(Column('size', BigInteger, nullable=False, default=0))
    columns.append(Column('type', String(256), nullable=False, default=''))
    columns.append(Column('source', Integer))
    columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
    columns.append(Column('muser', String(256), nullable=False, default=''))
    columns.append(Column('uuid', String(64), nullable=False, default=''))
    columns.append(Column('checksum', String(256), nullable=False, default=''))
    columns.append(Column('cluster', Integer, nullable=False, default=0))
    versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
    Index('idx_versions_node_mtime', versions.c.node, versions.c.mtime)
    Index('idx_versions_node_uuid', versions.c.uuid)

    #create attributes table
    columns = []
    columns.append(Column('serial', Integer,
                          ForeignKey('versions.serial',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          primary_key=True))
    columns.append(Column('domain', String(256), primary_key=True))
    columns.append(Column('key', String(128), primary_key=True))
    columns.append(Column('value', String(256)))
    columns.append(Column('node', Integer, nullable=False, default=0))
    columns.append(Column('is_latest', Boolean, nullable=False, default=True))
    attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')
    Index('idx_attributes_domain', attributes.c.domain)
    Index('idx_attributes_serial_node', attributes.c.serial, attributes.c.node)

    metadata.create_all(engine)
    return metadata.sorted_tables
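
# Usage sketch (hypothetical, standalone setup with an in-memory SQLite
# engine; the real backend passes in the engine it gets from DBWorker):
#
#     from sqlalchemy import create_engine
#     engine = create_engine('sqlite://')
#     tables = create_tables(engine)   # list of Table objects just created
#
# The mysql_engine='InnoDB' keyword is only meaningful on MySQL and is
# ignored by other dialects.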


class Node(DBWorker):
    """Nodes store path organization and have multiple versions.
       Versions store object history and have multiple attributes.
       Attributes store metadata.
    """

    # TODO: Provide an interface for included and excluded clusters.

    def __init__(self, **params):
        DBWorker.__init__(self, **params)
        try:
            metadata = MetaData(self.engine)
            self.nodes = Table('nodes', metadata, autoload=True)
            self.policy = Table('policy', metadata, autoload=True)
            self.statistics = Table('statistics', metadata, autoload=True)
            self.versions = Table('versions', metadata, autoload=True)
            self.attributes = Table('attributes', metadata, autoload=True)
        except NoSuchTableError:
            tables = create_tables(self.engine)
            map(lambda t: self.__setattr__(t.name, t), tables)

        s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
                                           self.nodes.c.parent == ROOTNODE))
        wrapper = self.wrapper
        wrapper.execute()
        try:
            rp = self.conn.execute(s)
            r = rp.fetchone()
            rp.close()
            if not r:
                s = self.nodes.insert(
                ).values(node=ROOTNODE, parent=ROOTNODE, path='')
                self.conn.execute(s)
        finally:
            wrapper.commit()

    def node_create(self, parent, path):
        """Create a new node from the given properties.
           Return the node identifier of the new node.
        """
        #TODO catch IntegrityError?
        s = self.nodes.insert().values(parent=parent, path=path)
        r = self.conn.execute(s)
        inserted_primary_key = r.inserted_primary_key[0]
        r.close()
        return inserted_primary_key

    def node_lookup(self, path, for_update=False):
        """Lookup the current node of the given path.
           Return None if the path is not found.
        """

        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
        s = select([self.nodes.c.node], self.nodes.c.path.like(
            self.escape_like(path), escape=ESCAPE_CHAR), for_update=for_update)
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        if row:
            return row[0]
        return None
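
    # Usage sketch (hypothetical names; however the surrounding backend
    # constructs its Node instance ``n`` and knows the parent node id):
    #
    #     node = n.node_lookup(u'account/container/object')
    #     if node is None:
    #         node = n.node_create(parent_node, u'account/container/object')
    #
    # Note that the LIKE-with-ESCAPE_CHAR comparison above is an exact match
    # (escape_like() escapes wildcards); it is only there to keep MySQL from
    # ignoring trailing spaces, not to do prefix searching.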

    def node_lookup_bulk(self, paths):
        """Lookup the current nodes for the given paths.
           Return () if none of the paths are found.
        """

        if not paths:
            return ()
        # Note: paths are compared with IN here, not the LIKE trick
        # used in node_lookup.
        s = select([self.nodes.c.node], self.nodes.c.path.in_(paths))
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        return [row[0] for row in rows]

    def node_get_properties(self, node):
        """Return the node's (parent, path).
           Return None if the node is not found.
        """

        s = select([self.nodes.c.parent, self.nodes.c.path])
        s = s.where(self.nodes.c.node == node)
        r = self.conn.execute(s)
        l = r.fetchone()
        r.close()
        return l

    def node_get_versions(self, node, keys=(), propnames=_propnames):
        """Return the properties of all versions at node.
           If keys is empty, return all properties in the order
           (serial, node, hash, size, type, source, mtime, muser, uuid,
            checksum, cluster).
        """

        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.hash,
                    self.versions.c.size,
                    self.versions.c.type,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.uuid,
                    self.versions.c.checksum,
                    self.versions.c.cluster], self.versions.c.node == node)
        s = s.order_by(self.versions.c.serial)
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        if not rows:
            return rows

        if not keys:
            return rows

        return [[p[propnames[k]] for k in keys if k in propnames] for
                p in rows]
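
    # Example (hypothetical values): with keys=('size', 'mtime') the result is
    # only those two properties per version, in the requested order, e.g.
    # [[4096, 1330000000.0], [8192, 1330000600.0]]; unknown keys are silently
    # skipped because of the ``if k in propnames`` guard.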

    def node_count_children(self, node):
        """Return node's child count."""

        s = select([func.count(self.nodes.c.node)])
        s = s.where(and_(self.nodes.c.parent == node,
                         self.nodes.c.node != ROOTNODE))
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        return row[0]

    def node_purge_children(self, parent, before=inf, cluster=0,
                            update_statistics_ancestors_depth=None):
        """Delete all versions with the specified
           parent and cluster, and return
           the hashes, the total size and the serials of versions deleted.
           Clears out nodes with no remaining versions.
        """
        #update statistics
        c1 = select([self.nodes.c.node],
                    self.nodes.c.parent == parent)
        where_clause = and_(self.versions.c.node.in_(c1),
                            self.versions.c.cluster == cluster)
        if before != inf:
            where_clause = and_(where_clause,
                                self.versions.c.mtime <= before)
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        if not row:
            return (), 0, ()
        nr, size = row[0], row[1] if row[1] else 0
        mtime = time()
        self.statistics_update(parent, -nr, -size, mtime, cluster)
        self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster,
                                         update_statistics_ancestors_depth)

        s = select([self.versions.c.hash, self.versions.c.serial])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        hashes = []
        serials = []
        for row in r.fetchall():
            hashes += [row[0]]
            serials += [row[1]]
        r.close()

        #delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)
        r.close()

        #delete nodes
        s = select([self.nodes.c.node],
                   and_(self.nodes.c.parent == parent,
                        select([func.count(self.versions.c.serial)],
                               self.versions.c.node == self.nodes.c.node).
                        as_scalar() == 0))
        rp = self.conn.execute(s)
        nodes = [row[0] for row in rp.fetchall()]
        rp.close()
        if nodes:
            s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
            self.conn.execute(s).close()

        return hashes, size, serials

    def node_purge(self, node, before=inf, cluster=0,
                   update_statistics_ancestors_depth=None):
        """Delete all versions with the specified
           node and cluster, and return
           the hashes and size of versions deleted.
           Clears out the node if it has no remaining versions.
        """

        #update statistics
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        where_clause = and_(self.versions.c.node == node,
                            self.versions.c.cluster == cluster)
        if before != inf:
            where_clause = and_(where_clause,
                                self.versions.c.mtime <= before)
        s = s.where(where_clause)
        r = self.conn.execute(s)
        row = r.fetchone()
        nr, size = row[0], row[1]
        r.close()
        if not nr:
            return (), 0, ()
        mtime = time()
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster,
                                         update_statistics_ancestors_depth)

        s = select([self.versions.c.hash, self.versions.c.serial])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        hashes = []
        serials = []
        for row in r.fetchall():
            hashes += [row[0]]
            serials += [row[1]]
        r.close()

        #delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)
        r.close()

        #delete nodes
        s = select([self.nodes.c.node],
                   and_(self.nodes.c.node == node,
                        select([func.count(self.versions.c.serial)],
                               self.versions.c.node == self.nodes.c.node).
                        as_scalar() == 0))
        rp = self.conn.execute(s)
        nodes = [row[0] for row in rp.fetchall()]
        rp.close()
        if nodes:
            s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
            self.conn.execute(s).close()

        return hashes, size, serials

    def node_remove(self, node, update_statistics_ancestors_depth=None):
        """Remove the node specified.
           Return false if the node has children or is not found.
        """

        if self.node_count_children(node):
            return False

        mtime = time()
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size),
                    self.versions.c.cluster])
        s = s.where(self.versions.c.node == node)
        s = s.group_by(self.versions.c.cluster)
        r = self.conn.execute(s)
        for population, size, cluster in r.fetchall():
            self.statistics_update_ancestors(
                node, -population, -size, mtime, cluster,
                update_statistics_ancestors_depth)
        r.close()

        s = self.nodes.delete().where(self.nodes.c.node == node)
        self.conn.execute(s).close()
        return True

    def node_accounts(self, accounts=()):
        s = select([self.nodes.c.path, self.nodes.c.node])
        s = s.where(and_(self.nodes.c.node != 0,
                         self.nodes.c.parent == 0))
        if accounts:
            s = s.where(self.nodes.c.path.in_(accounts))
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        return rows

    def node_account_quotas(self):
        s = select([self.nodes.c.path, self.policy.c.value])
        s = s.where(and_(self.nodes.c.node != 0,
                         self.nodes.c.parent == 0))
        s = s.where(self.nodes.c.node == self.policy.c.node)
        s = s.where(self.policy.c.key == 'quota')
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        return dict(rows)

    def node_account_usage(self, account_node, cluster):
        select_children = select(
            [self.nodes.c.node]).where(self.nodes.c.parent == account_node)
        select_descendants = select([self.nodes.c.node]).where(
            or_(self.nodes.c.parent.in_(select_children),
                self.nodes.c.node.in_(select_children)))
        s = select([func.sum(self.versions.c.size)])
        s = s.where(self.nodes.c.node == self.versions.c.node)
        s = s.where(self.nodes.c.node.in_(select_descendants))
        s = s.where(self.versions.c.cluster == cluster)
        r = self.conn.execute(s)
        usage = r.fetchone()[0]
        r.close()
        return usage
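
    # Reading note: ``select_descendants`` only reaches two levels below the
    # account node (the containers and their direct children), which matches
    # the fixed account/container/object hierarchy this schema assumes; it is
    # not a general recursive-descendant query.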

    def policy_get(self, node):
        s = select([self.policy.c.key, self.policy.c.value],
                   self.policy.c.node == node)
        r = self.conn.execute(s)
        d = dict(r.fetchall())
        r.close()
        return d

    def policy_set(self, node, policy):
        #insert or replace
        for k, v in policy.iteritems():
            s = self.policy.update().where(and_(self.policy.c.node == node,
                                                self.policy.c.key == k))
            s = s.values(value=v)
            rp = self.conn.execute(s)
            rp.close()
            if rp.rowcount == 0:
                s = self.policy.insert()
                values = {'node': node, 'key': k, 'value': v}
                r = self.conn.execute(s, values)
                r.close()

    def statistics_get(self, node, cluster=0):
        """Return population, total size and last mtime
           for all versions under node that belong to the cluster.
        """

        s = select([self.statistics.c.population,
                    self.statistics.c.size,
                    self.statistics.c.mtime])
        s = s.where(and_(self.statistics.c.node == node,
                         self.statistics.c.cluster == cluster))
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        return row

    def statistics_update(self, node, population, size, mtime, cluster=0):
        """Update the statistics of the given node.
           Statistics keep track of the population, the total
           size of objects and the mtime in the node's namespace.
           The population and size arguments are deltas and may be
           zero, positive or negative.
        """
        s = select([self.statistics.c.population, self.statistics.c.size],
                   and_(self.statistics.c.node == node,
                        self.statistics.c.cluster == cluster))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            prepopulation, presize = (0, 0)
        else:
            prepopulation, presize = r
        population += prepopulation
        population = max(population, 0)
        size += presize

        #insert or replace
        #TODO better upsert
        u = self.statistics.update().where(and_(
            self.statistics.c.node == node,
            self.statistics.c.cluster == cluster))
        u = u.values(population=population, size=size, mtime=mtime)
        rp = self.conn.execute(u)
        rp.close()
        if rp.rowcount == 0:
            ins = self.statistics.insert()
            ins = ins.values(node=node, population=population, size=size,
                             mtime=mtime, cluster=cluster)
            self.conn.execute(ins).close()

    def statistics_update_ancestors(self, node, population, size, mtime,
                                    cluster=0, recursion_depth=None):
        """Update the statistics of the given node's parent.
           Then recursively update all parents up to the root
           or up to the ``recursion_depth`` (if not None).
           Population is not recursive.
        """

        i = 0
        while True:
            if node == ROOTNODE:
                break
            if recursion_depth and recursion_depth <= i:
                break
            props = self.node_get_properties(node)
            if props is None:
                break
            parent, path = props
            self.statistics_update(parent, population, size, mtime, cluster)
            node = parent
            population = 0  # Population isn't recursive
            i += 1

    def statistics_latest(self, node, before=inf, except_cluster=0):
        """Return population, total size and last mtime
           for all latest versions under node that
           do not belong to the cluster.
        """

        # The node.
        props = self.node_get_properties(node)
        if props is None:
            return None
        parent, path = props

        # The latest version.
        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.hash,
                    self.versions.c.size,
                    self.versions.c.type,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.uuid,
                    self.versions.c.checksum,
                    self.versions.c.cluster])
        if before != inf:
            filtered = select([func.max(self.versions.c.serial)],
                              self.versions.c.node == node)
            filtered = filtered.where(self.versions.c.mtime < before)
        else:
            filtered = select([self.nodes.c.latest_version],
                              self.nodes.c.node == node)
        s = s.where(and_(self.versions.c.cluster != except_cluster,
                         self.versions.c.serial == filtered))
        r = self.conn.execute(s)
        props = r.fetchone()
        r.close()
        if not props:
            return None
        mtime = props[MTIME]

        # First level, just under node (get population).
        v = self.versions.alias('v')
        s = select([func.count(v.c.serial),
                    func.sum(v.c.size),
                    func.max(v.c.mtime)])
        if before != inf:
            c1 = select([func.max(self.versions.c.serial)])
            c1 = c1.where(self.versions.c.mtime < before)
            c1 = c1.where(self.versions.c.node == v.c.node)
        else:
            c1 = select([self.nodes.c.latest_version])
            c1 = c1.where(self.nodes.c.node == v.c.node)
        c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
        s = s.where(and_(v.c.serial == c1,
                         v.c.cluster != except_cluster,
                         v.c.node.in_(c2)))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        count = r[0]
        mtime = max(mtime, r[2])
        if count == 0:
            return (0, 0, mtime)

        # All children (get size and mtime).
        # This is why the full path is stored.
        s = select([func.count(v.c.serial),
                    func.sum(v.c.size),
                    func.max(v.c.mtime)])
        if before != inf:
            c1 = select([func.max(self.versions.c.serial)],
                        self.versions.c.node == v.c.node)
            c1 = c1.where(self.versions.c.mtime < before)
        else:
            c1 = select([self.nodes.c.latest_version],
                        self.nodes.c.node == v.c.node)
        c2 = select([self.nodes.c.node], self.nodes.c.path.like(
            self.escape_like(path) + '%', escape=ESCAPE_CHAR))
        s = s.where(and_(v.c.serial == c1,
                         v.c.cluster != except_cluster,
                         v.c.node.in_(c2)))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        size = r[1] - props[SIZE]
        mtime = max(mtime, r[2])
        return (count, size, mtime)

    def nodes_set_latest_version(self, node, serial):
        s = self.nodes.update().where(self.nodes.c.node == node)
        s = s.values(latest_version=serial)
        self.conn.execute(s).close()

    def version_create(self, node, hash, size, type, source, muser, uuid,
                       checksum, cluster=0,
                       update_statistics_ancestors_depth=None):
        """Create a new version from the given properties.
           Return the (serial, mtime) of the new version.
        """

        mtime = time()
        s = self.versions.insert().values(
            node=node, hash=hash, size=size, type=type, source=source,
            mtime=mtime, muser=muser, uuid=uuid, checksum=checksum,
            cluster=cluster)
        serial = self.conn.execute(s).inserted_primary_key[0]
        self.statistics_update_ancestors(node, 1, size, mtime, cluster,
                                         update_statistics_ancestors_depth)

        self.nodes_set_latest_version(node, serial)

        return serial, mtime

    def version_lookup(self, node, before=inf, cluster=0, all_props=True):
        """Lookup the current version of the given node.
           Return a list with its properties:
           (serial, node, hash, size, type, source, mtime,
            muser, uuid, checksum, cluster)
           or None if the current version is not found in the given cluster.
        """

        v = self.versions.alias('v')
        if not all_props:
            s = select([v.c.serial])
        else:
            s = select([v.c.serial, v.c.node, v.c.hash,
                        v.c.size, v.c.type, v.c.source,
                        v.c.mtime, v.c.muser, v.c.uuid,
                        v.c.checksum, v.c.cluster])
        if before != inf:
            c = select([func.max(self.versions.c.serial)],
                       self.versions.c.node == node)
            c = c.where(self.versions.c.mtime < before)
        else:
            c = select([self.nodes.c.latest_version],
                       self.nodes.c.node == node)
        s = s.where(and_(v.c.serial == c,
                         v.c.cluster == cluster))
        r = self.conn.execute(s)
        props = r.fetchone()
        r.close()
        if props:
            return props
        return None

    def version_lookup_bulk(self, nodes, before=inf, cluster=0,
                            all_props=True):
        """Lookup the current versions of the given nodes.
           Return a list with their properties:
           (serial, node, hash, size, type, source, mtime, muser, uuid,
            checksum, cluster).
        """
        if not nodes:
            return ()
        v = self.versions.alias('v')
        if not all_props:
            s = select([v.c.serial])
        else:
            s = select([v.c.serial, v.c.node, v.c.hash,
                        v.c.size, v.c.type, v.c.source,
                        v.c.mtime, v.c.muser, v.c.uuid,
                        v.c.checksum, v.c.cluster])
        if before != inf:
            c = select([func.max(self.versions.c.serial)],
                       self.versions.c.node.in_(nodes))
            c = c.where(self.versions.c.mtime < before)
            c = c.group_by(self.versions.c.node)
        else:
            c = select([self.nodes.c.latest_version],
                       self.nodes.c.node.in_(nodes))
        s = s.where(and_(v.c.serial.in_(c),
                         v.c.cluster == cluster))
        s = s.order_by(v.c.node)
        r = self.conn.execute(s)
        rproxy = r.fetchall()
        r.close()
        return (tuple(row.values()) for row in rproxy)

    def version_get_properties(self, serial, keys=(), propnames=_propnames):
        """Return a sequence of values for the properties of
           the version specified by serial and the keys, in the order given.
           If keys is empty, return all properties in the order
           (serial, node, hash, size, type, source, mtime, muser, uuid,
            checksum, cluster).
        """

        v = self.versions.alias()
        s = select([v.c.serial, v.c.node, v.c.hash,
                    v.c.size, v.c.type, v.c.source,
                    v.c.mtime, v.c.muser, v.c.uuid,
                    v.c.checksum, v.c.cluster], v.c.serial == serial)
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if r is None:
            return r

        if not keys:
            return r
        return [r[propnames[k]] for k in keys if k in propnames]
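
    # Example (hypothetical serial and values): the keys argument selects and
    # orders fields by name, so
    #
    #     version_get_properties(42, keys=('hash', 'size', 'cluster'))
    #
    # would be expected to return something like ['a3f1...', 4096, 0], while
    # an empty keys tuple yields the full 11-column row in _propnames order.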

    def version_put_property(self, serial, key, value):
        """Set value for the property of version specified by key."""

        if key not in _propnames:
            return
        s = self.versions.update()
        s = s.where(self.versions.c.serial == serial)
        s = s.values(**{key: value})
        self.conn.execute(s).close()

    def version_recluster(self, serial, cluster,
                          update_statistics_ancestors_depth=None):
        """Move the version into another cluster."""

        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        size = props[SIZE]
        oldcluster = props[CLUSTER]
        if cluster == oldcluster:
            return

        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster,
                                         update_statistics_ancestors_depth)
        self.statistics_update_ancestors(node, 1, size, mtime, cluster,
                                         update_statistics_ancestors_depth)

        s = self.versions.update()
        s = s.where(self.versions.c.serial == serial)
        s = s.values(cluster=cluster)
        self.conn.execute(s).close()

    def version_remove(self, serial, update_statistics_ancestors_depth=None):
        """Remove the serial specified."""

        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        hash = props[HASH]
        size = props[SIZE]
        cluster = props[CLUSTER]

        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, cluster,
                                         update_statistics_ancestors_depth)

        s = self.versions.delete().where(self.versions.c.serial == serial)
        self.conn.execute(s).close()

        props = self.version_lookup(node, cluster=cluster, all_props=False)
        if props:
            self.nodes_set_latest_version(node, serial)

        return hash, size

    def attribute_get(self, serial, domain, keys=()):
        """Return a list of (key, value) pairs of the specific version.

        If keys is empty, return all attributes.
        Otherwise, return only those specified.
        """

        if keys:
            attrs = self.attributes.alias()
            s = select([attrs.c.key, attrs.c.value])
            s = s.where(and_(attrs.c.key.in_(keys),
                             attrs.c.serial == serial,
                             attrs.c.domain == domain))
        else:
            attrs = self.attributes.alias()
            s = select([attrs.c.key, attrs.c.value])
            s = s.where(and_(attrs.c.serial == serial,
                             attrs.c.domain == domain))
        r = self.conn.execute(s)
        l = r.fetchall()
        r.close()
        return l

    def attribute_set(self, serial, domain, node, items, is_latest=True):
        """Set the attributes of the version specified by serial.
           Receive attributes as an iterable of (key, value) pairs.
        """
        #insert or replace
        #TODO better upsert
        for k, v in items:
            s = self.attributes.update()
            s = s.where(and_(self.attributes.c.serial == serial,
                             self.attributes.c.domain == domain,
                             self.attributes.c.key == k))
            s = s.values(value=v)
            rp = self.conn.execute(s)
            rp.close()
            if rp.rowcount == 0:
                s = self.attributes.insert()
                s = s.values(serial=serial, domain=domain, node=node,
                             is_latest=is_latest, key=k, value=v)
                self.conn.execute(s).close()

    def attribute_del(self, serial, domain, keys=()):
        """Delete attributes of the version specified by serial.
           If keys is empty, delete all attributes.
           Otherwise delete those specified.
        """

        if keys:
            #TODO more efficient way to do this?
            for key in keys:
                s = self.attributes.delete()
                s = s.where(and_(self.attributes.c.serial == serial,
                                 self.attributes.c.domain == domain,
                                 self.attributes.c.key == key))
                self.conn.execute(s).close()
        else:
            s = self.attributes.delete()
            s = s.where(and_(self.attributes.c.serial == serial,
                             self.attributes.c.domain == domain))
            self.conn.execute(s).close()

    def attribute_copy(self, source, dest):
        s = select(
            [dest, self.attributes.c.domain, self.attributes.c.node,
             self.attributes.c.key, self.attributes.c.value],
            self.attributes.c.serial == source)
        rp = self.conn.execute(s)
        attributes = rp.fetchall()
        rp.close()
        for dest, domain, node, k, v in attributes:
            select_src_node = select(
                [self.versions.c.node],
                self.versions.c.serial == dest)
            # insert or replace
            s = self.attributes.update().where(and_(
                self.attributes.c.serial == dest,
                self.attributes.c.domain == domain,
                self.attributes.c.key == k))
            s = s.values(node=select_src_node, value=v)
            rp = self.conn.execute(s)
            rp.close()
            if rp.rowcount == 0:
                s = self.attributes.insert()
                s = s.values(serial=dest, domain=domain, node=select_src_node,
                             is_latest=True, key=k, value=v)
            self.conn.execute(s).close()

    def attribute_unset_is_latest(self, node, exclude):
        u = self.attributes.update().where(and_(
            self.attributes.c.node == node,
            self.attributes.c.serial != exclude)).values({'is_latest': False})
        self.conn.execute(u)

    def latest_attribute_keys(self, parent, domain, before=inf,
                              except_cluster=0, pathq=None):
        """Return a list with all keys defined
           for all latest versions under parent that
           do not belong to the cluster.
        """

        pathq = pathq or []

        # TODO: Use another table to store before=inf results.
        a = self.attributes.alias('a')
        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        s = select([a.c.key]).distinct()
        if before != inf:
            filtered = select([func.max(self.versions.c.serial)])
            filtered = filtered.where(self.versions.c.mtime < before)
            filtered = filtered.where(self.versions.c.node == v.c.node)
        else:
            filtered = select([self.nodes.c.latest_version])
            filtered = filtered.where(self.nodes.c.node == v.c.node)
        s = s.where(v.c.serial == filtered)
        s = s.where(v.c.cluster != except_cluster)
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
                                        self.nodes.c.parent == parent)))
        s = s.where(a.c.serial == v.c.serial)
        s = s.where(a.c.domain == domain)
        s = s.where(n.c.node == v.c.node)
        conj = []
        for path, match in pathq:
            if match == MATCH_PREFIX:
                conj.append(
                    n.c.path.like(
                        self.escape_like(path) + '%',
                        escape=ESCAPE_CHAR
                    )
                )
            elif match == MATCH_EXACT:
                conj.append(n.c.path == path)
        if conj:
            s = s.where(or_(*conj))
        rp = self.conn.execute(s)
        rows = rp.fetchall()
        rp.close()
        return [r[0] for r in rows]

    def latest_version_list(self, parent, prefix='', delimiter=None,
                            start='', limit=10000, before=inf,
                            except_cluster=0, pathq=[], domain=None,
                            filterq=[], sizeq=None, all_props=False):
        """Return a (list of (path, serial) tuples, list of common prefixes)
           for the current versions of the paths with the given parent,
           matching the following criteria.

           The property tuple for a version is returned if all
           of these conditions are true:

                a. parent matches

                b. path > start

                c. path starts with prefix (and paths in pathq)

                d. version is the max up to before

                e. version is not in cluster

                f. the path does not have the delimiter occurring
                   after the prefix, or ends with the delimiter

                g. serial matches the attribute filter query.

                   A filter query is a comma-separated list of
                   terms in one of these three forms:

                   key
                       an attribute with this key must exist

                   !key
                       an attribute with this key must not exist

                   key ?op value
                       the attribute with this key satisfies the value
                       where ?op is one of ==, !=, <=, >=, <, >.

                h. the size is in the range set by sizeq

           The list of common prefixes includes the prefixes
           matching up to the first delimiter after prefix;
           they are reported only once, as "virtual directories".
           The delimiter is included in the prefixes.

           If arguments are None, then the corresponding matching rule
           will always match.

           Limit applies to the first list of tuples returned.

           If all_props is True, return all properties after path,
           not just serial.
        """

        if not start or start < prefix:
            start = strprevling(prefix)
        nextling = strnextling(prefix)

        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        if not all_props:
            s = select([n.c.path, v.c.serial]).distinct()
        else:
            s = select([n.c.path,
                        v.c.serial, v.c.node, v.c.hash,
                        v.c.size, v.c.type, v.c.source,
                        v.c.mtime, v.c.muser, v.c.uuid,
                        v.c.checksum, v.c.cluster]).distinct()
        if before != inf:
            filtered = select([func.max(self.versions.c.serial)])
            filtered = filtered.where(self.versions.c.mtime < before)
        else:
            filtered = select([self.nodes.c.latest_version])
        s = s.where(
            v.c.serial == filtered.where(self.nodes.c.node == v.c.node))
        s = s.where(v.c.cluster != except_cluster)
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
                                        self.nodes.c.parent == parent)))

        s = s.where(n.c.node == v.c.node)
        s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
        conj = []
        for path, match in pathq:
            if match == MATCH_PREFIX:
                conj.append(
                    n.c.path.like(
                        self.escape_like(path) + '%',
                        escape=ESCAPE_CHAR
                    )
                )
            elif match == MATCH_EXACT:
                conj.append(n.c.path == path)
        if conj:
            s = s.where(or_(*conj))

        if sizeq and len(sizeq) == 2:
            if sizeq[0]:
                s = s.where(v.c.size >= sizeq[0])
            if sizeq[1]:
                s = s.where(v.c.size < sizeq[1])

        if domain and filterq:
            a = self.attributes.alias('a')
            included, excluded, opers = parse_filters(filterq)
            if included:
                subs = select([1])
                subs = subs.where(a.c.serial == v.c.serial).correlate(v)
                subs = subs.where(a.c.domain == domain)
                subs = subs.where(or_(*[a.c.key.op('=')(x) for x in included]))
                s = s.where(exists(subs))
            if excluded:
                subs = select([1])
                subs = subs.where(a.c.serial == v.c.serial).correlate(v)
                subs = subs.where(a.c.domain == domain)
                subs = subs.where(or_(*[a.c.key.op('=')(x) for x in excluded]))
                s = s.where(not_(exists(subs)))
            if opers:
                for k, o, val in opers:
                    subs = select([1])
                    subs = subs.where(a.c.serial == v.c.serial).correlate(v)
                    subs = subs.where(a.c.domain == domain)
                    subs = subs.where(
                        and_(a.c.key.op('=')(k), a.c.value.op(o)(val)))
                    s = s.where(exists(subs))

        s = s.order_by(n.c.path)

        if not delimiter:
            s = s.limit(limit)
            rp = self.conn.execute(s, start=start)
            r = rp.fetchall()
            rp.close()
            return r, ()

        pfz = len(prefix)
        dz = len(delimiter)
        count = 0
        prefixes = []
        pappend = prefixes.append
        matches = []
        mappend = matches.append

        rp = self.conn.execute(s, start=start)
        while True:
            props = rp.fetchone()
            if props is None:
                break
            path = props[0]
            idx = path.find(delimiter, pfz)

            if idx < 0:
                mappend(props)
                count += 1
                if count >= limit:
                    break
                continue

            if idx + dz == len(path):
                mappend(props)
                count += 1
                continue  # Get one more, in case there is a path.
            pf = path[:idx + dz]
            pappend(pf)
            if count >= limit:
                break

            rp = self.conn.execute(s, start=strnextling(pf))  # New start.
        rp.close()

        return matches, prefixes
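
    # Usage sketch (hypothetical names and values):
    #
    #     matches, prefixes = node_backend.latest_version_list(
    #         parent=container_node, prefix=u'photos/', delimiter=u'/',
    #         limit=100)
    #
    # ``matches`` would hold (path, serial) tuples for objects directly under
    # 'photos/', while ``prefixes`` would collect 'photos/<subdir>/' entries
    # as "virtual directories"; strprevling()/strnextling() bound the scanned
    # path range.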

    def latest_uuid(self, uuid, cluster):
        """Return the latest version of the given uuid and cluster.

        Return a (path, serial) tuple.
        If cluster is None, all clusters are considered.

        """

        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        s = select([n.c.path, v.c.serial])
        filtered = select([func.max(self.versions.c.serial)])
        filtered = filtered.where(self.versions.c.uuid == uuid)
        if cluster is not None:
            filtered = filtered.where(self.versions.c.cluster == cluster)
        s = s.where(v.c.serial == filtered)
        s = s.where(n.c.node == v.c.node)

        r = self.conn.execute(s)
        l = r.fetchone()
        r.close()
        return l

    def domain_object_list(self, domain, paths, cluster=None):
        """Return a list of (path, property list, attribute dictionary)
           for the objects in the specific domain and cluster.
        """

        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        a = self.attributes.alias('a')

        s = select([n.c.path, v.c.serial, v.c.node, v.c.hash, v.c.size,
                    v.c.type, v.c.source, v.c.mtime, v.c.muser, v.c.uuid,
                    v.c.checksum, v.c.cluster, a.c.key, a.c.value])
        if cluster:
            s = s.where(v.c.cluster == cluster)
        s = s.where(v.c.serial == a.c.serial)
        s = s.where(a.c.domain == domain)
        s = s.where(a.c.node == n.c.node)
        s = s.where(a.c.is_latest == True)
        if paths:
            s = s.where(n.c.path.in_(paths))

        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()

        group_by = itemgetter(slice(12))
        rows.sort(key=group_by)
        groups = groupby(rows, group_by)
        return [(k[0], k[1:], dict([i[12:] for i in data])) for
                (k, data) in groups]