Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / lib / sqlalchemy / node.py @ 5576e6dd

History | View | Annotate | Download (42 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from time import time
35
from operator import itemgetter
36
from itertools import groupby
37

    
38
from sqlalchemy import Table, Integer, BigInteger, DECIMAL, Column, String, MetaData, ForeignKey
39
from sqlalchemy.types import Text
40
from sqlalchemy.schema import Index, Sequence
41
from sqlalchemy.sql import func, and_, or_, not_, null, select, bindparam, text, exists
42
from sqlalchemy.ext.compiler import compiles
43
from sqlalchemy.engine.reflection import Inspector
44
from sqlalchemy.exc import NoSuchTableError
45

    
46
from dbworker import DBWorker
47

    
48
from pithos.backends.filter import parse_filters
49

    
50

    
51
# Identifier of the pre-created root node (it is its own parent).
ROOTNODE = 0

# Indexes of the fields in a version properties row, in the order the
# version queries below select them.
(SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM,
 CLUSTER) = range(11)

# Path matching modes.
(MATCH_PREFIX, MATCH_EXACT) = range(2)

# Sentinel meaning "no upper bound" for the various `before` arguments.
inf = float('inf')
59

    
60

    
61
def strnextling(prefix):
    """Return the first unicode string
       greater than but not starting with given prefix.
       strnextling('hello') -> 'hellp'

       Raises RuntimeError if the last character of prefix is
       already at (or beyond) the 0xffff ceiling used here.
    """
    # `unichr` exists only on Python 2; fall back to `chr` so the helper
    # keeps working under either interpreter without changing behavior.
    try:
        _unichr = unichr
    except NameError:
        _unichr = chr
    if not prefix:
        ## all strings start with the null string,
        ## therefore we have to approximate strnextling('')
        ## with the last unicode character supported by python
        ## 0x10ffff for wide (32-bit unicode) python builds
        ## 0x00ffff for narrow (16-bit unicode) python builds
        ## We will not autodetect. 0xffff is safe enough.
        return _unichr(0xffff)
    s = prefix[:-1]
    c = ord(prefix[-1])
    if c >= 0xffff:
        raise RuntimeError
    # Bump the last character by one code point.
    s += _unichr(c + 1)
    return s
80

    
81

    
82
def strprevling(prefix):
    """Return an approximation of the last unicode string
       less than but not starting with given prefix.
       strprevling(u'hello') -> u'helln\\xffff'
    """
    # `unichr` exists only on Python 2; fall back to `chr` so the helper
    # keeps working under either interpreter without changing behavior.
    try:
        _unichr = unichr
    except NameError:
        _unichr = chr
    if not prefix:
        ## There is no prevling for the null string
        return prefix
    s = prefix[:-1]
    c = ord(prefix[-1])
    if c > 0:
        # Step the last character down by one and pad with the highest
        # safe code point so the result sorts just below the prefix.
        s += _unichr(c - 1) + _unichr(0xffff)
    return s
95

    
96
_propnames = {
97
    'serial': 0,
98
    'node': 1,
99
    'hash': 2,
100
    'size': 3,
101
    'type': 4,
102
    'source': 5,
103
    'mtime': 6,
104
    'muser': 7,
105
    'uuid': 8,
106
    'checksum': 9,
107
    'cluster': 10
108
}
109

    
110

    
111
def create_tables(engine):
    """Create the pithos node schema on `engine` and return the tables.

       Creates nodes, policy, statistics, versions and attributes
       (with their indexes) and returns metadata.sorted_tables.
    """
    metadata = MetaData()

    #create nodes table
    # Each node is one path entry; parent forms the tree, latest_version
    # caches the serial of the current version for fast lookup.
    columns = []
    columns.append(Column('node', Integer, primary_key=True))
    columns.append(Column('parent', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          autoincrement=False))
    columns.append(Column('latest_version', Integer))
    columns.append(Column('path', String(2048), default='', nullable=False))
    nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
    # Paths are unique; parent index speeds child listing.
    Index('idx_nodes_path', nodes.c.path, unique=True)
    Index('idx_nodes_parent', nodes.c.parent)

    #create policy table
    # Per-node key/value policy entries, keyed by (node, key).
    columns = []
    columns.append(Column('node', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          primary_key=True))
    columns.append(Column('key', String(128), primary_key=True))
    columns.append(Column('value', String(256)))
    policy = Table('policy', metadata, *columns, mysql_engine='InnoDB')

    #create statistics table
    # Aggregated (population, size, mtime) per node and cluster.
    columns = []
    columns.append(Column('node', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          primary_key=True))
    columns.append(Column('population', Integer, nullable=False, default=0))
    columns.append(Column('size', BigInteger, nullable=False, default=0))
    columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
    columns.append(Column('cluster', Integer, nullable=False, default=0,
                          primary_key=True, autoincrement=False))
    statistics = Table('statistics', metadata, *columns, mysql_engine='InnoDB')

    #create versions table
    # One row per object version; column order matches the SERIAL..CLUSTER
    # index constants at module level.
    columns = []
    columns.append(Column('serial', Integer, primary_key=True))
    columns.append(Column('node', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE')))
    columns.append(Column('hash', String(256)))
    columns.append(Column('size', BigInteger, nullable=False, default=0))
    columns.append(Column('type', String(256), nullable=False, default=''))
    columns.append(Column('source', Integer))
    columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
    columns.append(Column('muser', String(256), nullable=False, default=''))
    columns.append(Column('uuid', String(64), nullable=False, default=''))
    columns.append(Column('checksum', String(256), nullable=False, default=''))
    columns.append(Column('cluster', Integer, nullable=False, default=0))
    versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
    Index('idx_versions_node_mtime', versions.c.node, versions.c.mtime)
    Index('idx_versions_node_uuid', versions.c.uuid)

    #create attributes table
    # Per-version metadata, namespaced by domain.
    columns = []
    columns.append(Column('serial', Integer,
                          ForeignKey('versions.serial',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          primary_key=True))
    columns.append(Column('domain', String(256), primary_key=True))
    columns.append(Column('key', String(128), primary_key=True))
    columns.append(Column('value', String(256)))
    attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')
    Index('idx_attributes_domain', attributes.c.domain)

    metadata.create_all(engine)
    return metadata.sorted_tables
188

    
189

    
190
class Node(DBWorker):
191
    """Nodes store path organization and have multiple versions.
192
       Versions store object history and have multiple attributes.
193
       Attributes store metadata.
194
    """
195

    
196
    # TODO: Provide an interface for included and excluded clusters.
197

    
198
    def __init__(self, **params):
        """Connect via DBWorker and reflect (or create) the schema.

           Binds the nodes/policy/statistics/versions/attributes tables
           as attributes, then makes sure the root node row
           (node == parent == ROOTNODE) exists, creating it if missing.
        """
        DBWorker.__init__(self, **params)
        try:
            # Reflect the existing tables from the database.
            metadata = MetaData(self.engine)
            self.nodes = Table('nodes', metadata, autoload=True)
            self.policy = Table('policy', metadata, autoload=True)
            self.statistics = Table('statistics', metadata, autoload=True)
            self.versions = Table('versions', metadata, autoload=True)
            self.attributes = Table('attributes', metadata, autoload=True)
        except NoSuchTableError:
            # First run: create the schema and bind each returned table
            # under its own name (self.nodes, self.policy, ...).
            tables = create_tables(self.engine)
            map(lambda t: self.__setattr__(t.name, t), tables)

        s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
                                           self.nodes.c.parent == ROOTNODE))
        wrapper = self.wrapper
        wrapper.execute()
        try:
            rp = self.conn.execute(s)
            r = rp.fetchone()
            rp.close()
            if not r:
                # Bootstrap the root node with an empty path.
                s = self.nodes.insert(
                ).values(node=ROOTNODE, parent=ROOTNODE, path='')
                self.conn.execute(s)
        finally:
            # Always end the transaction opened by wrapper.execute().
            wrapper.commit()
225

    
226
    def node_create(self, parent, path):
        """Create a new node from the given properties.
           Return the node identifier of the new node.
        """
        #TODO catch IntegrityError?
        stmt = self.nodes.insert().values(parent=parent, path=path)
        result = self.conn.execute(stmt)
        node_id = result.inserted_primary_key[0]
        result.close()
        return node_id
236

    
237
    def node_lookup(self, path):
        """Lookup the current node of the given path.
           Return None if the path is not found.
        """
        # Use LIKE for comparison to avoid MySQL problems with trailing
        # spaces; the path is escaped so no wildcards apply.
        stmt = select([self.nodes.c.node], self.nodes.c.path.like(
            self.escape_like(path), escape='\\'))
        result = self.conn.execute(stmt)
        row = result.fetchone()
        result.close()
        return row[0] if row else None
251

    
252
    def node_lookup_bulk(self, paths):
        """Lookup the current nodes for the given paths.
           Return () if the path is not found.
        """
        if not paths:
            return ()
        stmt = select([self.nodes.c.node], self.nodes.c.path.in_(paths))
        result = self.conn.execute(stmt)
        rows = result.fetchall()
        result.close()
        return [row[0] for row in rows]
265

    
266
    def node_get_properties(self, node):
        """Return the node's (parent, path).
           Return None if the node is not found.
        """
        stmt = select([self.nodes.c.parent, self.nodes.c.path],
                      self.nodes.c.node == node)
        result = self.conn.execute(stmt)
        props = result.fetchone()
        result.close()
        return props
277

    
278
    def node_get_versions(self, node, keys=(), propnames=_propnames):
        """Return the properties of all versions at node.
           If keys is empty, return all properties in the order
           (serial, node, hash, size, type, source, mtime, muser, uuid,
            checksum, cluster).
        """
        v = self.versions.c
        stmt = select([v.serial, v.node, v.hash, v.size, v.type, v.source,
                       v.mtime, v.muser, v.uuid, v.checksum, v.cluster],
                      v.node == node).order_by(v.serial)
        result = self.conn.execute(stmt)
        rows = result.fetchall()
        result.close()
        # Empty result, or full rows requested: return as-is.
        if not rows or not keys:
            return rows
        # Project only the requested (known) properties, in order.
        return [[row[propnames[key]] for key in keys if key in propnames]
                for row in rows]
306

    
307
    def node_count_children(self, node):
        """Return node's child count."""
        stmt = select([func.count(self.nodes.c.node)])
        stmt = stmt.where(and_(self.nodes.c.parent == node,
                               self.nodes.c.node != ROOTNODE))
        result = self.conn.execute(stmt)
        count = result.fetchone()[0]
        result.close()
        return count
317

    
318
    def node_purge_children(self, parent, before=inf, cluster=0):
        """Delete all versions with the specified
           parent and cluster, and return
           the hashes, the total size and the serials of versions deleted.
           Clears out nodes with no remaining versions.
        """
        #update statistics
        # All versions in `cluster` belonging to any child of `parent`,
        # optionally limited to those modified at or before `before`.
        c1 = select([self.nodes.c.node],
                    self.nodes.c.parent == parent)
        where_clause = and_(self.versions.c.node.in_(c1),
                            self.versions.c.cluster == cluster)
        if before != inf:
            where_clause = and_(where_clause,
                                self.versions.c.mtime <= before)
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        if not row:
            return (), 0, ()
        # SUM() is NULL when no rows match; coerce to 0.
        nr, size = row[0], row[1] if row[1] else 0
        mtime = time()
        # Subtract the purged population/size from the parent itself and
        # from all of its ancestors.
        self.statistics_update(parent, -nr, -size, mtime, cluster)
        self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster)

        # Collect hashes and serials of the versions about to be deleted.
        s = select([self.versions.c.hash, self.versions.c.serial])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        hashes = []
        serials = []
        for row in r.fetchall():
            hashes += [row[0]]
            serials += [row[1]]
        r.close()

        #delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)
        r.close()

        #delete nodes
        # Remove children of `parent` left with zero versions.
        s = select([self.nodes.c.node],
                   and_(self.nodes.c.parent == parent,
                        select([func.count(self.versions.c.serial)],
                               self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
        rp = self.conn.execute(s)
        nodes = [r[0] for r in rp.fetchall()]
        rp.close()
        if nodes:
            s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
            self.conn.execute(s).close()

        return hashes, size, serials
373

    
374
    def node_purge(self, node, before=inf, cluster=0):
        """Delete all versions with the specified
           node and cluster, and return
           the hashes and size of versions deleted.
           Clears out the node if it has no remaining versions.
        """

        #update statistics
        # Count and total size of the versions to be purged, optionally
        # limited to those modified at or before `before`.
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        where_clause = and_(self.versions.c.node == node,
                            self.versions.c.cluster == cluster)
        if before != inf:
            where_clause = and_(where_clause,
                                self.versions.c.mtime <= before)
        s = s.where(where_clause)
        r = self.conn.execute(s)
        row = r.fetchone()
        nr, size = row[0], row[1]
        r.close()
        if not nr:
            # Nothing matched; size may be NULL here but is never used.
            return (), 0, ()
        mtime = time()
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)

        # Collect hashes and serials of the versions about to be deleted.
        s = select([self.versions.c.hash, self.versions.c.serial])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        hashes = []
        serials = []
        for row in r.fetchall():
            hashes += [row[0]]
            serials += [row[1]]
        r.close()

        #delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)
        r.close()

        #delete nodes
        # Remove the node itself if no versions remain.
        s = select([self.nodes.c.node],
                   and_(self.nodes.c.node == node,
                        select([func.count(self.versions.c.serial)],
                               self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
        rp= self.conn.execute(s)
        nodes = [r[0] for r in rp.fetchall()]
        rp.close()
        if nodes:
            s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
            self.conn.execute(s).close()

        return hashes, size, serials
427

    
428
    def node_remove(self, node):
        """Remove the node specified.
           Return false if the node has children or is not found.
        """
        if self.node_count_children(node):
            return False

        mtime = time()
        # Roll the node's per-cluster totals out of its ancestors before
        # the row (and, via cascade, its versions) disappears.
        stmt = select([func.count(self.versions.c.serial),
                       func.sum(self.versions.c.size),
                       self.versions.c.cluster],
                      self.versions.c.node == node)
        stmt = stmt.group_by(self.versions.c.cluster)
        result = self.conn.execute(stmt)
        for population, size, cluster in result.fetchall():
            self.statistics_update_ancestors(
                node, -population, -size, mtime, cluster)
        result.close()

        self.conn.execute(
            self.nodes.delete().where(self.nodes.c.node == node)).close()
        return True
451

    
452
    def node_accounts(self):
        """Return the sorted paths of all top-level (account) nodes."""
        stmt = select([self.nodes.c.path],
                      and_(self.nodes.c.node != 0, self.nodes.c.parent == 0))
        rows = self.conn.execute(stmt).fetchall()
        return sorted(row[0] for row in rows)
457

    
458
    def policy_get(self, node):
        """Return the node's policy entries as a {key: value} dict."""
        stmt = select([self.policy.c.key, self.policy.c.value],
                      self.policy.c.node == node)
        result = self.conn.execute(stmt)
        policy = dict(result.fetchall())
        result.close()
        return policy
465

    
466
    def policy_set(self, node, policy):
        """Insert or update the given {key: value} policy pairs for node."""
        #insert or replace
        for key, value in policy.iteritems():
            upd = self.policy.update().where(and_(
                self.policy.c.node == node,
                self.policy.c.key == key)).values(value=value)
            rp = self.conn.execute(upd)
            rp.close()
            if rp.rowcount == 0:
                # No existing row was updated; insert a fresh one.
                ins = self.policy.insert()
                self.conn.execute(
                    ins, {'node': node, 'key': key, 'value': value}).close()
479

    
480
    def statistics_get(self, node, cluster=0):
        """Return population, total size and last mtime
           for all versions under node that belong to the cluster.
        """
        stmt = select([self.statistics.c.population,
                       self.statistics.c.size,
                       self.statistics.c.mtime],
                      and_(self.statistics.c.node == node,
                           self.statistics.c.cluster == cluster))
        result = self.conn.execute(stmt)
        row = result.fetchone()
        result.close()
        return row
494

    
495
    def statistics_update(self, node, population, size, mtime, cluster=0):
        """Update the statistics of the given node.
           Statistics keep track the population, total
           size of objects and mtime in the node's namespace.
           May be zero or positive or negative numbers.
        """
        # Read the current totals for (node, cluster); treat a missing
        # row as zeros.
        s = select([self.statistics.c.population, self.statistics.c.size],
                   and_(self.statistics.c.node == node,
                        self.statistics.c.cluster == cluster))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            prepopulation, presize = (0, 0)
        else:
            prepopulation, presize = r
        # Arguments are deltas; population is clamped so it never goes
        # negative, size is not.
        population += prepopulation
        population = max(population, 0)
        size += presize

        #insert or replace
        #TODO better upsert
        u = self.statistics.update().where(and_(self.statistics.c.node == node,
                                           self.statistics.c.cluster == cluster))
        u = u.values(population=population, size=size, mtime=mtime)
        rp = self.conn.execute(u)
        rp.close()
        # NOTE(review): rowcount is read after close(); appears to rely on
        # SQLAlchemy caching it — confirm with the target driver.
        if rp.rowcount == 0:
            ins = self.statistics.insert()
            ins = ins.values(node=node, population=population, size=size,
                             mtime=mtime, cluster=cluster)
            self.conn.execute(ins).close()
527

    
528
    def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
        """Update the statistics of the given node's parent.
           Then recursively update all parents up to the root.
           Population is not recursive.
        """
        while node != ROOTNODE:
            props = self.node_get_properties(node)
            if props is None:
                # Dangling node; nothing further to propagate.
                break
            parent, path = props
            self.statistics_update(parent, population, size, mtime, cluster)
            node = parent
            population = 0  # Population isn't recursive
544

    
545
    def statistics_latest(self, node, before=inf, except_cluster=0):
        """Return population, total size and last mtime
           for all latest versions under node that
           do not belong to the cluster.
        """

        # The node.
        props = self.node_get_properties(node)
        if props is None:
            return None
        parent, path = props

        # The latest version.
        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.hash,
                    self.versions.c.size,
                    self.versions.c.type,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.uuid,
                    self.versions.c.checksum,
                    self.versions.c.cluster])
        if before != inf:
            filtered = select([func.max(self.versions.c.serial)],
                              self.versions.c.node == node)
            filtered = filtered.where(self.versions.c.mtime < before)
        else:
            # BUGFIX: correlate on nodes.node (was versions.node, which
            # produced an unintended cross join with versions).
            filtered = select([self.nodes.c.latest_version],
                              self.nodes.c.node == node)
        s = s.where(and_(self.versions.c.cluster != except_cluster,
                         self.versions.c.serial == filtered))
        r = self.conn.execute(s)
        props = r.fetchone()
        r.close()
        if not props:
            return None
        mtime = props[MTIME]

        # First level, just under node (get population).
        v = self.versions.alias('v')
        s = select([func.count(v.c.serial),
                    func.sum(v.c.size),
                    func.max(v.c.mtime)])
        if before != inf:
            c1 = select([func.max(self.versions.c.serial)])
            c1 = c1.where(self.versions.c.mtime < before)
            # BUGFIX: where() is generative — the result must be
            # reassigned, otherwise the per-node correlation is dropped.
            c1 = c1.where(self.versions.c.node == v.c.node)
        else:
            c1 = select([self.nodes.c.latest_version])
            # BUGFIX: same as above; the bare call was a no-op.
            c1 = c1.where(self.nodes.c.node == v.c.node)
        c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
        s = s.where(and_(v.c.serial == c1,
                         v.c.cluster != except_cluster,
                         v.c.node.in_(c2)))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        count = r[0]
        mtime = max(mtime, r[2])
        if count == 0:
            return (0, 0, mtime)

        # All children (get size and mtime).
        # This is why the full path is stored.
        s = select([func.count(v.c.serial),
                    func.sum(v.c.size),
                    func.max(v.c.mtime)])
        if before != inf:
            c1 = select([func.max(self.versions.c.serial)],
                        self.versions.c.node == v.c.node)
            c1 = c1.where(self.versions.c.mtime < before)
        else:
            # BUGFIX: nodes has no 'serial' column (AttributeError at
            # runtime); the cached pointer is nodes.latest_version.
            c1 = select([self.nodes.c.latest_version],
                        self.nodes.c.node == v.c.node)
        c2 = select([self.nodes.c.node], self.nodes.c.path.like(
            self.escape_like(path) + '%', escape='\\'))
        s = s.where(and_(v.c.serial == c1,
                         v.c.cluster != except_cluster,
                         v.c.node.in_(c2)))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        # Exclude the node's own latest version from the size total.
        size = r[1] - props[SIZE]
        mtime = max(mtime, r[2])
        return (count, size, mtime)
636

    
637
    def nodes_set_latest_version(self, node, serial):
        """Record serial as the node's cached latest version."""
        upd = self.nodes.update().where(
            self.nodes.c.node == node).values(latest_version=serial)
        self.conn.execute(upd).close()
641

    
642
    def version_create(self, node, hash, size, type, source, muser, uuid, checksum, cluster=0):
        """Create a new version from the given properties.
           Return the (serial, mtime) of the new version.
        """
        mtime = time()
        ins = self.versions.insert().values(
            node=node, hash=hash, size=size, type=type, source=source,
            mtime=mtime, muser=muser, uuid=uuid, checksum=checksum,
            cluster=cluster)
        serial = self.conn.execute(ins).inserted_primary_key[0]
        # Account the new version in every ancestor's statistics and
        # make it the node's current version.
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)
        self.nodes_set_latest_version(node, serial)
        return serial, mtime
657

    
658
    def version_lookup(self, node, before=inf, cluster=0, all_props=True):
        """Lookup the current version of the given node.
           Return a list with its properties:
           (serial, node, hash, size, type, source, mtime,
            muser, uuid, checksum, cluster)
           or None if the current version is not found in the given cluster.
        """
        v = self.versions.alias('v')
        if all_props:
            columns = [v.c.serial, v.c.node, v.c.hash,
                       v.c.size, v.c.type, v.c.source,
                       v.c.mtime, v.c.muser, v.c.uuid,
                       v.c.checksum, v.c.cluster]
        else:
            columns = [v.c.serial]
        stmt = select(columns)
        if before != inf:
            # Latest serial created strictly before the cutoff.
            sub = select([func.max(self.versions.c.serial)],
                         self.versions.c.node == node)
            sub = sub.where(self.versions.c.mtime < before)
        else:
            # Fast path: use the cached latest_version pointer.
            sub = select([self.nodes.c.latest_version],
                         self.nodes.c.node == node)
        stmt = stmt.where(and_(v.c.serial == sub,
                               v.c.cluster == cluster))
        result = self.conn.execute(stmt)
        props = result.fetchone()
        result.close()
        return props if props else None
689

    
690
    def version_lookup_bulk(self, nodes, before=inf, cluster=0, all_props=True):
        """Lookup the current versions of the given nodes.
           Return a list with their properties:
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
        """
        if not nodes:
            return ()
        v = self.versions.alias('v')
        if not all_props:
            s = select([v.c.serial])
        else:
            s = select([v.c.serial, v.c.node, v.c.hash,
                        v.c.size, v.c.type, v.c.source,
                        v.c.mtime, v.c.muser, v.c.uuid,
                        v.c.checksum, v.c.cluster])
        if before != inf:
            # Per node, the latest serial created strictly before the
            # cutoff (group_by yields one max serial per node).
            c = select([func.max(self.versions.c.serial)],
                       self.versions.c.node.in_(nodes))
            c = c.where(self.versions.c.mtime < before)
            c = c.group_by(self.versions.c.node)
        else:
            # Fast path: use the cached latest_version pointers.
            c = select([self.nodes.c.latest_version],
                       self.nodes.c.node.in_(nodes))
        s = s.where(and_(v.c.serial.in_(c),
                         v.c.cluster == cluster))
        s = s.order_by(v.c.node)
        r = self.conn.execute(s)
        rproxy = r.fetchall()
        r.close()
        # NOTE: returns a generator of plain tuples, ordered by node.
        return (tuple(row.values()) for row in rproxy)
720

    
721
    def version_get_properties(self, serial, keys=(), propnames=_propnames):
        """Return a sequence of values for the properties of
           the version specified by serial and the keys, in the order given.
           If keys is empty, return all properties in the order
           (serial, node, hash, size, type, source, mtime, muser, uuid,
            checksum, cluster).
        """
        v = self.versions.alias()
        stmt = select([v.c.serial, v.c.node, v.c.hash,
                       v.c.size, v.c.type, v.c.source,
                       v.c.mtime, v.c.muser, v.c.uuid,
                       v.c.checksum, v.c.cluster], v.c.serial == serial)
        rp = self.conn.execute(stmt)
        row = rp.fetchone()
        rp.close()
        # Missing serial, or full row requested: return as-is.
        if row is None or not keys:
            return row
        return [row[propnames[key]] for key in keys if key in propnames]
742

    
743
    def version_put_property(self, serial, key, value):
        """Set value for the property of version specified by key."""
        # Silently ignore unknown property names.
        if key not in _propnames:
            return
        upd = self.versions.update().where(
            self.versions.c.serial == serial).values(**{key: value})
        self.conn.execute(upd).close()
752

    
753
    def version_recluster(self, serial, cluster):
        """Move the version into another cluster."""

        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        size = props[SIZE]
        oldcluster = props[CLUSTER]
        if oldcluster == cluster:
            # Already in the requested cluster; nothing to do.
            return

        # Move the (population, size) counters between clusters.
        now = time()
        self.statistics_update_ancestors(node, -1, -size, now, oldcluster)
        self.statistics_update_ancestors(node, 1, size, now, cluster)

        stmt = (self.versions.update()
                .where(self.versions.c.serial == serial)
                .values(cluster=cluster))
        self.conn.execute(stmt).close()
773

    
774
    def version_remove(self, serial):
        """Remove the version specified by serial.

        Update ancestor statistics, delete the version row and, if other
        versions remain for the node in the same cluster, promote the
        remaining latest one to the node's latest version.
        Return (hash, size) of the removed version, or None if the serial
        does not exist.
        """

        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        hash_ = props[HASH]  # renamed: do not shadow the builtin 'hash'
        size = props[SIZE]
        cluster = props[CLUSTER]

        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, cluster)

        s = self.versions.delete().where(self.versions.c.serial == serial)
        self.conn.execute(s).close()

        props = self.version_lookup(node, cluster=cluster, all_props=False)
        if props:
            # Bug fix: point the node at the remaining latest serial
            # (props[0]), not at the serial that was just deleted.
            self.nodes_set_latest_version(node, props[0])

        return hash_, size
796

    
797
    def attribute_get(self, serial, domain, keys=()):
        """Return a list of (key, value) pairs of the version specified by serial.
           If keys is empty, return all attributes.
           Otherwise, return only those specified.
        """

        attrs = self.attributes.alias()
        stmt = select([attrs.c.key, attrs.c.value])
        stmt = stmt.where(and_(attrs.c.serial == serial,
                               attrs.c.domain == domain))
        if keys:
            # Restrict the result to the requested attribute keys.
            stmt = stmt.where(attrs.c.key.in_(keys))
        result = self.conn.execute(stmt)
        pairs = result.fetchall()
        result.close()
        return pairs
818

    
819
    def attribute_set(self, serial, domain, items):
        """Set the attributes of the version specified by serial.
           Receive attributes as an iterable of (key, value) pairs.
        """
        # Emulate an upsert: attempt an UPDATE, INSERT when nothing matched.
        #TODO better upsert
        for key, value in items:
            upd = (self.attributes.update()
                   .where(and_(self.attributes.c.serial == serial,
                               self.attributes.c.domain == domain,
                               self.attributes.c.key == key))
                   .values(value=value))
            rp = self.conn.execute(upd)
            rp.close()
            if rp.rowcount == 0:
                # No existing row was updated; insert a fresh one.
                ins = self.attributes.insert().values(
                    serial=serial, domain=domain, key=key, value=value)
                self.conn.execute(ins).close()
837

    
838
    def attribute_del(self, serial, domain, keys=()):
        """Delete attributes of the version specified by serial.
           If keys is empty, delete all attributes.
           Otherwise delete those specified.
        """

        if keys:
            #TODO more efficient way to do this?
            for key in keys:
                stmt = (self.attributes.delete()
                        .where(and_(self.attributes.c.serial == serial,
                                    self.attributes.c.domain == domain,
                                    self.attributes.c.key == key)))
                self.conn.execute(stmt).close()
        else:
            # No keys given: wipe every attribute in the domain.
            stmt = (self.attributes.delete()
                    .where(and_(self.attributes.c.serial == serial,
                                self.attributes.c.domain == domain)))
            self.conn.execute(stmt).close()
857

    
858
    def attribute_copy(self, source, dest):
        """Copy every attribute of version source onto version dest."""

        s = select(
            [dest, self.attributes.c.domain,
                self.attributes.c.key, self.attributes.c.value],
            self.attributes.c.serial == source)
        rp = self.conn.execute(s)
        attributes = rp.fetchall()
        rp.close()
        # Renamed the loop target so it no longer shadows the dest parameter.
        for target, domain, key, value in attributes:
            # insert or replace
            upd = self.attributes.update().where(and_(
                self.attributes.c.serial == target,
                self.attributes.c.domain == domain,
                self.attributes.c.key == key))
            rp = self.conn.execute(upd, value=value)
            rp.close()
            if rp.rowcount == 0:
                # Nothing updated: the attribute row must be created.
                ins = self.attributes.insert()
                self.conn.execute(ins, {'serial': target, 'domain': domain,
                                        'key': key, 'value': value}).close()
879

    
880
    def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=None):
        """Return a list with all keys pairs defined
           for all latest versions under parent that
           do not belong to the cluster.
        """

        pathq = pathq or []

        # TODO: Use another table to store before=inf results.
        a = self.attributes.alias('a')
        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        s = select([a.c.key]).distinct()
        # Per node, pick the serial that counts as the "latest" version.
        if before == inf:
            filtered = (select([self.nodes.c.latest_version])
                        .where(self.nodes.c.node == v.c.node))
        else:
            filtered = (select([func.max(self.versions.c.serial)])
                        .where(self.versions.c.mtime < before)
                        .where(self.versions.c.node == v.c.node))
        s = s.where(v.c.serial == filtered)
        s = s.where(v.c.cluster != except_cluster)
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
                                        self.nodes.c.parent == parent)))
        s = s.where(a.c.serial == v.c.serial)
        s = s.where(a.c.domain == domain)
        s = s.where(n.c.node == v.c.node)
        path_filters = []
        for path, match in pathq:
            if match == MATCH_PREFIX:
                path_filters.append(
                    n.c.path.like(self.escape_like(path) + '%', escape='\\'))
            elif match == MATCH_EXACT:
                path_filters.append(n.c.path == path)
        if path_filters:
            s = s.where(or_(*path_filters))
        rp = self.conn.execute(s)
        rows = rp.fetchall()
        rp.close()
        return [row[0] for row in rows]
