snf-pithos-backend / pithos / backends / lib / sqlalchemy / node.py @ ae6199c5

# Copyright 2011-2012 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from time import time
from operator import itemgetter
from itertools import groupby

from sqlalchemy import Table, Integer, BigInteger, DECIMAL, Column, String, MetaData, ForeignKey
from sqlalchemy.types import Text
from sqlalchemy.schema import Index, Sequence
from sqlalchemy.sql import func, and_, or_, not_, null, select, bindparam, text, exists
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.exc import NoSuchTableError

from dbworker import DBWorker

from pithos.backends.filter import parse_filters


ROOTNODE = 0

(SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM,
 CLUSTER) = range(11)

(MATCH_PREFIX, MATCH_EXACT) = range(2)

inf = float('inf')

def strnextling(prefix):
    """Return the first unicode string
       greater than but not starting with given prefix.
       strnextling('hello') -> 'hellp'
    """
    if not prefix:
        ## all strings start with the null string,
        ## therefore we have to approximate strnextling('')
        ## with the last unicode character supported by python
        ## 0x10ffff for wide (32-bit unicode) python builds
        ## 0x00ffff for narrow (16-bit unicode) python builds
        ## We will not autodetect. 0xffff is safe enough.
        return unichr(0xffff)
    s = prefix[:-1]
    c = ord(prefix[-1])
    if c >= 0xffff:
        raise RuntimeError
    s += unichr(c + 1)
    return s


def strprevling(prefix):
    """Return an approximation of the last unicode string
       less than but not starting with given prefix.
       strprevling(u'hello') -> u'helln\\xffff'
    """
    if not prefix:
        ## There is no prevling for the null string
        return prefix
    s = prefix[:-1]
    c = ord(prefix[-1])
    if c > 0:
        s += unichr(c - 1) + unichr(0xffff)
    return s
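
# Illustrative sketch (not part of the original code): strnextling/strprevling
# bound a prefix listing with plain string comparisons instead of LIKE.
# Assuming a hypothetical prefix u'photos/':
#
#     lower = strprevling(u'photos/')   # sorts just before any path starting with 'photos/'
#     upper = strnextling(u'photos/')   # u'photos0', the first string past the prefix range
#     # ... WHERE path > lower AND path < upper
#
# latest_version_list below uses the same trick to window its path scan.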

_propnames = {
    'serial': 0,
    'node': 1,
    'hash': 2,
    'size': 3,
    'type': 4,
    'source': 5,
    'mtime': 6,
    'muser': 7,
    'uuid': 8,
    'checksum': 9,
    'cluster': 10
}

def create_tables(engine):
    metadata = MetaData()

    #create nodes table
    columns = []
    columns.append(Column('node', Integer, primary_key=True))
    columns.append(Column('parent', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          autoincrement=False))
    columns.append(Column('latest_version', Integer))
    columns.append(Column('path', String(2048), default='', nullable=False))
    nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
    Index('idx_nodes_path', nodes.c.path, unique=True)
    Index('idx_nodes_parent', nodes.c.parent)

    #create policy table
    columns = []
    columns.append(Column('node', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          primary_key=True))
    columns.append(Column('key', String(128), primary_key=True))
    columns.append(Column('value', String(256)))
    policy = Table('policy', metadata, *columns, mysql_engine='InnoDB')

    #create statistics table
    columns = []
    columns.append(Column('node', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          primary_key=True))
    columns.append(Column('population', Integer, nullable=False, default=0))
    columns.append(Column('size', BigInteger, nullable=False, default=0))
    columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
    columns.append(Column('cluster', Integer, nullable=False, default=0,
                          primary_key=True, autoincrement=False))
    statistics = Table('statistics', metadata, *columns, mysql_engine='InnoDB')

    #create versions table
    columns = []
    columns.append(Column('serial', Integer, primary_key=True))
    columns.append(Column('node', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE')))
    columns.append(Column('hash', String(256)))
    columns.append(Column('size', BigInteger, nullable=False, default=0))
    columns.append(Column('type', String(256), nullable=False, default=''))
    columns.append(Column('source', Integer))
    columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
    columns.append(Column('muser', String(256), nullable=False, default=''))
    columns.append(Column('uuid', String(64), nullable=False, default=''))
    columns.append(Column('checksum', String(256), nullable=False, default=''))
    columns.append(Column('cluster', Integer, nullable=False, default=0))
    versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
    Index('idx_versions_node_mtime', versions.c.node, versions.c.mtime)
    Index('idx_versions_node_uuid', versions.c.uuid)

    #create attributes table
    columns = []
    columns.append(Column('serial', Integer,
                          ForeignKey('versions.serial',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          primary_key=True))
    columns.append(Column('domain', String(256), primary_key=True))
    columns.append(Column('key', String(128), primary_key=True))
    columns.append(Column('value', String(256)))
    attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')
    Index('idx_attributes_domain', attributes.c.domain)

    metadata.create_all(engine)
    return metadata.sorted_tables
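
# Illustrative sketch (assumed engine URL, not part of the original code):
#
#     from sqlalchemy import create_engine
#     engine = create_engine('sqlite:///pithos.db')
#     tables = create_tables(engine)
#     # creates nodes, policy, statistics, versions, attributes
#     # and returns them as metadata.sorted_tables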


class Node(DBWorker):
    """Nodes store path organization and have multiple versions.
       Versions store object history and have multiple attributes.
       Attributes store metadata.
    """

    # TODO: Provide an interface for included and excluded clusters.

    def __init__(self, **params):
        DBWorker.__init__(self, **params)
        try:
            metadata = MetaData(self.engine)
            self.nodes = Table('nodes', metadata, autoload=True)
            self.policy = Table('policy', metadata, autoload=True)
            self.statistics = Table('statistics', metadata, autoload=True)
            self.versions = Table('versions', metadata, autoload=True)
            self.attributes = Table('attributes', metadata, autoload=True)
        except NoSuchTableError:
            tables = create_tables(self.engine)
            map(lambda t: self.__setattr__(t.name, t), tables)

        s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
                                           self.nodes.c.parent == ROOTNODE))
        wrapper = self.wrapper
        wrapper.execute()
        try:
            rp = self.conn.execute(s)
            r = rp.fetchone()
            rp.close()
            if not r:
                s = self.nodes.insert(
                ).values(node=ROOTNODE, parent=ROOTNODE, path='')
                self.conn.execute(s)
        finally:
            wrapper.commit()

    def node_create(self, parent, path):
        """Create a new node from the given properties.
           Return the node identifier of the new node.
        """
        #TODO catch IntegrityError?
        s = self.nodes.insert().values(parent=parent, path=path)
        r = self.conn.execute(s)
        inserted_primary_key = r.inserted_primary_key[0]
        r.close()
        return inserted_primary_key
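
    # Illustrative sketch (hypothetical paths and instance name `n`, not part
    # of the original code): accounts hang directly under ROOTNODE and
    # containers/objects under them, with each node storing its full path:
    #
    #     account = n.node_create(ROOTNODE, 'user@example.org')
    #     container = n.node_create(account, 'user@example.org/photos')
    #     obj = n.node_create(container, 'user@example.org/photos/summer.jpg')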

    def node_lookup(self, path):
        """Lookup the current node of the given path.
           Return None if the path is not found.
        """

        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
        s = select([self.nodes.c.node], self.nodes.c.path.like(
            self.escape_like(path), escape='\\'))
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        if row:
            return row[0]
        return None

    def node_lookup_bulk(self, paths):
        """Lookup the current nodes for the given paths.
           Return an empty sequence if no paths are found.
        """

        if not paths:
            return ()
        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
        s = select([self.nodes.c.node], self.nodes.c.path.in_(paths))
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        return [row[0] for row in rows]

    def node_get_properties(self, node):
        """Return the node's (parent, path).
           Return None if the node is not found.
        """

        s = select([self.nodes.c.parent, self.nodes.c.path])
        s = s.where(self.nodes.c.node == node)
        r = self.conn.execute(s)
        l = r.fetchone()
        r.close()
        return l

    def node_get_versions(self, node, keys=(), propnames=_propnames):
        """Return the properties of all versions at node.
           If keys is empty, return all properties in the order
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
        """

        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.hash,
                    self.versions.c.size,
                    self.versions.c.type,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.uuid,
                    self.versions.c.checksum,
                    self.versions.c.cluster], self.versions.c.node == node)
        s = s.order_by(self.versions.c.serial)
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        if not rows:
            return rows

        if not keys:
            return rows

        return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]

    def node_count_children(self, node):
        """Return node's child count."""

        s = select([func.count(self.nodes.c.node)])
        s = s.where(and_(self.nodes.c.parent == node,
                         self.nodes.c.node != ROOTNODE))
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        return row[0]

    def node_purge_children(self, parent, before=inf, cluster=0):
        """Delete all versions with the specified
           parent and cluster, and return
           the hashes, the total size and the serials of versions deleted.
           Clears out nodes with no remaining versions.
        """
        #update statistics
        c1 = select([self.nodes.c.node],
                    self.nodes.c.parent == parent)
        where_clause = and_(self.versions.c.node.in_(c1),
                            self.versions.c.cluster == cluster)
        if before != inf:
            where_clause = and_(where_clause,
                                self.versions.c.mtime <= before)
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        if not row:
            return (), 0, ()
        nr, size = row[0], row[1] if row[1] else 0
        mtime = time()
        self.statistics_update(parent, -nr, -size, mtime, cluster)
        self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster)

        s = select([self.versions.c.hash, self.versions.c.serial])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        hashes = []
        serials = []
        for row in r.fetchall():
            hashes += [row[0]]
            serials += [row[1]]
        r.close()

        #delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)
        r.close()

        #delete nodes
        s = select([self.nodes.c.node],
                   and_(self.nodes.c.parent == parent,
                        select([func.count(self.versions.c.serial)],
                               self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
        rp = self.conn.execute(s)
        nodes = [r[0] for r in rp.fetchall()]
        rp.close()
        if nodes:
            s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
            self.conn.execute(s).close()

        return hashes, size, serials

    def node_purge(self, node, before=inf, cluster=0):
        """Delete all versions with the specified
           node and cluster, and return
           the hashes, the total size and the serials of versions deleted.
           Clears out the node if it has no remaining versions.
        """

        #update statistics
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        where_clause = and_(self.versions.c.node == node,
                            self.versions.c.cluster == cluster)
        if before != inf:
            where_clause = and_(where_clause,
                                self.versions.c.mtime <= before)
        s = s.where(where_clause)
        r = self.conn.execute(s)
        row = r.fetchone()
        nr, size = row[0], row[1]
        r.close()
        if not nr:
            return (), 0, ()
        mtime = time()
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)

        s = select([self.versions.c.hash, self.versions.c.serial])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        hashes = []
        serials = []
        for row in r.fetchall():
            hashes += [row[0]]
            serials += [row[1]]
        r.close()

        #delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)
        r.close()

        #delete nodes
        s = select([self.nodes.c.node],
                   and_(self.nodes.c.node == node,
                        select([func.count(self.versions.c.serial)],
                               self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
        rp = self.conn.execute(s)
        nodes = [r[0] for r in rp.fetchall()]
        rp.close()
        if nodes:
            s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
            self.conn.execute(s).close()

        return hashes, size, serials

    def node_remove(self, node):
        """Remove the node specified.
           Return false if the node has children or is not found.
        """

        if self.node_count_children(node):
            return False

        mtime = time()
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size),
                    self.versions.c.cluster])
        s = s.where(self.versions.c.node == node)
        s = s.group_by(self.versions.c.cluster)
        r = self.conn.execute(s)
        for population, size, cluster in r.fetchall():
            self.statistics_update_ancestors(
                node, -population, -size, mtime, cluster)
        r.close()

        s = self.nodes.delete().where(self.nodes.c.node == node)
        self.conn.execute(s).close()
        return True

    def node_accounts(self, accounts=()):
        s = select([self.nodes.c.path, self.nodes.c.node])
        s = s.where(and_(self.nodes.c.node != 0,
                         self.nodes.c.parent == 0))
        if accounts:
            s = s.where(self.nodes.c.path.in_(accounts))
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        return rows

    def node_account_quotas(self):
        s = select([self.nodes.c.path, self.policy.c.value])
        s = s.where(and_(self.nodes.c.node != 0,
                         self.nodes.c.parent == 0))
        s = s.where(self.nodes.c.node == self.policy.c.node)
        s = s.where(self.policy.c.key == 'quota')
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        return dict(rows)

    def node_account_usage(self, account_node, cluster):
        select_children = select(
            [self.nodes.c.node]).where(self.nodes.c.parent == account_node)
        select_descendants = select([self.nodes.c.node]).where(
            or_(self.nodes.c.parent.in_(select_children),
                self.nodes.c.node.in_(select_children)))
        s = select([func.sum(self.versions.c.size)])
        s = s.group_by(self.versions.c.cluster)
        s = s.where(self.nodes.c.node == self.versions.c.node)
        s = s.where(self.nodes.c.node.in_(select_descendants))
        s = s.where(self.versions.c.cluster == cluster)
        r = self.conn.execute(s)
        usage = r.fetchone()[0]
        r.close()
        return usage

    def policy_get(self, node):
        s = select([self.policy.c.key, self.policy.c.value],
                   self.policy.c.node == node)
        r = self.conn.execute(s)
        d = dict(r.fetchall())
        r.close()
        return d

    def policy_set(self, node, policy):
        #insert or replace
        for k, v in policy.iteritems():
            s = self.policy.update().where(and_(self.policy.c.node == node,
                                                self.policy.c.key == k))
            s = s.values(value=v)
            rp = self.conn.execute(s)
            rp.close()
            if rp.rowcount == 0:
                s = self.policy.insert()
                values = {'node': node, 'key': k, 'value': v}
                r = self.conn.execute(s, values)
                r.close()
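
    # Illustrative sketch (hypothetical values, not part of the original code):
    # policy rows are simple per-node key/value pairs, so an account quota is
    # stored under the 'quota' key that node_account_quotas() later reads back:
    #
    #     n.policy_set(account, {'quota': '107374182400'})
    #     n.policy_get(account)        # e.g. {'quota': '107374182400', ...}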

    def statistics_get(self, node, cluster=0):
        """Return population, total size and last mtime
           for all versions under node that belong to the cluster.
        """

        s = select([self.statistics.c.population,
                    self.statistics.c.size,
                    self.statistics.c.mtime])
        s = s.where(and_(self.statistics.c.node == node,
                         self.statistics.c.cluster == cluster))
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        return row

    def statistics_update(self, node, population, size, mtime, cluster=0):
        """Update the statistics of the given node.
           Statistics keep track of the population, total
           size of objects and mtime in the node's namespace.
           Population and size are applied as deltas and may be
           zero, positive or negative.
        """
        s = select([self.statistics.c.population, self.statistics.c.size],
                   and_(self.statistics.c.node == node,
                        self.statistics.c.cluster == cluster))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            prepopulation, presize = (0, 0)
        else:
            prepopulation, presize = r
        population += prepopulation
        population = max(population, 0)
        size += presize

        #insert or replace
        #TODO better upsert
        u = self.statistics.update().where(and_(self.statistics.c.node == node,
                                           self.statistics.c.cluster == cluster))
        u = u.values(population=population, size=size, mtime=mtime)
        rp = self.conn.execute(u)
        rp.close()
        if rp.rowcount == 0:
            ins = self.statistics.insert()
            ins = ins.values(node=node, population=population, size=size,
                             mtime=mtime, cluster=cluster)
            self.conn.execute(ins).close()

    def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
        """Update the statistics of the given node's parent.
           Then recursively update all parents up to the root.
           Population is not recursive.
        """

        while True:
            if node == ROOTNODE:
                break
            props = self.node_get_properties(node)
            if props is None:
                break
            parent, path = props
            self.statistics_update(parent, population, size, mtime, cluster)
            node = parent
            population = 0  # Population isn't recursive

    def statistics_latest(self, node, before=inf, except_cluster=0):
        """Return population, total size and last mtime
           for all latest versions under node that
           do not belong to the cluster.
        """

        # The node.
        props = self.node_get_properties(node)
        if props is None:
            return None
        parent, path = props

        # The latest version.
        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.hash,
                    self.versions.c.size,
                    self.versions.c.type,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.uuid,
                    self.versions.c.checksum,
                    self.versions.c.cluster])
        if before != inf:
            filtered = select([func.max(self.versions.c.serial)],
                              self.versions.c.node == node)
            filtered = filtered.where(self.versions.c.mtime < before)
        else:
            filtered = select([self.nodes.c.latest_version],
                              self.versions.c.node == node)
        s = s.where(and_(self.versions.c.cluster != except_cluster,
                         self.versions.c.serial == filtered))
        r = self.conn.execute(s)
        props = r.fetchone()
        r.close()
        if not props:
            return None
        mtime = props[MTIME]

        # First level, just under node (get population).
        v = self.versions.alias('v')
        s = select([func.count(v.c.serial),
                    func.sum(v.c.size),
                    func.max(v.c.mtime)])
        if before != inf:
            c1 = select([func.max(self.versions.c.serial)])
            c1 = c1.where(self.versions.c.mtime < before)
            c1 = c1.where(self.versions.c.node == v.c.node)
        else:
            c1 = select([self.nodes.c.latest_version])
            c1 = c1.where(self.nodes.c.node == v.c.node)
        c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
        s = s.where(and_(v.c.serial == c1,
                         v.c.cluster != except_cluster,
                         v.c.node.in_(c2)))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        count = r[0]
        mtime = max(mtime, r[2])
        if count == 0:
            return (0, 0, mtime)

        # All children (get size and mtime).
        # This is why the full path is stored.
        s = select([func.count(v.c.serial),
                    func.sum(v.c.size),
                    func.max(v.c.mtime)])
        if before != inf:
            c1 = select([func.max(self.versions.c.serial)],
                        self.versions.c.node == v.c.node)
            c1 = c1.where(self.versions.c.mtime < before)
        else:
            c1 = select([self.nodes.c.latest_version],
                        self.nodes.c.node == v.c.node)
        c2 = select([self.nodes.c.node], self.nodes.c.path.like(
            self.escape_like(path) + '%', escape='\\'))
        s = s.where(and_(v.c.serial == c1,
                         v.c.cluster != except_cluster,
                         v.c.node.in_(c2)))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        size = r[1] - props[SIZE]
        mtime = max(mtime, r[2])
        return (count, size, mtime)

    def nodes_set_latest_version(self, node, serial):
        s = self.nodes.update().where(self.nodes.c.node == node)
        s = s.values(latest_version=serial)
        self.conn.execute(s).close()

    def version_create(self, node, hash, size, type, source, muser, uuid, checksum, cluster=0):
        """Create a new version from the given properties.
           Return the (serial, mtime) of the new version.
        """

        mtime = time()
        s = self.versions.insert(
        ).values(node=node, hash=hash, size=size, type=type, source=source,
                 mtime=mtime, muser=muser, uuid=uuid, checksum=checksum, cluster=cluster)
        serial = self.conn.execute(s).inserted_primary_key[0]
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)

        self.nodes_set_latest_version(node, serial)

        return serial, mtime
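
    # Illustrative sketch (hypothetical values and node id `obj`, not part of
    # the original code): creating a version also bumps the ancestors'
    # statistics and moves the node's latest_version pointer:
    #
    #     serial, mtime = n.version_create(obj, hash='abc123', size=1024,
    #                                      type='image/jpeg', source=None,
    #                                      muser='user@example.org', uuid='...',
    #                                      checksum='', cluster=0)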

    def version_lookup(self, node, before=inf, cluster=0, all_props=True):
        """Lookup the current version of the given node.
           Return a list with its properties:
           (serial, node, hash, size, type, source, mtime,
            muser, uuid, checksum, cluster)
           or None if the current version is not found in the given cluster.
        """

        v = self.versions.alias('v')
        if not all_props:
            s = select([v.c.serial])
        else:
            s = select([v.c.serial, v.c.node, v.c.hash,
                        v.c.size, v.c.type, v.c.source,
                        v.c.mtime, v.c.muser, v.c.uuid,
                        v.c.checksum, v.c.cluster])
        if before != inf:
            c = select([func.max(self.versions.c.serial)],
                       self.versions.c.node == node)
            c = c.where(self.versions.c.mtime < before)
        else:
            c = select([self.nodes.c.latest_version],
                       self.nodes.c.node == node)
        s = s.where(and_(v.c.serial == c,
                         v.c.cluster == cluster))
        r = self.conn.execute(s)
        props = r.fetchone()
        r.close()
        if props:
            return props
        return None
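
    # Illustrative sketch (hypothetical node id and timestamp, not part of the
    # original code): the default (before=inf) follows the node's
    # latest_version pointer, while passing a `before` timestamp selects the
    # version that was current at that moment:
    #
    #     current = n.version_lookup(obj, cluster=0)
    #     as_of = n.version_lookup(obj, before=1325376000.0, cluster=0)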

    def version_lookup_bulk(self, nodes, before=inf, cluster=0, all_props=True):
        """Lookup the current versions of the given nodes.
           Return a list with their properties:
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
        """
        if not nodes:
            return ()
        v = self.versions.alias('v')
        if not all_props:
            s = select([v.c.serial])
        else:
            s = select([v.c.serial, v.c.node, v.c.hash,
                        v.c.size, v.c.type, v.c.source,
                        v.c.mtime, v.c.muser, v.c.uuid,
                        v.c.checksum, v.c.cluster])
        if before != inf:
            c = select([func.max(self.versions.c.serial)],
                       self.versions.c.node.in_(nodes))
            c = c.where(self.versions.c.mtime < before)
            c = c.group_by(self.versions.c.node)
        else:
            c = select([self.nodes.c.latest_version],
                       self.nodes.c.node.in_(nodes))
        s = s.where(and_(v.c.serial.in_(c),
                         v.c.cluster == cluster))
        s = s.order_by(v.c.node)
        r = self.conn.execute(s)
        rproxy = r.fetchall()
        r.close()
        return (tuple(row.values()) for row in rproxy)

    def version_get_properties(self, serial, keys=(), propnames=_propnames):
        """Return a sequence of values for the properties of
           the version specified by serial and the keys, in the order given.
           If keys is empty, return all properties in the order
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
        """

        v = self.versions.alias()
        s = select([v.c.serial, v.c.node, v.c.hash,
                    v.c.size, v.c.type, v.c.source,
                    v.c.mtime, v.c.muser, v.c.uuid,
                    v.c.checksum, v.c.cluster], v.c.serial == serial)
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if r is None:
            return r

        if not keys:
            return r
        return [r[propnames[k]] for k in keys if k in propnames]

    def version_put_property(self, serial, key, value):
        """Set value for the property of version specified by key."""

        if key not in _propnames:
            return
        s = self.versions.update()
        s = s.where(self.versions.c.serial == serial)
        s = s.values(**{key: value})
        self.conn.execute(s).close()

    def version_recluster(self, serial, cluster):
        """Move the version into another cluster."""

        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        size = props[SIZE]
        oldcluster = props[CLUSTER]
        if cluster == oldcluster:
            return

        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)

        s = self.versions.update()
        s = s.where(self.versions.c.serial == serial)
        s = s.values(cluster=cluster)
        self.conn.execute(s).close()

    def version_remove(self, serial):
        """Remove the serial specified."""

        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        hash = props[HASH]
        size = props[SIZE]
        cluster = props[CLUSTER]

        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, cluster)

        s = self.versions.delete().where(self.versions.c.serial == serial)
        self.conn.execute(s).close()

        props = self.version_lookup(node, cluster=cluster, all_props=False)
        if props:
            self.nodes_set_latest_version(node, props[0])

        return hash, size

    def attribute_get(self, serial, domain, keys=()):
        """Return a list of (key, value) pairs of the version specified by serial.
           If keys is empty, return all attributes.
           Otherwise, return only those specified.
        """

        if keys:
            attrs = self.attributes.alias()
            s = select([attrs.c.key, attrs.c.value])
            s = s.where(and_(attrs.c.key.in_(keys),
                             attrs.c.serial == serial,
                             attrs.c.domain == domain))
        else:
            attrs = self.attributes.alias()
            s = select([attrs.c.key, attrs.c.value])
            s = s.where(and_(attrs.c.serial == serial,
                             attrs.c.domain == domain))
        r = self.conn.execute(s)
        l = r.fetchall()
        r.close()
        return l

    def attribute_set(self, serial, domain, items):
        """Set the attributes of the version specified by serial.
           Receive attributes as an iterable of (key, value) pairs.
        """
        #insert or replace
        #TODO better upsert
        for k, v in items:
            s = self.attributes.update()
            s = s.where(and_(self.attributes.c.serial == serial,
                             self.attributes.c.domain == domain,
                             self.attributes.c.key == k))
            s = s.values(value=v)
            rp = self.conn.execute(s)
            rp.close()
            if rp.rowcount == 0:
                s = self.attributes.insert()
                s = s.values(serial=serial, domain=domain, key=k, value=v)
                self.conn.execute(s).close()
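
    # Illustrative sketch (hypothetical domain and key, not part of the
    # original code): attributes are keyed by (serial, domain, key), so
    # metadata in one domain does not collide with another:
    #
    #     n.attribute_set(serial, 'pithos', [('X-Object-Meta-Tag', 'beach')])
    #     n.attribute_get(serial, 'pithos')   # [('X-Object-Meta-Tag', 'beach')]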

    def attribute_del(self, serial, domain, keys=()):
        """Delete attributes of the version specified by serial.
           If keys is empty, delete all attributes.
           Otherwise delete those specified.
        """

        if keys:
            #TODO more efficient way to do this?
            for key in keys:
                s = self.attributes.delete()
                s = s.where(and_(self.attributes.c.serial == serial,
                                 self.attributes.c.domain == domain,
                                 self.attributes.c.key == key))
                self.conn.execute(s).close()
        else:
            s = self.attributes.delete()
            s = s.where(and_(self.attributes.c.serial == serial,
                             self.attributes.c.domain == domain))
            self.conn.execute(s).close()

    def attribute_copy(self, source, dest):
        s = select(
            [dest, self.attributes.c.domain,
                self.attributes.c.key, self.attributes.c.value],
            self.attributes.c.serial == source)
        rp = self.conn.execute(s)
        attributes = rp.fetchall()
        rp.close()
        for dest, domain, k, v in attributes:
            #insert or replace
            s = self.attributes.update().where(and_(
                self.attributes.c.serial == dest,
                self.attributes.c.domain == domain,
                self.attributes.c.key == k))
            rp = self.conn.execute(s, value=v)
            rp.close()
            if rp.rowcount == 0:
                s = self.attributes.insert()
                values = {'serial': dest, 'domain': domain,
                          'key': k, 'value': v}
                self.conn.execute(s, values).close()

    def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=None):
        """Return a list with all attribute keys defined
           for all latest versions under parent that
           do not belong to the cluster.
        """

        pathq = pathq or []

        # TODO: Use another table to store before=inf results.
        a = self.attributes.alias('a')
        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        s = select([a.c.key]).distinct()
        if before != inf:
            filtered = select([func.max(self.versions.c.serial)])
            filtered = filtered.where(self.versions.c.mtime < before)
            filtered = filtered.where(self.versions.c.node == v.c.node)
        else:
            filtered = select([self.nodes.c.latest_version])
            filtered = filtered.where(self.nodes.c.node == v.c.node)
        s = s.where(v.c.serial == filtered)
        s = s.where(v.c.cluster != except_cluster)
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
                                        self.nodes.c.parent == parent)))
        s = s.where(a.c.serial == v.c.serial)
        s = s.where(a.c.domain == domain)
        s = s.where(n.c.node == v.c.node)
        conj = []
        for path, match in pathq:
            if match == MATCH_PREFIX:
                conj.append(
                    n.c.path.like(self.escape_like(path) + '%', escape='\\'))
            elif match == MATCH_EXACT:
                conj.append(n.c.path == path)
        if conj:
            s = s.where(or_(*conj))
        rp = self.conn.execute(s)
        rows = rp.fetchall()
        rp.close()
        return [r[0] for r in rows]

    def latest_version_list(self, parent, prefix='', delimiter=None,
                            start='', limit=10000, before=inf,
                            except_cluster=0, pathq=[], domain=None,
                            filterq=[], sizeq=None, all_props=False):
        """Return a (list of (path, serial) tuples, list of common prefixes)
           for the current versions of the paths with the given parent,
           matching the following criteria.

           The property tuple for a version is returned if all
           of these conditions are true:

                a. parent matches

                b. path > start

                c. path starts with prefix (and paths in pathq)

                d. version is the max up to before

                e. version is not in cluster

                f. the path does not have the delimiter occurring
                   after the prefix, or ends with the delimiter

                g. serial matches the attribute filter query.

                   A filter query is a comma-separated list of
                   terms in one of these three forms:

                   key
                       an attribute with this key must exist

                   !key
                       an attribute with this key must not exist

                   key ?op value
                       the attribute with this key satisfies the value
                       where ?op is one of ==, !=, <=, >=, <, >.

                h. the size is in the range set by sizeq

           The list of common prefixes includes the prefixes
           matching up to the first delimiter after prefix,
           and are reported only once, as "virtual directories".
           The delimiter is included in the prefixes.

           If arguments are None, then the corresponding matching rule
           will always match.

           Limit applies to the first list of tuples returned.

           If all_props is True, return all properties after path, not just serial.
        """

        if not start or start < prefix:
            start = strprevling(prefix)
        nextling = strnextling(prefix)

        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        if not all_props:
            s = select([n.c.path, v.c.serial]).distinct()
        else:
            s = select([n.c.path,
                        v.c.serial, v.c.node, v.c.hash,
                        v.c.size, v.c.type, v.c.source,
                        v.c.mtime, v.c.muser, v.c.uuid,
                        v.c.checksum, v.c.cluster]).distinct()
        if before != inf:
            filtered = select([func.max(self.versions.c.serial)])
            filtered = filtered.where(self.versions.c.mtime < before)
        else:
            filtered = select([self.nodes.c.latest_version])
        s = s.where(
            v.c.serial == filtered.where(self.nodes.c.node == v.c.node))
        s = s.where(v.c.cluster != except_cluster)
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
                                        self.nodes.c.parent == parent)))

        s = s.where(n.c.node == v.c.node)
        s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
        conj = []
        for path, match in pathq:
            if match == MATCH_PREFIX:
                conj.append(
                    n.c.path.like(self.escape_like(path) + '%', escape='\\'))
            elif match == MATCH_EXACT:
                conj.append(n.c.path == path)
        if conj:
            s = s.where(or_(*conj))

        if sizeq and len(sizeq) == 2:
            if sizeq[0]:
                s = s.where(v.c.size >= sizeq[0])
            if sizeq[1]:
                s = s.where(v.c.size < sizeq[1])

        if domain and filterq:
            a = self.attributes.alias('a')
            included, excluded, opers = parse_filters(filterq)
            if included:
                subs = select([1])
                subs = subs.where(a.c.serial == v.c.serial).correlate(v)
                subs = subs.where(a.c.domain == domain)
                subs = subs.where(or_(*[a.c.key.op('=')(x) for x in included]))
                s = s.where(exists(subs))
            if excluded:
                subs = select([1])
                subs = subs.where(a.c.serial == v.c.serial).correlate(v)
                subs = subs.where(a.c.domain == domain)
                subs = subs.where(or_(*[a.c.key.op('=')(x) for x in excluded]))
                s = s.where(not_(exists(subs)))
            if opers:
                for k, o, val in opers:
                    subs = select([1])
                    subs = subs.where(a.c.serial == v.c.serial).correlate(v)
                    subs = subs.where(a.c.domain == domain)
                    subs = subs.where(
                        and_(a.c.key.op('=')(k), a.c.value.op(o)(val)))
                    s = s.where(exists(subs))

        s = s.order_by(n.c.path)

        if not delimiter:
            s = s.limit(limit)
            rp = self.conn.execute(s, start=start)
            r = rp.fetchall()
            rp.close()
            return r, ()

        pfz = len(prefix)
        dz = len(delimiter)
        count = 0
        prefixes = []
        pappend = prefixes.append
        matches = []
        mappend = matches.append

        rp = self.conn.execute(s, start=start)
        while True:
            props = rp.fetchone()
            if props is None:
                break
            path = props[0]
            serial = props[1]
            idx = path.find(delimiter, pfz)

            if idx < 0:
                mappend(props)
                count += 1
                if count >= limit:
                    break
                continue

            if idx + dz == len(path):
                mappend(props)
                count += 1
                continue  # Get one more, in case there is a path.
            pf = path[:idx + dz]
            pappend(pf)
            if count >= limit:
                break

            rp = self.conn.execute(s, start=strnextling(pf))  # New start.
        rp.close()

        return matches, prefixes
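
    # Illustrative sketch (hypothetical arguments, not part of the original
    # code): listing a container node with a delimiter returns object matches
    # plus "virtual directory" prefixes; filterq terms follow the forms
    # documented in the docstring above (key, !key, key ?op value):
    #
    #     matches, prefixes = n.latest_version_list(
    #         container, prefix='photos/', delimiter='/',
    #         domain='pithos', filterq=['X-Object-Meta-Tag'])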

    def latest_uuid(self, uuid, cluster):
        """Return the latest version of the given uuid and cluster.

        Return a (path, serial) tuple.
        If cluster is None, all clusters are considered.

        """

        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        s = select([n.c.path, v.c.serial])
        filtered = select([func.max(self.versions.c.serial)])
        filtered = filtered.where(self.versions.c.uuid == uuid)
        if cluster is not None:
            filtered = filtered.where(self.versions.c.cluster == cluster)
        s = s.where(v.c.serial == filtered)
        s = s.where(n.c.node == v.c.node)

        r = self.conn.execute(s)
        l = r.fetchone()
        r.close()
        return l

    def domain_object_list(self, domain, cluster=None):
        """Return a list of (path, property list, attribute dictionary)
           for the objects in the specific domain and cluster.
        """

        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        a = self.attributes.alias('a')

        s = select([n.c.path, v.c.serial, v.c.node, v.c.hash, v.c.size,
                    v.c.type, v.c.source, v.c.mtime, v.c.muser, v.c.uuid,
                    v.c.checksum, v.c.cluster, a.c.key, a.c.value])
        s = s.where(n.c.node == v.c.node)
        s = s.where(n.c.latest_version == v.c.serial)
        if cluster:
            s = s.where(v.c.cluster == cluster)
        s = s.where(v.c.serial == a.c.serial)
        s = s.where(a.c.domain == domain)

        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()

        group_by = itemgetter(slice(12))
        rows.sort(key=group_by)
        groups = groupby(rows, group_by)
        return [(k[0], k[1:], dict([i[12:] for i in data]))
                for (k, data) in groups]