root / snf-pithos-backend / pithos / backends / lib / sqlalchemy / node.py @ 94bff756

# Copyright 2011-2012 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from time import time
from sqlalchemy import Table, Integer, BigInteger, DECIMAL, Column, String, MetaData, ForeignKey
from sqlalchemy.types import Text
from sqlalchemy.schema import Index, Sequence
from sqlalchemy.sql import func, and_, or_, not_, null, select, bindparam, text, exists
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.exc import NoSuchTableError

from dbworker import DBWorker

from pithos.backends.filter import parse_filters


ROOTNODE = 0

(SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM,
 CLUSTER) = range(11)

(MATCH_PREFIX, MATCH_EXACT) = range(2)

inf = float('inf')


def strnextling(prefix):
    """Return the first unicode string
       greater than but not starting with given prefix.
       strnextling('hello') -> 'hellp'
    """
    if not prefix:
        ## all strings start with the null string,
        ## therefore we have to approximate strnextling('')
        ## with the last unicode character supported by python
        ## 0x10ffff for wide (32-bit unicode) python builds
        ## 0x00ffff for narrow (16-bit unicode) python builds
        ## We will not autodetect. 0xffff is safe enough.
        return unichr(0xffff)
    s = prefix[:-1]
    c = ord(prefix[-1])
    if c >= 0xffff:
        raise RuntimeError
    s += unichr(c + 1)
    return s


def strprevling(prefix):
    """Return an approximation of the last unicode string
       less than but not starting with given prefix.
       strprevling(u'hello') -> u'helln\\xffff'
    """
    if not prefix:
        ## There is no prevling for the null string
        return prefix
    s = prefix[:-1]
    c = ord(prefix[-1])
    if c > 0:
        s += unichr(c - 1) + unichr(0xffff)
    return s
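
# Illustrative note (not part of the original module): strprevling() and
# strnextling() turn a path prefix into an exclusive lexicographic range,
# which latest_version_list() below uses to bound path comparisons.
#
#     start = strprevling(u'photos/')   # sorts just below paths starting with the prefix
#     end = strnextling(u'photos/')     # first string sorting after all such paths
#     # paths p with start < p < end are (approximately) those under 'photos/'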

_propnames = {
    'serial': 0,
    'node': 1,
    'hash': 2,
    'size': 3,
    'type': 4,
    'source': 5,
    'mtime': 6,
    'muser': 7,
    'uuid': 8,
    'checksum': 9,
    'cluster': 10
}


def create_tables(engine):
    metadata = MetaData()

    #create nodes table
    columns = []
    columns.append(Column('node', Integer, primary_key=True))
    columns.append(Column('parent', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          autoincrement=False))
    columns.append(Column('latest_version', Integer))
    columns.append(Column('path', String(2048), default='', nullable=False))
    nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
    Index('idx_nodes_path', nodes.c.path, unique=True)
    Index('idx_nodes_parent', nodes.c.parent)

    #create policy table
    columns = []
    columns.append(Column('node', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          primary_key=True))
    columns.append(Column('key', String(128), primary_key=True))
    columns.append(Column('value', String(256)))
    policy = Table('policy', metadata, *columns, mysql_engine='InnoDB')

    #create statistics table
    columns = []
    columns.append(Column('node', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          primary_key=True))
    columns.append(Column('population', Integer, nullable=False, default=0))
    columns.append(Column('size', BigInteger, nullable=False, default=0))
    columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
    columns.append(Column('cluster', Integer, nullable=False, default=0,
                          primary_key=True, autoincrement=False))
    statistics = Table('statistics', metadata, *columns, mysql_engine='InnoDB')

    #create versions table
    columns = []
    columns.append(Column('serial', Integer, primary_key=True))
    columns.append(Column('node', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE')))
    columns.append(Column('hash', String(256)))
    columns.append(Column('size', BigInteger, nullable=False, default=0))
    columns.append(Column('type', String(256), nullable=False, default=''))
    columns.append(Column('source', Integer))
    columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
    columns.append(Column('muser', String(256), nullable=False, default=''))
    columns.append(Column('uuid', String(64), nullable=False, default=''))
    columns.append(Column('checksum', String(256), nullable=False, default=''))
    columns.append(Column('cluster', Integer, nullable=False, default=0))
    versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
    Index('idx_versions_node_mtime', versions.c.node, versions.c.mtime)
    Index('idx_versions_node_uuid', versions.c.uuid)

    #create attributes table
    columns = []
    columns.append(Column('serial', Integer,
                          ForeignKey('versions.serial',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          primary_key=True))
    columns.append(Column('domain', String(256), primary_key=True))
    columns.append(Column('key', String(128), primary_key=True))
    columns.append(Column('value', String(256)))
    attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')

    metadata.create_all(engine)
    return metadata.sorted_tables
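
# Illustrative sketch (not from the original source): create_tables() only
# needs a bound SQLAlchemy engine; the in-memory SQLite URL below is an
# assumption chosen for demonstration.
#
#     from sqlalchemy import create_engine
#     engine = create_engine('sqlite:///:memory:')
#     tables = create_tables(engine)
#     print [t.name for t in tables]   # the five tables defined above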


class Node(DBWorker):
    """Nodes store path organization and have multiple versions.
       Versions store object history and have multiple attributes.
       Attributes store metadata.
    """

    # TODO: Provide an interface for included and excluded clusters.

    def __init__(self, **params):
        DBWorker.__init__(self, **params)
        try:
            metadata = MetaData(self.engine)
            self.nodes = Table('nodes', metadata, autoload=True)
            self.policy = Table('policy', metadata, autoload=True)
            self.statistics = Table('statistics', metadata, autoload=True)
            self.versions = Table('versions', metadata, autoload=True)
            self.attributes = Table('attributes', metadata, autoload=True)
        except NoSuchTableError:
            tables = create_tables(self.engine)
            map(lambda t: self.__setattr__(t.name, t), tables)

        s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
                                           self.nodes.c.parent == ROOTNODE))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            s = self.nodes.insert(
            ).values(node=ROOTNODE, parent=ROOTNODE, path='')
            self.conn.execute(s)

    def node_create(self, parent, path):
        """Create a new node from the given properties.
           Return the node identifier of the new node.
        """
        #TODO catch IntegrityError?
        s = self.nodes.insert().values(parent=parent, path=path)
        r = self.conn.execute(s)
        inserted_primary_key = r.inserted_primary_key[0]
        r.close()
        return inserted_primary_key

    def node_lookup(self, path):
        """Lookup the current node of the given path.
           Return None if the path is not found.
        """

        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
        s = select([self.nodes.c.node], self.nodes.c.path.like(
            self.escape_like(path), escape='\\'))
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        if row:
            return row[0]
        return None
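
    # Illustrative sketch (not from the original source): typical path
    # bookkeeping, assuming `backend` is an already constructed Node instance
    # and the container node already exists.
    #
    #     node = backend.node_lookup(u'account/container/object')
    #     if node is None:
    #         parent = backend.node_lookup(u'account/container')
    #         node = backend.node_create(parent, u'account/container/object')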

    def node_lookup_bulk(self, paths):
        """Lookup the current nodes for the given paths.
           Return an empty sequence if no paths are found.
        """

        if not paths:
            return ()
        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
        s = select([self.nodes.c.node], self.nodes.c.path.in_(paths))
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        return [row[0] for row in rows]

    def node_get_properties(self, node):
        """Return the node's (parent, path).
           Return None if the node is not found.
        """

        s = select([self.nodes.c.parent, self.nodes.c.path])
        s = s.where(self.nodes.c.node == node)
        r = self.conn.execute(s)
        l = r.fetchone()
        r.close()
        return l

    def node_get_versions(self, node, keys=(), propnames=_propnames):
        """Return the properties of all versions at node.
           If keys is empty, return all properties in the order
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
        """

        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.hash,
                    self.versions.c.size,
                    self.versions.c.type,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.uuid,
                    self.versions.c.checksum,
                    self.versions.c.cluster], self.versions.c.node == node)
        s = s.order_by(self.versions.c.serial)
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        if not rows:
            return rows

        if not keys:
            return rows

        return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]

    def node_count_children(self, node):
        """Return node's child count."""

        s = select([func.count(self.nodes.c.node)])
        s = s.where(and_(self.nodes.c.parent == node,
                         self.nodes.c.node != ROOTNODE))
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        return row[0]

    def node_purge_children(self, parent, before=inf, cluster=0):
        """Delete all versions with the specified
           parent and cluster, and return
           the hashes, total size and serials of versions deleted.
           Clears out nodes with no remaining versions.
        """
        #update statistics
        c1 = select([self.nodes.c.node],
                    self.nodes.c.parent == parent)
        where_clause = and_(self.versions.c.node.in_(c1),
                            self.versions.c.cluster == cluster)
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        s = s.where(where_clause)
        if before != inf:
            s = s.where(self.versions.c.mtime <= before)
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        if not row:
            return (), 0, ()
        nr, size = row[0], -row[1] if row[1] else 0
        mtime = time()
        self.statistics_update(parent, -nr, size, mtime, cluster)
        self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)

        s = select([self.versions.c.hash, self.versions.c.serial])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        hashes = []
        serials = []
        for row in r.fetchall():
            hashes += [row[0]]
            serials += [row[1]]
        r.close()

        #delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)
        r.close()

        #delete nodes
        s = select([self.nodes.c.node],
                   and_(self.nodes.c.parent == parent,
                        select([func.count(self.versions.c.serial)],
                               self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
        rp = self.conn.execute(s)
        nodes = [r[0] for r in rp.fetchall()]
        rp.close()
        if nodes:
            s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
            self.conn.execute(s).close()

        return hashes, size, serials

    def node_purge(self, node, before=inf, cluster=0):
        """Delete all versions with the specified
           node and cluster, and return
           the hashes, total size and serials of versions deleted.
           Clears out the node if it has no remaining versions.
        """

        #update statistics
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        where_clause = and_(self.versions.c.node == node,
                            self.versions.c.cluster == cluster)
        s = s.where(where_clause)
        if before != inf:
            s = s.where(self.versions.c.mtime <= before)
        r = self.conn.execute(s)
        row = r.fetchone()
        nr, size = row[0], row[1]
        r.close()
        if not nr:
            return (), 0, ()
        mtime = time()
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)

        s = select([self.versions.c.hash, self.versions.c.serial])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        hashes = []
        serials = []
        for row in r.fetchall():
            hashes += [row[0]]
            serials += [row[1]]
        r.close()

        #delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)
        r.close()

        #delete nodes
        s = select([self.nodes.c.node],
                   and_(self.nodes.c.node == node,
                        select([func.count(self.versions.c.serial)],
                               self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
        r = self.conn.execute(s)
        nodes = [row[0] for row in r.fetchall()]
        r.close()
        if nodes:
            s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
            self.conn.execute(s).close()

        return hashes, size, serials

    def node_remove(self, node):
        """Remove the node specified.
           Return False if the node has children or is not found.
        """

        if self.node_count_children(node):
            return False

        mtime = time()
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size),
                    self.versions.c.cluster])
        s = s.where(self.versions.c.node == node)
        s = s.group_by(self.versions.c.cluster)
        r = self.conn.execute(s)
        for population, size, cluster in r.fetchall():
            self.statistics_update_ancestors(
                node, -population, -size, mtime, cluster)
        r.close()

        s = self.nodes.delete().where(self.nodes.c.node == node)
        self.conn.execute(s).close()
        return True

    def node_accounts(self):
        """Return a sorted list of account paths (the root node's children)."""
        s = select([self.nodes.c.path])
        s = s.where(and_(self.nodes.c.node != 0, self.nodes.c.parent == 0))
        account_nodes = self.conn.execute(s).fetchall()
        return sorted(i[0] for i in account_nodes)

    def policy_get(self, node):
        """Return the node's policy as a dict of key/value pairs."""
        s = select([self.policy.c.key, self.policy.c.value],
                   self.policy.c.node == node)
        r = self.conn.execute(s)
        d = dict(r.fetchall())
        r.close()
        return d

    def policy_set(self, node, policy):
        #insert or replace
        for k, v in policy.iteritems():
            s = self.policy.update().where(and_(self.policy.c.node == node,
                                                self.policy.c.key == k))
            s = s.values(value=v)
            rp = self.conn.execute(s)
            rp.close()
            if rp.rowcount == 0:
                s = self.policy.insert()
                values = {'node': node, 'key': k, 'value': v}
                r = self.conn.execute(s, values)
                r.close()

    def statistics_get(self, node, cluster=0):
        """Return population, total size and last mtime
           for all versions under node that belong to the cluster.
        """

        s = select([self.statistics.c.population,
                    self.statistics.c.size,
                    self.statistics.c.mtime])
        s = s.where(and_(self.statistics.c.node == node,
                         self.statistics.c.cluster == cluster))
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        return row

    def statistics_update(self, node, population, size, mtime, cluster=0):
        """Update the statistics of the given node.
           Statistics keep track of the population, total
           size of objects and mtime in the node's namespace.
           The population and size deltas may be zero, positive or negative.
        """
        s = select([self.statistics.c.population, self.statistics.c.size],
                   and_(self.statistics.c.node == node,
                        self.statistics.c.cluster == cluster))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            prepopulation, presize = (0, 0)
        else:
            prepopulation, presize = r
        population += prepopulation
        population = max(population, 0)
        size += presize

        #insert or replace
        #TODO better upsert
        u = self.statistics.update().where(and_(self.statistics.c.node == node,
                                           self.statistics.c.cluster == cluster))
        u = u.values(population=population, size=size, mtime=mtime)
        rp = self.conn.execute(u)
        rp.close()
        if rp.rowcount == 0:
            ins = self.statistics.insert()
            ins = ins.values(node=node, population=population, size=size,
                             mtime=mtime, cluster=cluster)
            self.conn.execute(ins).close()

    def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
        """Update the statistics of the given node's parent.
           Then recursively update all parents up to the root.
           Population is not recursive.
        """

        while True:
            if node == ROOTNODE:
                break
            props = self.node_get_properties(node)
            if props is None:
                break
            parent, path = props
            self.statistics_update(parent, population, size, mtime, cluster)
            node = parent
            population = 0  # Population isn't recursive
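
    # Illustrative note (not part of the original source): creating a version
    # of size 100 under account/container/object leads to
    # statistics_update_ancestors(object_node, 1, 100, mtime), which adds
    # (+1 object, +100 bytes) to the container and then only +100 bytes to
    # each ancestor above it, because population stops at the immediate parent.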

    def statistics_latest(self, node, before=inf, except_cluster=0):
        """Return population, total size and last mtime
           for all latest versions under node that
           do not belong to the cluster.
        """

        # The node.
        props = self.node_get_properties(node)
        if props is None:
            return None
        parent, path = props

        # The latest version.
        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.hash,
                    self.versions.c.size,
                    self.versions.c.type,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.uuid,
                    self.versions.c.checksum,
                    self.versions.c.cluster])
        if before != inf:
            filtered = select([func.max(self.versions.c.serial)],
                              self.versions.c.node == node)
            filtered = filtered.where(self.versions.c.mtime < before)
        else:
            filtered = select([self.nodes.c.latest_version],
                              self.nodes.c.node == node)
        s = s.where(and_(self.versions.c.cluster != except_cluster,
                         self.versions.c.serial == filtered))
        r = self.conn.execute(s)
        props = r.fetchone()
        r.close()
        if not props:
            return None
        mtime = props[MTIME]

        # First level, just under node (get population).
        v = self.versions.alias('v')
        s = select([func.count(v.c.serial),
                    func.sum(v.c.size),
                    func.max(v.c.mtime)])
        if before != inf:
            c1 = select([func.max(self.versions.c.serial)])
            c1 = c1.where(self.versions.c.mtime < before)
            c1 = c1.where(self.versions.c.node == v.c.node)
        else:
            c1 = select([self.nodes.c.latest_version])
            c1 = c1.where(self.nodes.c.node == v.c.node)
        c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
        s = s.where(and_(v.c.serial == c1,
                         v.c.cluster != except_cluster,
                         v.c.node.in_(c2)))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        count = r[0]
        mtime = max(mtime, r[2])
        if count == 0:
            return (0, 0, mtime)

        # All children (get size and mtime).
        # This is why the full path is stored.
        s = select([func.count(v.c.serial),
                    func.sum(v.c.size),
                    func.max(v.c.mtime)])
        if before != inf:
            c1 = select([func.max(self.versions.c.serial)],
                        self.versions.c.node == v.c.node)
            c1 = c1.where(self.versions.c.mtime < before)
        else:
            c1 = select([self.nodes.c.latest_version],
                        self.nodes.c.node == v.c.node)
        c2 = select([self.nodes.c.node], self.nodes.c.path.like(
            self.escape_like(path) + '%', escape='\\'))
        s = s.where(and_(v.c.serial == c1,
                         v.c.cluster != except_cluster,
                         v.c.node.in_(c2)))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        size = r[1] - props[SIZE]
        mtime = max(mtime, r[2])
        return (count, size, mtime)

    def nodes_set_latest_version(self, node, serial):
        """Point the node's latest_version to the given serial."""
        s = self.nodes.update().where(self.nodes.c.node == node)
        s = s.values(latest_version=serial)
        self.conn.execute(s).close()

    def version_create(self, node, hash, size, type, source, muser, uuid, checksum, cluster=0):
        """Create a new version from the given properties.
           Return the (serial, mtime) of the new version.
        """

        mtime = time()
        s = self.versions.insert(
        ).values(node=node, hash=hash, size=size, type=type, source=source,
                 mtime=mtime, muser=muser, uuid=uuid, checksum=checksum, cluster=cluster)
        serial = self.conn.execute(s).inserted_primary_key[0]
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)

        self.nodes_set_latest_version(node, serial)

        return serial, mtime
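
    # Illustrative sketch (not from the original source): the property values
    # below (hash, size, type, muser, uuid) are made-up placeholders.
    #
    #     serial, mtime = backend.version_create(
    #         node, hash='blockhash', size=1024, type='application/octet-stream',
    #         source=None, muser='user@example.org', uuid='some-uuid',
    #         checksum='', cluster=0)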

    def version_lookup(self, node, before=inf, cluster=0, all_props=True):
        """Lookup the current version of the given node.
           Return a list with its properties:
           (serial, node, hash, size, type, source, mtime,
            muser, uuid, checksum, cluster)
           or None if the current version is not found in the given cluster.
        """

        v = self.versions.alias('v')
        if not all_props:
            s = select([v.c.serial])
        else:
            s = select([v.c.serial, v.c.node, v.c.hash,
                        v.c.size, v.c.type, v.c.source,
                        v.c.mtime, v.c.muser, v.c.uuid,
                        v.c.checksum, v.c.cluster])
        if before != inf:
            c = select([func.max(self.versions.c.serial)],
                       self.versions.c.node == node)
            c = c.where(self.versions.c.mtime < before)
        else:
            c = select([self.nodes.c.latest_version],
                       self.nodes.c.node == node)
        s = s.where(and_(v.c.serial == c,
                         v.c.cluster == cluster))
        r = self.conn.execute(s)
        props = r.fetchone()
        r.close()
        if props:
            return props
        return None

    def version_lookup_bulk(self, nodes, before=inf, cluster=0, all_props=True):
        """Lookup the current versions of the given nodes.
           Return a list with their properties:
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
        """
        if not nodes:
            return ()
        v = self.versions.alias('v')
        if not all_props:
            s = select([v.c.serial])
        else:
            s = select([v.c.serial, v.c.node, v.c.hash,
                        v.c.size, v.c.type, v.c.source,
                        v.c.mtime, v.c.muser, v.c.uuid,
                        v.c.checksum, v.c.cluster])
        if before != inf:
            c = select([func.max(self.versions.c.serial)],
                       self.versions.c.node.in_(nodes))
            c = c.where(self.versions.c.mtime < before)
            c = c.group_by(self.versions.c.node)
        else:
            c = select([self.nodes.c.latest_version],
                       self.nodes.c.node.in_(nodes))
        s = s.where(and_(v.c.serial.in_(c),
                         v.c.cluster == cluster))
        s = s.order_by(v.c.node)
        r = self.conn.execute(s)
        rproxy = r.fetchall()
        r.close()
        return (tuple(row.values()) for row in rproxy)

    def version_get_properties(self, serial, keys=(), propnames=_propnames):
        """Return a sequence of values for the properties of
           the version specified by serial and the keys, in the order given.
           If keys is empty, return all properties in the order
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
        """

        v = self.versions.alias()
        s = select([v.c.serial, v.c.node, v.c.hash,
                    v.c.size, v.c.type, v.c.source,
                    v.c.mtime, v.c.muser, v.c.uuid,
                    v.c.checksum, v.c.cluster], v.c.serial == serial)
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if r is None:
            return r

        if not keys:
            return r
        return [r[propnames[k]] for k in keys if k in propnames]
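
    # Illustrative note (not part of the original source): passing keys returns
    # only those properties, in the order requested, e.g.
    #
    #     size, cluster = backend.version_get_properties(serial,
    #                                                     keys=('size', 'cluster'))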

    def version_put_property(self, serial, key, value):
        """Set the value of the property named by key for the version specified by serial."""

        if key not in _propnames:
            return
        s = self.versions.update()
        s = s.where(self.versions.c.serial == serial)
        s = s.values(**{key: value})
        self.conn.execute(s).close()

    def version_recluster(self, serial, cluster):
        """Move the version into another cluster."""

        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        size = props[SIZE]
        oldcluster = props[CLUSTER]
        if cluster == oldcluster:
            return

        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)

        s = self.versions.update()
        s = s.where(self.versions.c.serial == serial)
        s = s.values(cluster=cluster)
        self.conn.execute(s).close()

    def version_remove(self, serial):
        """Remove the serial specified."""

        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        hash = props[HASH]
        size = props[SIZE]
        cluster = props[CLUSTER]

        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, cluster)

        s = self.versions.delete().where(self.versions.c.serial == serial)
        self.conn.execute(s).close()

        props = self.version_lookup(node, cluster=cluster, all_props=False)
        if props:
            self.nodes_set_latest_version(node, props[0])

        return hash, size

    def attribute_get(self, serial, domain, keys=()):
        """Return a list of (key, value) pairs of the version specified by serial.
           If keys is empty, return all attributes.
           Otherwise, return only those specified.
        """

        if keys:
            attrs = self.attributes.alias()
            s = select([attrs.c.key, attrs.c.value])
            s = s.where(and_(attrs.c.key.in_(keys),
                             attrs.c.serial == serial,
                             attrs.c.domain == domain))
        else:
            attrs = self.attributes.alias()
            s = select([attrs.c.key, attrs.c.value])
            s = s.where(and_(attrs.c.serial == serial,
                             attrs.c.domain == domain))
        r = self.conn.execute(s)
        l = r.fetchall()
        r.close()
        return l

    def attribute_set(self, serial, domain, items):
        """Set the attributes of the version specified by serial.
           Receive attributes as an iterable of (key, value) pairs.
        """
        #insert or replace
        #TODO better upsert
        for k, v in items:
            s = self.attributes.update()
            s = s.where(and_(self.attributes.c.serial == serial,
                             self.attributes.c.domain == domain,
                             self.attributes.c.key == k))
            s = s.values(value=v)
            rp = self.conn.execute(s)
            rp.close()
            if rp.rowcount == 0:
                s = self.attributes.insert()
                s = s.values(serial=serial, domain=domain, key=k, value=v)
                self.conn.execute(s).close()
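
    # Illustrative sketch (not from the original source): the domain name and
    # metadata values below are assumptions chosen for demonstration.
    #
    #     backend.attribute_set(serial, 'pithos',
    #                           [('X-Object-Meta-Color', 'blue'),
    #                            ('X-Object-Meta-Owner', 'alice')])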

    def attribute_del(self, serial, domain, keys=()):
        """Delete attributes of the version specified by serial.
           If keys is empty, delete all attributes.
           Otherwise delete those specified.
        """

        if keys:
            #TODO more efficient way to do this?
            for key in keys:
                s = self.attributes.delete()
                s = s.where(and_(self.attributes.c.serial == serial,
                                 self.attributes.c.domain == domain,
                                 self.attributes.c.key == key))
                self.conn.execute(s).close()
        else:
            s = self.attributes.delete()
            s = s.where(and_(self.attributes.c.serial == serial,
                             self.attributes.c.domain == domain))
            self.conn.execute(s).close()

    def attribute_copy(self, source, dest):
        """Copy the attributes of the source version to the dest version."""
        s = select(
            [dest, self.attributes.c.domain,
                self.attributes.c.key, self.attributes.c.value],
            self.attributes.c.serial == source)
        rp = self.conn.execute(s)
        attributes = rp.fetchall()
        rp.close()
        for dest, domain, k, v in attributes:
            #insert or replace
            s = self.attributes.update().where(and_(
                self.attributes.c.serial == dest,
                self.attributes.c.domain == domain,
                self.attributes.c.key == k))
            rp = self.conn.execute(s, value=v)
            rp.close()
            if rp.rowcount == 0:
                s = self.attributes.insert()
                values = {'serial': dest, 'domain': domain,
                          'key': k, 'value': v}
                self.conn.execute(s, values).close()

    def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=[]):
        """Return a list with all attribute keys defined
           for all latest versions under parent that
           do not belong to the cluster.
        """

        # TODO: Use another table to store before=inf results.
        a = self.attributes.alias('a')
        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        s = select([a.c.key]).distinct()
        if before != inf:
            filtered = select([func.max(self.versions.c.serial)])
            filtered = filtered.where(self.versions.c.mtime < before)
            filtered = filtered.where(self.versions.c.node == v.c.node)
        else:
            filtered = select([self.nodes.c.latest_version])
            filtered = filtered.where(self.nodes.c.node == v.c.node)
        s = s.where(v.c.serial == filtered)
        s = s.where(v.c.cluster != except_cluster)
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
                                        self.nodes.c.parent == parent)))
        s = s.where(a.c.serial == v.c.serial)
        s = s.where(a.c.domain == domain)
        s = s.where(n.c.node == v.c.node)
        conj = []
        for path, match in pathq:
            if match == MATCH_PREFIX:
                conj.append(
                    n.c.path.like(self.escape_like(path) + '%', escape='\\'))
            elif match == MATCH_EXACT:
                conj.append(n.c.path == path)
        if conj:
            s = s.where(or_(*conj))
        rp = self.conn.execute(s)
        rows = rp.fetchall()
        rp.close()
        return [r[0] for r in rows]

    def latest_version_list(self, parent, prefix='', delimiter=None,
                            start='', limit=10000, before=inf,
                            except_cluster=0, pathq=[], domain=None,
                            filterq=[], sizeq=None, all_props=False):
        """Return a (list of (path, serial) tuples, list of common prefixes)
           for the current versions of the paths with the given parent,
           matching the following criteria.

           The property tuple for a version is returned if all
           of these conditions are true:

                a. parent matches

                b. path > start

                c. path starts with prefix (and paths in pathq)

                d. version is the max up to before

                e. version is not in cluster

                f. the path does not have the delimiter occurring
                   after the prefix, or ends with the delimiter

                g. serial matches the attribute filter query.

                   A filter query is a comma-separated list of
                   terms in one of these three forms:

                   key
                       an attribute with this key must exist

                   !key
                       an attribute with this key must not exist

                   key ?op value
                       the attribute with this key satisfies the value
                       where ?op is one of ==, !=, <=, >=, <, >.

                h. the size is in the range set by sizeq

           The list of common prefixes includes the prefixes
           matching up to the first delimiter after prefix;
           these are reported only once, as "virtual directories".
           The delimiter is included in the prefixes.

           If arguments are None, then the corresponding matching rule
           will always match.

           Limit applies to the first list of tuples returned.

           If all_props is True, return all properties after path, not just serial.
        """

        if not start or start < prefix:
            start = strprevling(prefix)
        nextling = strnextling(prefix)

        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        if not all_props:
            s = select([n.c.path, v.c.serial]).distinct()
        else:
            s = select([n.c.path,
                        v.c.serial, v.c.node, v.c.hash,
                        v.c.size, v.c.type, v.c.source,
                        v.c.mtime, v.c.muser, v.c.uuid,
                        v.c.checksum, v.c.cluster]).distinct()
        if before != inf:
            filtered = select([func.max(self.versions.c.serial)])
            filtered = filtered.where(self.versions.c.mtime < before)
        else:
            filtered = select([self.nodes.c.latest_version])
        s = s.where(
            v.c.serial == filtered.where(self.nodes.c.node == v.c.node))
        s = s.where(v.c.cluster != except_cluster)
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
                                        self.nodes.c.parent == parent)))

        s = s.where(n.c.node == v.c.node)
        s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
        conj = []
        for path, match in pathq:
            if match == MATCH_PREFIX:
                conj.append(
                    n.c.path.like(self.escape_like(path) + '%', escape='\\'))
            elif match == MATCH_EXACT:
                conj.append(n.c.path == path)
        if conj:
            s = s.where(or_(*conj))

        if sizeq and len(sizeq) == 2:
            if sizeq[0]:
                s = s.where(v.c.size >= sizeq[0])
            if sizeq[1]:
                s = s.where(v.c.size < sizeq[1])

        if domain and filterq:
            a = self.attributes.alias('a')
            included, excluded, opers = parse_filters(filterq)
            if included:
                subs = select([1])
                subs = subs.where(a.c.serial == v.c.serial).correlate(v)
                subs = subs.where(a.c.domain == domain)
                subs = subs.where(or_(*[a.c.key.op('=')(x) for x in included]))
                s = s.where(exists(subs))
            if excluded:
                subs = select([1])
                subs = subs.where(a.c.serial == v.c.serial).correlate(v)
                subs = subs.where(a.c.domain == domain)
                subs = subs.where(or_(*[a.c.key.op('=')(x) for x in excluded]))
                s = s.where(not_(exists(subs)))
            if opers:
                for k, o, val in opers:
                    subs = select([1])
                    subs = subs.where(a.c.serial == v.c.serial).correlate(v)
                    subs = subs.where(a.c.domain == domain)
                    subs = subs.where(
                        and_(a.c.key.op('=')(k), a.c.value.op(o)(val)))
                    s = s.where(exists(subs))

        s = s.order_by(n.c.path)

        if not delimiter:
            s = s.limit(limit)
            rp = self.conn.execute(s, start=start)
            r = rp.fetchall()
            rp.close()
            return r, ()

        pfz = len(prefix)
        dz = len(delimiter)
        count = 0
        prefixes = []
        pappend = prefixes.append
        matches = []
        mappend = matches.append

        rp = self.conn.execute(s, start=start)
        while True:
            props = rp.fetchone()
            if props is None:
                break
            path = props[0]
            serial = props[1]
            idx = path.find(delimiter, pfz)

            if idx < 0:
                mappend(props)
                count += 1
                if count >= limit:
                    break
                continue

            if idx + dz == len(path):
                mappend(props)
                count += 1
                continue  # Get one more, in case there is a path.
            pf = path[:idx + dz]
            pappend(pf)
            if count >= limit:
                break

            rp = self.conn.execute(s, start=strnextling(pf))  # New start.
        rp.close()

        return matches, prefixes
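
    # Illustrative sketch (not from the original source): listing one
    # "directory" level under a container node; the parameter values are
    # assumptions chosen for demonstration.
    #
    #     matches, prefixes = backend.latest_version_list(
    #         container_node, prefix='photos/', delimiter='/', limit=1000)
    #     # matches  -> [(path, serial), ...]
    #     # prefixes -> ['photos/2011/', 'photos/2012/', ...]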

    def latest_uuid(self, uuid):
        """Return a (path, serial) tuple, for the latest version of the given uuid."""

        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        s = select([n.c.path, v.c.serial])
        filtered = select([func.max(self.versions.c.serial)])
        s = s.where(v.c.serial == filtered.where(self.versions.c.uuid == uuid))
        s = s.where(n.c.node == v.c.node)

        r = self.conn.execute(s)
        l = r.fetchone()
        r.close()
        return l