920

    
921
    def latest_version_list(self, parent, prefix='', delimiter=None,
                            start='', limit=10000, before=inf,
                            except_cluster=0, pathq=None, domain=None,
                            filterq=None, sizeq=None, all_props=False):
        """Return a (list of (path, serial) tuples, list of common prefixes)
           for the current versions of the paths with the given parent,
           matching the following criteria.

           The property tuple for a version is returned if all
           of these conditions are true:

                a. parent matches

                b. path > start

                c. path starts with prefix (and paths in pathq)

                d. version is the max up to before

                e. version is not in cluster

                f. the path does not have the delimiter occuring
                   after the prefix, or ends with the delimiter

                g. serial matches the attribute filter query.

                   A filter query is a comma-separated list of
                   terms in one of these three forms:

                   key
                       an attribute with this key must exist

                   !key
                       an attribute with this key must not exist

                   key ?op value
                       the attribute with this key satisfies the value
                       where ?op is one of ==, != <=, >=, <, >.

                h. the size is in the range set by sizeq

           The list of common prefixes includes the prefixes
           matching up to the first delimiter after prefix,
           and are reported only once, as "virtual directories".
           The delimiter is included in the prefixes.

           If arguments are None, then the corresponding matching rule
           will always match.

           Limit applies to the first list of tuples returned.

           If all_props is True, return all properties after path, not just serial.
        """

        # Avoid the mutable-default-argument pitfall; None behaves like [].
        pathq = pathq or []
        filterq = filterq or []

        if not start or start < prefix:
            start = strprevling(prefix)
        nextling = strnextling(prefix)

        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        if not all_props:
            s = select([n.c.path, v.c.serial]).distinct()
        else:
            s = select([n.c.path,
                        v.c.serial, v.c.node, v.c.hash,
                        v.c.size, v.c.type, v.c.source,
                        v.c.mtime, v.c.muser, v.c.uuid,
                        v.c.checksum, v.c.cluster]).distinct()
        # Per node, pick the serial that counts as the "latest" version.
        if before != inf:
            filtered = select([func.max(self.versions.c.serial)])
            filtered = filtered.where(self.versions.c.mtime < before)
        else:
            filtered = select([self.nodes.c.latest_version])
        s = s.where(
            v.c.serial == filtered.where(self.nodes.c.node == v.c.node))
        s = s.where(v.c.cluster != except_cluster)
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
                                        self.nodes.c.parent == parent)))

        s = s.where(n.c.node == v.c.node)
        # 'start' is bound at execution time so the delimiter loop below
        # can re-run the same statement with a new starting path.
        s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
        conj = []
        for path, match in pathq:
            if match == MATCH_PREFIX:
                conj.append(
                    n.c.path.like(self.escape_like(path) + '%', escape='\\'))
            elif match == MATCH_EXACT:
                conj.append(n.c.path == path)
        if conj:
            s = s.where(or_(*conj))

        if sizeq and len(sizeq) == 2:
            if sizeq[0]:
                s = s.where(v.c.size >= sizeq[0])
            if sizeq[1]:
                s = s.where(v.c.size < sizeq[1])

        if domain and filterq:
            a = self.attributes.alias('a')
            included, excluded, opers = parse_filters(filterq)
            if included:
                # EXISTS a matching attribute with one of the included keys.
                subs = select([1])
                subs = subs.where(a.c.serial == v.c.serial).correlate(v)
                subs = subs.where(a.c.domain == domain)
                subs = subs.where(or_(*[a.c.key.op('=')(x) for x in included]))
                s = s.where(exists(subs))
            if excluded:
                # NOT EXISTS any attribute with one of the excluded keys.
                subs = select([1])
                subs = subs.where(a.c.serial == v.c.serial).correlate(v)
                subs = subs.where(a.c.domain == domain)
                subs = subs.where(or_(*[a.c.key.op('=')(x) for x in excluded]))
                s = s.where(not_(exists(subs)))
            if opers:
                # One EXISTS per (key, operator, value) comparison term.
                for k, o, val in opers:
                    subs = select([1])
                    subs = subs.where(a.c.serial == v.c.serial).correlate(v)
                    subs = subs.where(a.c.domain == domain)
                    subs = subs.where(
                        and_(a.c.key.op('=')(k), a.c.value.op(o)(val)))
                    s = s.where(exists(subs))

        s = s.order_by(n.c.path)

        if not delimiter:
            s = s.limit(limit)
            rp = self.conn.execute(s, start=start)
            r = rp.fetchall()
            rp.close()
            return r, ()

        pfz = len(prefix)
        dz = len(delimiter)
        count = 0
        prefixes = []
        pappend = prefixes.append
        matches = []
        mappend = matches.append

        rp = self.conn.execute(s, start=start)
        while True:
            props = rp.fetchone()
            if props is None:
                break
            path = props[0]
            idx = path.find(delimiter, pfz)

            if idx < 0:
                # Plain object below the prefix: report it.
                mappend(props)
                count += 1
                if count >= limit:
                    break
                continue

            if idx + dz == len(path):
                mappend(props)
                count += 1
                continue  # Get one more, in case there is a path.
            pf = path[:idx + dz]
            pappend(pf)
            if count >= limit:
                break

            # Bug fix: close the previous cursor before re-executing,
            # otherwise one ResultProxy leaks per virtual directory.
            rp.close()
            rp = self.conn.execute(s, start=strnextling(pf))  # New start.
        rp.close()

        return matches, prefixes
