# snf-pithos-backend: pithos/backends/lib/sqlalchemy/node.py @ 096a7c3b

# Copyright 2011-2012 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from time import time
from sqlalchemy import Table, Integer, BigInteger, DECIMAL, Column, String, MetaData, ForeignKey
from sqlalchemy.types import Text
from sqlalchemy.schema import Index, Sequence
from sqlalchemy.sql import func, and_, or_, not_, null, select, bindparam, text, exists
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.exc import NoSuchTableError

from dbworker import DBWorker

from pithos.backends.filter import parse_filters


ROOTNODE = 0

(SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM,
 CLUSTER) = range(11)

(MATCH_PREFIX, MATCH_EXACT) = range(2)

inf = float('inf')

def strnextling(prefix):
    """Return the first unicode string
       greater than but not starting with given prefix.
       strnextling('hello') -> 'hellp'
    """
    if not prefix:
        ## all strings start with the null string,
        ## therefore we have to approximate strnextling('')
        ## with the last unicode character supported by python
        ## 0x10ffff for wide (32-bit unicode) python builds
        ## 0x00ffff for narrow (16-bit unicode) python builds
        ## We will not autodetect. 0xffff is safe enough.
        return unichr(0xffff)
    s = prefix[:-1]
    c = ord(prefix[-1])
    if c >= 0xffff:
        raise RuntimeError
    s += unichr(c + 1)
    return s


def strprevling(prefix):
    """Return an approximation of the last unicode string
       less than but not starting with given prefix.
       strprevling(u'hello') -> u'helln\\uffff'
    """
    if not prefix:
        ## There is no prevling for the null string
        return prefix
    s = prefix[:-1]
    c = ord(prefix[-1])
    if c > 0:
        s += unichr(c - 1) + unichr(0xffff)
    return s
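

# Illustrative sketch, not part of the upstream module: how the two helpers
# above bound a prefix scan. The example prefix is an assumption; the bounds
# shown follow from the 0xffff ceiling used above.
def _prefix_range_example(prefix=u'photos/'):
    # Every path that starts with `prefix` sorts strictly between the two
    # bounds:  strprevling(u'photos/') == u'photos.\uffff'  and
    #          strnextling(u'photos/') == u'photos0'.
    return strprevling(prefix), strnextling(prefix)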

_propnames = {
    'serial': 0,
    'node': 1,
    'hash': 2,
    'size': 3,
    'type': 4,
    'source': 5,
    'mtime': 6,
    'muser': 7,
    'uuid': 8,
    'checksum': 9,
    'cluster': 10
}


def create_tables(engine):
    metadata = MetaData()

    #create nodes table
    columns = []
    columns.append(Column('node', Integer, primary_key=True))
    columns.append(Column('parent', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          autoincrement=False))
    columns.append(Column('latest_version', Integer))
    columns.append(Column('path', String(2048), default='', nullable=False))
    nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
    Index('idx_nodes_path', nodes.c.path, unique=True)
    Index('idx_nodes_parent', nodes.c.parent)

    #create policy table
    columns = []
    columns.append(Column('node', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          primary_key=True))
    columns.append(Column('key', String(128), primary_key=True))
    columns.append(Column('value', String(256)))
    policy = Table('policy', metadata, *columns, mysql_engine='InnoDB')

    #create statistics table
    columns = []
    columns.append(Column('node', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          primary_key=True))
    columns.append(Column('population', Integer, nullable=False, default=0))
    columns.append(Column('size', BigInteger, nullable=False, default=0))
    columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
    columns.append(Column('cluster', Integer, nullable=False, default=0,
                          primary_key=True, autoincrement=False))
    statistics = Table('statistics', metadata, *columns, mysql_engine='InnoDB')

    #create versions table
    columns = []
    columns.append(Column('serial', Integer, primary_key=True))
    columns.append(Column('node', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE')))
    columns.append(Column('hash', String(256)))
    columns.append(Column('size', BigInteger, nullable=False, default=0))
    columns.append(Column('type', String(256), nullable=False, default=''))
    columns.append(Column('source', Integer))
    columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
    columns.append(Column('muser', String(256), nullable=False, default=''))
    columns.append(Column('uuid', String(64), nullable=False, default=''))
    columns.append(Column('checksum', String(256), nullable=False, default=''))
    columns.append(Column('cluster', Integer, nullable=False, default=0))
    versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
    Index('idx_versions_node_mtime', versions.c.node, versions.c.mtime)
    Index('idx_versions_node_uuid', versions.c.uuid)

    #create attributes table
    columns = []
    columns.append(Column('serial', Integer,
                          ForeignKey('versions.serial',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          primary_key=True))
    columns.append(Column('domain', String(256), primary_key=True))
    columns.append(Column('key', String(128), primary_key=True))
    columns.append(Column('value', String(256)))
    attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')

    metadata.create_all(engine)
    return metadata.sorted_tables
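

# Illustrative sketch, not part of the upstream module: bootstrapping the
# schema on a throwaway engine. The SQLite URL is only an assumption for the
# example; any SQLAlchemy engine works the same way.
def _bootstrap_schema_example():
    from sqlalchemy import create_engine
    engine = create_engine('sqlite:///:memory:')
    # Creates the nodes, policy, statistics, versions and attributes tables
    # and returns them sorted by foreign-key dependency.
    return [t.name for t in create_tables(engine)]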


class Node(DBWorker):
    """Nodes store path organization and have multiple versions.
       Versions store object history and have multiple attributes.
       Attributes store metadata.
    """

    # TODO: Provide an interface for included and excluded clusters.

    def __init__(self, **params):
        DBWorker.__init__(self, **params)
        try:
            metadata = MetaData(self.engine)
            self.nodes = Table('nodes', metadata, autoload=True)
            self.policy = Table('policy', metadata, autoload=True)
            self.statistics = Table('statistics', metadata, autoload=True)
            self.versions = Table('versions', metadata, autoload=True)
            self.attributes = Table('attributes', metadata, autoload=True)
        except NoSuchTableError:
            tables = create_tables(self.engine)
            map(lambda t: self.__setattr__(t.name, t), tables)

        s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
                                           self.nodes.c.parent == ROOTNODE))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            s = self.nodes.insert(
            ).values(node=ROOTNODE, parent=ROOTNODE, path='')
            self.conn.execute(s)

    def node_create(self, parent, path):
        """Create a new node from the given properties.
           Return the node identifier of the new node.
        """
        #TODO catch IntegrityError?
        s = self.nodes.insert().values(parent=parent, path=path)
        r = self.conn.execute(s)
        inserted_primary_key = r.inserted_primary_key[0]
        r.close()
        return inserted_primary_key

    def node_lookup(self, path):
        """Lookup the current node of the given path.
           Return None if the path is not found.
        """

        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
        s = select([self.nodes.c.node], self.nodes.c.path.like(
            self.escape_like(path), escape='\\'))
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        if row:
            return row[0]
        return None
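
    # Illustrative sketch, not part of the upstream module: a create/lookup
    # round trip, assuming `node_backend` is a connected Node instance and the
    # usual account/container/object path layout.
    #
    #   node_id = node_backend.node_create(parent_id, 'account/container/object')
    #   assert node_backend.node_lookup('account/container/object') == node_id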

    def node_lookup_bulk(self, paths):
        """Lookup the current nodes for the given paths.
           Return () if none of the paths are found.
        """

        if not paths:
            return ()
        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
        s = select([self.nodes.c.node], self.nodes.c.path.in_(paths))
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        return [row[0] for row in rows]

    def node_get_properties(self, node):
        """Return the node's (parent, path).
           Return None if the node is not found.
        """

        s = select([self.nodes.c.parent, self.nodes.c.path])
        s = s.where(self.nodes.c.node == node)
        r = self.conn.execute(s)
        l = r.fetchone()
        r.close()
        return l

    def node_get_versions(self, node, keys=(), propnames=_propnames):
        """Return the properties of all versions at node.
           If keys is empty, return all properties in the order
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
        """

        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.hash,
                    self.versions.c.size,
                    self.versions.c.type,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.uuid,
                    self.versions.c.checksum,
                    self.versions.c.cluster], self.versions.c.node == node)
        s = s.order_by(self.versions.c.serial)
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        if not rows:
            return rows

        if not keys:
            return rows

        return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]
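
    # Illustrative sketch, not part of the upstream module: passing `keys`
    # projects each row through _propnames, e.g.
    #
    #   node_backend.node_get_versions(node_id, keys=('serial', 'size', 'cluster'))
    #
    # returns something like [[7, 4096, 0], [9, 5120, 0]], ordered by serial.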

    def node_count_children(self, node):
        """Return node's child count."""

        s = select([func.count(self.nodes.c.node)])
        s = s.where(and_(self.nodes.c.parent == node,
                         self.nodes.c.node != ROOTNODE))
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        return row[0]

    def node_purge_children(self, parent, before=inf, cluster=0):
        """Delete all versions with the specified
           parent and cluster, and return
           the hashes, total size and serials of versions deleted.
           Clears out nodes with no remaining versions.
        """
        #update statistics
        c1 = select([self.nodes.c.node],
                    self.nodes.c.parent == parent)
        where_clause = and_(self.versions.c.node.in_(c1),
                            self.versions.c.cluster == cluster)
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        s = s.where(where_clause)
        if before != inf:
            s = s.where(self.versions.c.mtime <= before)
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        if not row:
            return (), 0, ()
        nr, size = row[0], -row[1] if row[1] else 0
        mtime = time()
        self.statistics_update(parent, -nr, size, mtime, cluster)
        self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)

        s = select([self.versions.c.hash, self.versions.c.serial])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        hashes = []
        serials = []
        for row in r.fetchall():
            hashes += [row[0]]
            serials += [row[1]]
        r.close()

        #delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)
        r.close()

        #delete nodes
        s = select([self.nodes.c.node],
                   and_(self.nodes.c.parent == parent,
                        select([func.count(self.versions.c.serial)],
                               self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
        rp = self.conn.execute(s)
        nodes = [r[0] for r in rp.fetchall()]
        rp.close()
        if nodes:
            s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
            self.conn.execute(s).close()

        return hashes, size, serials

    def node_purge(self, node, before=inf, cluster=0):
        """Delete all versions with the specified
           node and cluster, and return
           the hashes, total size and serials of versions deleted.
           Clears out the node if it has no remaining versions.
        """

        #update statistics
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        where_clause = and_(self.versions.c.node == node,
                            self.versions.c.cluster == cluster)
        s = s.where(where_clause)
        if before != inf:
            s = s.where(self.versions.c.mtime <= before)
        r = self.conn.execute(s)
        row = r.fetchone()
        nr, size = row[0], row[1]
        r.close()
        if not nr:
            return (), 0, ()
        mtime = time()
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)

        s = select([self.versions.c.hash, self.versions.c.serial])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        hashes = []
        serials = []
        for row in r.fetchall():
            hashes += [row[0]]
            serials += [row[1]]
        r.close()

        #delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)
        r.close()

        #delete nodes
        s = select([self.nodes.c.node],
                   and_(self.nodes.c.node == node,
                        select([func.count(self.versions.c.serial)],
                               self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
        r = self.conn.execute(s)
        nodes = r.fetchall()
        r.close()
        if nodes:
            s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
            self.conn.execute(s).close()

        return hashes, size, serials

    def node_remove(self, node):
        """Remove the node specified.
           Return false if the node has children or is not found.
        """

        if self.node_count_children(node):
            return False

        mtime = time()
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size),
                    self.versions.c.cluster])
        s = s.where(self.versions.c.node == node)
        s = s.group_by(self.versions.c.cluster)
        r = self.conn.execute(s)
        for population, size, cluster in r.fetchall():
            self.statistics_update_ancestors(
                node, -population, -size, mtime, cluster)
        r.close()

        s = self.nodes.delete().where(self.nodes.c.node == node)
        self.conn.execute(s).close()
        return True

    def policy_get(self, node):
        s = select([self.policy.c.key, self.policy.c.value],
                   self.policy.c.node == node)
        r = self.conn.execute(s)
        d = dict(r.fetchall())
        r.close()
        return d

    def policy_set(self, node, policy):
        #insert or replace
        for k, v in policy.iteritems():
            s = self.policy.update().where(and_(self.policy.c.node == node,
                                                self.policy.c.key == k))
            s = s.values(value=v)
            rp = self.conn.execute(s)
            rp.close()
            if rp.rowcount == 0:
                s = self.policy.insert()
                values = {'node': node, 'key': k, 'value': v}
                r = self.conn.execute(s, values)
                r.close()
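
    # Illustrative sketch, not part of the upstream module: policy is a plain
    # key/value map per node; the key names below are assumptions, not an API
    # guarantee.
    #
    #   node_backend.policy_set(node_id, {'quota': '1073741824', 'versioning': 'auto'})
    #   node_backend.policy_get(node_id)
    #   # -> {u'quota': u'1073741824', u'versioning': u'auto'}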

    def statistics_get(self, node, cluster=0):
        """Return population, total size and last mtime
           for all versions under node that belong to the cluster.
        """

        s = select([self.statistics.c.population,
                    self.statistics.c.size,
                    self.statistics.c.mtime])
        s = s.where(and_(self.statistics.c.node == node,
                         self.statistics.c.cluster == cluster))
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        return row

    def statistics_update(self, node, population, size, mtime, cluster=0):
        """Update the statistics of the given node.
           Statistics keep track of the population, total
           size of objects and mtime in the node's namespace.
           The population and size deltas may be zero, positive or negative.
        """
        s = select([self.statistics.c.population, self.statistics.c.size],
                   and_(self.statistics.c.node == node,
                        self.statistics.c.cluster == cluster))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            prepopulation, presize = (0, 0)
        else:
            prepopulation, presize = r
        population += prepopulation
        population = max(population, 0)
        size += presize

        #insert or replace
        #TODO better upsert
        u = self.statistics.update().where(and_(self.statistics.c.node == node,
                                           self.statistics.c.cluster == cluster))
        u = u.values(population=population, size=size, mtime=mtime)
        rp = self.conn.execute(u)
        rp.close()
        if rp.rowcount == 0:
            ins = self.statistics.insert()
            ins = ins.values(node=node, population=population, size=size,
                             mtime=mtime, cluster=cluster)
            self.conn.execute(ins).close()

    def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
        """Update the statistics of the given node's parent.
           Then recursively update all parents up to the root.
           Population is not recursive.
        """

        while True:
            if node == ROOTNODE:
                break
            props = self.node_get_properties(node)
            if props is None:
                break
            parent, path = props
            self.statistics_update(parent, population, size, mtime, cluster)
            node = parent
            population = 0  # Population isn't recursive
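
    # Illustrative sketch, not part of the upstream module: creating one
    # 4096-byte version under 'account/container/object' propagates as
    #
    #   container node : population +1, size +4096
    #   account node   : population +0, size +4096   (population not recursive)
    #   ROOTNODE       : population +0, size +4096
    #
    # all within the version's cluster.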

    def statistics_latest(self, node, before=inf, except_cluster=0):
        """Return population, total size and last mtime
           for all latest versions under node that
           do not belong to the cluster.
        """

        # The node.
        props = self.node_get_properties(node)
        if props is None:
            return None
        parent, path = props

        # The latest version.
        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.hash,
                    self.versions.c.size,
                    self.versions.c.type,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.uuid,
                    self.versions.c.checksum,
                    self.versions.c.cluster])
        if before != inf:
            filtered = select([func.max(self.versions.c.serial)],
                              self.versions.c.node == node)
            filtered = filtered.where(self.versions.c.mtime < before)
        else:
            filtered = select([self.nodes.c.latest_version],
                              self.versions.c.node == node)
        s = s.where(and_(self.versions.c.cluster != except_cluster,
                         self.versions.c.serial == filtered))
        r = self.conn.execute(s)
        props = r.fetchone()
        r.close()
        if not props:
            return None
        mtime = props[MTIME]

        # First level, just under node (get population).
        v = self.versions.alias('v')
        s = select([func.count(v.c.serial),
                    func.sum(v.c.size),
                    func.max(v.c.mtime)])
        if before != inf:
            c1 = select([func.max(self.versions.c.serial)])
            c1 = c1.where(self.versions.c.mtime < before)
            c1 = c1.where(self.versions.c.node == v.c.node)
        else:
            c1 = select([self.nodes.c.latest_version])
            c1 = c1.where(self.nodes.c.node == v.c.node)
        c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
        s = s.where(and_(v.c.serial == c1,
                         v.c.cluster != except_cluster,
                         v.c.node.in_(c2)))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        count = r[0]
        mtime = max(mtime, r[2])
        if count == 0:
            return (0, 0, mtime)

        # All children (get size and mtime).
        # This is why the full path is stored.
        s = select([func.count(v.c.serial),
                    func.sum(v.c.size),
                    func.max(v.c.mtime)])
        if before != inf:
            c1 = select([func.max(self.versions.c.serial)],
                        self.versions.c.node == v.c.node)
            c1 = c1.where(self.versions.c.mtime < before)
        else:
            c1 = select([self.nodes.c.latest_version],
                        self.nodes.c.node == v.c.node)
        c2 = select([self.nodes.c.node], self.nodes.c.path.like(
            self.escape_like(path) + '%', escape='\\'))
        s = s.where(and_(v.c.serial == c1,
                         v.c.cluster != except_cluster,
                         v.c.node.in_(c2)))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        size = r[1] - props[SIZE]
        mtime = max(mtime, r[2])
        return (count, size, mtime)

    def nodes_set_latest_version(self, node, serial):
        s = self.nodes.update().where(self.nodes.c.node == node)
        s = s.values(latest_version=serial)
        self.conn.execute(s).close()

    def version_create(self, node, hash, size, type, source, muser, uuid, checksum, cluster=0):
        """Create a new version from the given properties.
           Return the (serial, mtime) of the new version.
        """

        mtime = time()
        s = self.versions.insert(
        ).values(node=node, hash=hash, size=size, type=type, source=source,
                 mtime=mtime, muser=muser, uuid=uuid, checksum=checksum, cluster=cluster)
        serial = self.conn.execute(s).inserted_primary_key[0]
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)

        self.nodes_set_latest_version(node, serial)

        return serial, mtime
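
    # Illustrative sketch, not part of the upstream module: a typical call,
    # with a hypothetical block hash, uuid and user.
    #
    #   serial, mtime = node_backend.version_create(
    #       node_id, hash='3f786850e387550f...', size=4096,
    #       type='application/octet-stream', source=None,
    #       muser='user@example.com', uuid='6fe2b7e5-...', checksum='', cluster=0)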

    def version_lookup(self, node, before=inf, cluster=0, all_props=True):
        """Lookup the current version of the given node.
           Return a list with its properties:
           (serial, node, hash, size, type, source, mtime,
            muser, uuid, checksum, cluster)
           or None if the current version is not found in the given cluster.
        """

        v = self.versions.alias('v')
        if not all_props:
            s = select([v.c.serial])
        else:
            s = select([v.c.serial, v.c.node, v.c.hash,
                        v.c.size, v.c.type, v.c.source,
                        v.c.mtime, v.c.muser, v.c.uuid,
                        v.c.checksum, v.c.cluster])
        if before != inf:
            c = select([func.max(self.versions.c.serial)],
                       self.versions.c.node == node)
            c = c.where(self.versions.c.mtime < before)
        else:
            c = select([self.nodes.c.latest_version],
                       self.nodes.c.node == node)
        s = s.where(and_(v.c.serial == c,
                         v.c.cluster == cluster))
        r = self.conn.execute(s)
        props = r.fetchone()
        r.close()
        if props:
            return props
        return None

    def version_lookup_bulk(self, nodes, before=inf, cluster=0, all_props=True):
        """Lookup the current versions of the given nodes.
           Return a list with their properties:
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
        """
        if not nodes:
            return ()
        v = self.versions.alias('v')
        if not all_props:
            s = select([v.c.serial])
        else:
            s = select([v.c.serial, v.c.node, v.c.hash,
                        v.c.size, v.c.type, v.c.source,
                        v.c.mtime, v.c.muser, v.c.uuid,
                        v.c.checksum, v.c.cluster])
        if before != inf:
            c = select([func.max(self.versions.c.serial)],
                       self.versions.c.node.in_(nodes))
            c = c.where(self.versions.c.mtime < before)
            c = c.group_by(self.versions.c.node)
        else:
            c = select([self.nodes.c.latest_version],
                       self.nodes.c.node.in_(nodes))
        s = s.where(and_(v.c.serial.in_(c),
                         v.c.cluster == cluster))
        s = s.order_by(v.c.node)
        r = self.conn.execute(s)
        rproxy = r.fetchall()
        r.close()
        return (tuple(row.values()) for row in rproxy)

    def version_get_properties(self, serial, keys=(), propnames=_propnames):
        """Return a sequence of values for the properties of
           the version specified by serial and the keys, in the order given.
           If keys is empty, return all properties in the order
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
        """

        v = self.versions.alias()
        s = select([v.c.serial, v.c.node, v.c.hash,
                    v.c.size, v.c.type, v.c.source,
                    v.c.mtime, v.c.muser, v.c.uuid,
                    v.c.checksum, v.c.cluster], v.c.serial == serial)
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if r is None:
            return r

        if not keys:
            return r
        return [r[propnames[k]] for k in keys if k in propnames]

    def version_put_property(self, serial, key, value):
        """Set value for the property of version specified by key."""

        if key not in _propnames:
            return
        s = self.versions.update()
        s = s.where(self.versions.c.serial == serial)
        s = s.values(**{key: value})
        self.conn.execute(s).close()

    def version_recluster(self, serial, cluster):
        """Move the version into another cluster."""

        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        size = props[SIZE]
        oldcluster = props[CLUSTER]
        if cluster == oldcluster:
            return

        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)

        s = self.versions.update()
        s = s.where(self.versions.c.serial == serial)
        s = s.values(cluster=cluster)
        self.conn.execute(s).close()

    def version_remove(self, serial):
        """Remove the serial specified."""

        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        hash = props[HASH]
        size = props[SIZE]
        cluster = props[CLUSTER]

        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, cluster)

        s = self.versions.delete().where(self.versions.c.serial == serial)
        self.conn.execute(s).close()

        props = self.version_lookup(node, cluster=cluster, all_props=False)
        if props:
            # Point the node at the version that is now the latest one,
            # not at the serial that was just deleted.
            self.nodes_set_latest_version(node, props[SERIAL])

        return hash, size

    def attribute_get(self, serial, domain, keys=()):
        """Return a list of (key, value) pairs of the version specified by serial.
           If keys is empty, return all attributes.
           Otherwise, return only those specified.
        """

        if keys:
            attrs = self.attributes.alias()
            s = select([attrs.c.key, attrs.c.value])
            s = s.where(and_(attrs.c.key.in_(keys),
                             attrs.c.serial == serial,
                             attrs.c.domain == domain))
        else:
            attrs = self.attributes.alias()
            s = select([attrs.c.key, attrs.c.value])
            s = s.where(and_(attrs.c.serial == serial,
                             attrs.c.domain == domain))
        r = self.conn.execute(s)
        l = r.fetchall()
        r.close()
        return l

    def attribute_set(self, serial, domain, items):
        """Set the attributes of the version specified by serial.
           Receive attributes as an iterable of (key, value) pairs.
        """
        #insert or replace
        #TODO better upsert
        for k, v in items:
            s = self.attributes.update()
            s = s.where(and_(self.attributes.c.serial == serial,
                             self.attributes.c.domain == domain,
                             self.attributes.c.key == k))
            s = s.values(value=v)
            rp = self.conn.execute(s)
            rp.close()
            if rp.rowcount == 0:
                s = self.attributes.insert()
                s = s.values(serial=serial, domain=domain, key=k, value=v)
                self.conn.execute(s).close()
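
    # Illustrative sketch, not part of the upstream module: attributes are
    # scoped by a free-form domain string, e.g.
    #
    #   node_backend.attribute_set(serial, 'pithos', [('quality', 'high'), ('width', '1024')])
    #   node_backend.attribute_get(serial, 'pithos', keys=('quality',))
    #   # -> [(u'quality', u'high')]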

    def attribute_del(self, serial, domain, keys=()):
        """Delete attributes of the version specified by serial.
           If keys is empty, delete all attributes.
           Otherwise delete those specified.
        """

        if keys:
            #TODO more efficient way to do this?
            for key in keys:
                s = self.attributes.delete()
                s = s.where(and_(self.attributes.c.serial == serial,
                                 self.attributes.c.domain == domain,
                                 self.attributes.c.key == key))
                self.conn.execute(s).close()
        else:
            s = self.attributes.delete()
            s = s.where(and_(self.attributes.c.serial == serial,
                             self.attributes.c.domain == domain))
            self.conn.execute(s).close()

    def attribute_copy(self, source, dest):
        s = select(
            [dest, self.attributes.c.domain,
                self.attributes.c.key, self.attributes.c.value],
            self.attributes.c.serial == source)
        rp = self.conn.execute(s)
        attributes = rp.fetchall()
        rp.close()
        for dest, domain, k, v in attributes:
            #insert or replace
            s = self.attributes.update().where(and_(
                self.attributes.c.serial == dest,
                self.attributes.c.domain == domain,
                self.attributes.c.key == k))
            rp = self.conn.execute(s, value=v)
            rp.close()
            if rp.rowcount == 0:
                s = self.attributes.insert()
                values = {'serial': dest, 'domain': domain,
                          'key': k, 'value': v}
                self.conn.execute(s, values).close()

    def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=[]):
        """Return a list with all keys defined
           for all latest versions under parent that
           do not belong to the cluster.
        """

        # TODO: Use another table to store before=inf results.
        a = self.attributes.alias('a')
        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        s = select([a.c.key]).distinct()
        if before != inf:
            filtered = select([func.max(self.versions.c.serial)])
            filtered = filtered.where(self.versions.c.mtime < before)
            filtered = filtered.where(self.versions.c.node == v.c.node)
        else:
            filtered = select([self.nodes.c.latest_version])
            filtered = filtered.where(self.nodes.c.node == v.c.node)
        s = s.where(v.c.serial == filtered)
        s = s.where(v.c.cluster != except_cluster)
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
                                        self.nodes.c.parent == parent)))
        s = s.where(a.c.serial == v.c.serial)
        s = s.where(a.c.domain == domain)
        s = s.where(n.c.node == v.c.node)
        conj = []
        for path, match in pathq:
            if match == MATCH_PREFIX:
                conj.append(
                    n.c.path.like(self.escape_like(path) + '%', escape='\\'))
            elif match == MATCH_EXACT:
                conj.append(n.c.path == path)
        if conj:
            s = s.where(or_(*conj))
        rp = self.conn.execute(s)
        rows = rp.fetchall()
        rp.close()
        return [r[0] for r in rows]
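
    # Illustrative sketch, not part of the upstream module: pathq entries pair
    # a path with a match mode, e.g.
    #
    #   pathq=[('photos/', MATCH_PREFIX), ('docs/readme.txt', MATCH_EXACT)]
    #
    # which restricts the result to exact paths or path prefixes.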

    def latest_version_list(self, parent, prefix='', delimiter=None,
                            start='', limit=10000, before=inf,
                            except_cluster=0, pathq=[], domain=None,
                            filterq=[], sizeq=None, all_props=False):
        """Return a (list of (path, serial) tuples, list of common prefixes)
           for the current versions of the paths with the given parent,
           matching the following criteria.

           The property tuple for a version is returned if all
           of these conditions are true:

                a. parent matches

                b. path > start

                c. path starts with prefix (and paths in pathq)

                d. version is the max up to before

                e. version is not in cluster

                f. the path does not have the delimiter occurring
                   after the prefix, or ends with the delimiter

                g. serial matches the attribute filter query.

                   A filter query is a comma-separated list of
                   terms in one of these three forms:

                   key
                       an attribute with this key must exist

                   !key
                       an attribute with this key must not exist

                   key ?op value
                       the attribute with this key satisfies the value
                       where ?op is one of ==, !=, <=, >=, <, >.

                h. the size is in the range set by sizeq

           The list of common prefixes includes the prefixes
           matching up to the first delimiter after prefix,
           and are reported only once, as "virtual directories".
           The delimiter is included in the prefixes.

           If arguments are None, then the corresponding matching rule
           will always match.

           Limit applies to the first list of tuples returned.

           If all_props is True, return all properties after path, not just serial.
        """

        if not start or start < prefix:
            start = strprevling(prefix)
        nextling = strnextling(prefix)

        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        if not all_props:
            s = select([n.c.path, v.c.serial]).distinct()
        else:
            s = select([n.c.path,
                        v.c.serial, v.c.node, v.c.hash,
                        v.c.size, v.c.type, v.c.source,
                        v.c.mtime, v.c.muser, v.c.uuid,
                        v.c.checksum, v.c.cluster]).distinct()
        if before != inf:
            filtered = select([func.max(self.versions.c.serial)])
            filtered = filtered.where(self.versions.c.mtime < before)
        else:
            filtered = select([self.nodes.c.latest_version])
        s = s.where(
            v.c.serial == filtered.where(self.nodes.c.node == v.c.node))
        s = s.where(v.c.cluster != except_cluster)
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
                                        self.nodes.c.parent == parent)))

        s = s.where(n.c.node == v.c.node)
        s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
        conj = []
        for path, match in pathq:
            if match == MATCH_PREFIX:
                conj.append(
                    n.c.path.like(self.escape_like(path) + '%', escape='\\'))
            elif match == MATCH_EXACT:
                conj.append(n.c.path == path)
        if conj:
            s = s.where(or_(*conj))

        if sizeq and len(sizeq) == 2:
            if sizeq[0]:
                s = s.where(v.c.size >= sizeq[0])
            if sizeq[1]:
                s = s.where(v.c.size < sizeq[1])

        if domain and filterq:
            a = self.attributes.alias('a')
            included, excluded, opers = parse_filters(filterq)
            if included:
                subs = select([1])
                subs = subs.where(a.c.serial == v.c.serial).correlate(v)
                subs = subs.where(a.c.domain == domain)
                subs = subs.where(or_(*[a.c.key.op('=')(x) for x in included]))
                s = s.where(exists(subs))
            if excluded:
                subs = select([1])
                subs = subs.where(a.c.serial == v.c.serial).correlate(v)
                subs = subs.where(a.c.domain == domain)
                subs = subs.where(or_(*[a.c.key.op('=')(x) for x in excluded]))
                s = s.where(not_(exists(subs)))
            if opers:
                for k, o, val in opers:
                    subs = select([1])
                    subs = subs.where(a.c.serial == v.c.serial).correlate(v)
                    subs = subs.where(a.c.domain == domain)
                    subs = subs.where(
                        and_(a.c.key.op('=')(k), a.c.value.op(o)(val)))
                    s = s.where(exists(subs))

        s = s.order_by(n.c.path)

        if not delimiter:
            s = s.limit(limit)
            rp = self.conn.execute(s, start=start)
            r = rp.fetchall()
            rp.close()
            return r, ()

        pfz = len(prefix)
        dz = len(delimiter)
        count = 0
        prefixes = []
        pappend = prefixes.append
        matches = []
        mappend = matches.append

        rp = self.conn.execute(s, start=start)
        while True:
            props = rp.fetchone()
            if props is None:
                break
            path = props[0]
            serial = props[1]
            idx = path.find(delimiter, pfz)

            if idx < 0:
                mappend(props)
                count += 1
                if count >= limit:
                    break
                continue

            if idx + dz == len(path):
                mappend(props)
                count += 1
                continue  # Get one more, in case there is a path.
            pf = path[:idx + dz]
            pappend(pf)
            if count >= limit:
                break

            rp = self.conn.execute(s, start=strnextling(pf))  # New start.
        rp.close()

        return matches, prefixes

    def latest_uuid(self, uuid):
        """Return a (path, serial) tuple, for the latest version of the given uuid."""

        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        s = select([n.c.path, v.c.serial])
        filtered = select([func.max(self.versions.c.serial)])
        s = s.where(v.c.serial == filtered.where(self.versions.c.uuid == uuid))
        s = s.where(n.c.node == v.c.node)

        r = self.conn.execute(s)
        l = r.fetchone()
        r.close()
        return l