1088

    
1089
    def latest_uuid(self, uuid, cluster):
        """Return the latest version of the given uuid and cluster.

        Return a (path, serial) tuple.
        If cluster is None, all clusters are considered.

        """

        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        latest = select([func.max(self.versions.c.serial)])
        latest = latest.where(self.versions.c.uuid == uuid)
        if cluster is not None:
            # Restrict the candidate versions to a single cluster.
            latest = latest.where(self.versions.c.cluster == cluster)
        stmt = (select([n.c.path, v.c.serial])
                .where(v.c.serial == latest)
                .where(n.c.node == v.c.node))
        rp = self.conn.execute(stmt)
        row = rp.fetchone()
        rp.close()
        return row
1111

    
1112
    def domain_object_list(self, domain, cluster=None):
        """Return a list of (path, property list, attribute dictionary)
           for the objects in the specific domain and cluster.
        """

        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        a = self.attributes.alias('a')

        s = select([n.c.path, v.c.serial, v.c.node, v.c.hash, v.c.size,
                    v.c.type, v.c.source, v.c.mtime, v.c.muser, v.c.uuid,
                    v.c.checksum, v.c.cluster, a.c.key, a.c.value])
        s = s.where(n.c.node == v.c.node)
        s = s.where(n.c.latest_version == v.c.serial)
        if cluster:
            s = s.where(v.c.cluster == cluster)
        s = s.where(v.c.serial == a.c.serial)
        s = s.where(a.c.domain == domain)

        rp = self.conn.execute(s)
        rows = rp.fetchall()
        rp.close()

        # The first 12 columns identify one object; the trailing
        # (key, value) columns are flattened into a dict per object.
        props_key = itemgetter(slice(12))
        rows.sort(key=props_key)
        return [(props[0], props[1:], dict(row[12:] for row in group))
                for props, group in groupby(rows, props_key)]