Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / lib / sqlalchemy / node.py @ 388ea25f

History | View | Annotate | Download (39.9 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from time import time
35
from sqlalchemy import Table, Integer, BigInteger, DECIMAL, Column, String, MetaData, ForeignKey
36
from sqlalchemy.types import Text
37
from sqlalchemy.schema import Index, Sequence
38
from sqlalchemy.sql import func, and_, or_, not_, null, select, bindparam, text, exists
39
from sqlalchemy.ext.compiler import compiles
40
from sqlalchemy.engine.reflection import Inspector
41
from sqlalchemy.exc import NoSuchTableError
42

    
43
from dbworker import DBWorker
44

    
45
from pithos.backends.filter import parse_filters
46

    
47

    
48
# Identifier of the tree root; the root node is its own parent (see
# Node.__init__, which bootstraps it with parent == ROOTNODE).
ROOTNODE = 0

# Column indexes into a version property row, matching the select order
# used throughout (serial, node, hash, size, type, source, mtime, muser,
# uuid, checksum, cluster).
(SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM,
 CLUSTER) = range(11)

# Path matching modes.
(MATCH_PREFIX, MATCH_EXACT) = range(2)

# Sentinel meaning "no upper bound" for the 'before' timestamp arguments.
inf = float('inf')
56

    
57

    
58
def strnextling(prefix):
    """Return the first unicode string that sorts strictly after
       every string starting with the given prefix.
       strnextling('hello') -> 'hellp'
    """
    if not prefix:
        # Every string starts with the empty prefix, so approximate
        # strnextling('') with the highest code point that is safe on
        # both narrow (0xffff) and wide (0x10ffff) Python builds.
        # We will not autodetect; 0xffff is safe enough.
        return unichr(0xffff)
    last = ord(prefix[-1])
    if last >= 0xffff:
        raise RuntimeError
    # Bump the final character by one code point.
    return prefix[:-1] + unichr(last + 1)
77

    
78

    
79
def strprevling(prefix):
    """Return an approximation of the last unicode string
       less than but not starting with given prefix.
       strprevling(u'hello') -> u'helln\\xffff'
    """
    if not prefix:
        # There is no prevling for the null string.
        return prefix
    last = ord(prefix[-1])
    if last == 0:
        # Cannot decrement NUL; just drop the final character.
        return prefix[:-1]
    # Lower the final character by one and pad with a high code point
    # so the result sorts after anything with the lowered prefix.
    return prefix[:-1] + unichr(last - 1) + unichr(0xffff)
92

    
93
_propnames = {
94
    'serial': 0,
95
    'node': 1,
96
    'hash': 2,
97
    'size': 3,
98
    'type': 4,
99
    'source': 5,
100
    'mtime': 6,
101
    'muser': 7,
102
    'uuid': 8,
103
    'checksum': 9,
104
    'cluster': 10
105
}
106

    
107

    
108
def create_tables(engine):
    """Create the nodes/policy/statistics/versions/attributes schema on
       the given engine (if not already present) and return the tables
       sorted by foreign-key dependency.
    """
    metadata = MetaData()

    #create nodes table
    columns = []
    columns.append(Column('node', Integer, primary_key=True))
    # Self-referencing parent pointer; cascades keep the tree consistent.
    columns.append(Column('parent', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          autoincrement=False))
    columns.append(Column('latest_version', Integer))
    # Full path is stored so subtree queries can use a LIKE prefix match.
    columns.append(Column('path', String(2048), default='', nullable=False))
    nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
    Index('idx_nodes_path', nodes.c.path, unique=True)
    Index('idx_nodes_parent', nodes.c.parent)

    #create policy table
    columns = []
    columns.append(Column('node', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          primary_key=True))
    columns.append(Column('key', String(128), primary_key=True))
    columns.append(Column('value', String(256)))
    policy = Table('policy', metadata, *columns, mysql_engine='InnoDB')

    #create statistics table
    columns = []
    columns.append(Column('node', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          primary_key=True))
    columns.append(Column('population', Integer, nullable=False, default=0))
    columns.append(Column('size', BigInteger, nullable=False, default=0))
    columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
    # Statistics are kept per (node, cluster) pair.
    columns.append(Column('cluster', Integer, nullable=False, default=0,
                          primary_key=True, autoincrement=False))
    statistics = Table('statistics', metadata, *columns, mysql_engine='InnoDB')

    #create versions table
    columns = []
    columns.append(Column('serial', Integer, primary_key=True))
    columns.append(Column('node', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE')))
    columns.append(Column('hash', String(256)))
    columns.append(Column('size', BigInteger, nullable=False, default=0))
    columns.append(Column('type', String(256), nullable=False, default=''))
    columns.append(Column('source', Integer))
    columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
    columns.append(Column('muser', String(256), nullable=False, default=''))
    columns.append(Column('uuid', String(64), nullable=False, default=''))
    columns.append(Column('checksum', String(256), nullable=False, default=''))
    columns.append(Column('cluster', Integer, nullable=False, default=0))
    versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
    Index('idx_versions_node_mtime', versions.c.node, versions.c.mtime)
    # NOTE(review): named "node_uuid" but indexes only the uuid column --
    # confirm whether versions.c.node was meant to be included.
    Index('idx_versions_node_uuid', versions.c.uuid)

    #create attributes table
    columns = []
    columns.append(Column('serial', Integer,
                          ForeignKey('versions.serial',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          primary_key=True))
    columns.append(Column('domain', String(256), primary_key=True))
    columns.append(Column('key', String(128), primary_key=True))
    columns.append(Column('value', String(256)))
    attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')

    metadata.create_all(engine)
    return metadata.sorted_tables
184

    
185

    
186
class Node(DBWorker):
187
    """Nodes store path organization and have multiple versions.
188
       Versions store object history and have multiple attributes.
189
       Attributes store metadata.
190
    """
191

    
192
    # TODO: Provide an interface for included and excluded clusters.
193

    
194
    def __init__(self, **params):
        """Reflect the existing schema (creating it on first use) and
           make sure the root node exists.
        """
        DBWorker.__init__(self, **params)
        try:
            metadata = MetaData(self.engine)
            self.nodes = Table('nodes', metadata, autoload=True)
            self.policy = Table('policy', metadata, autoload=True)
            self.statistics = Table('statistics', metadata, autoload=True)
            self.versions = Table('versions', metadata, autoload=True)
            self.attributes = Table('attributes', metadata, autoload=True)
        except NoSuchTableError:
            tables = create_tables(self.engine)
            # Bind each freshly created table as an attribute (nodes,
            # policy, ...).  A plain loop replaces the side-effecting
            # map(lambda ...), which is unidiomatic and would silently do
            # nothing under a lazy (Python 3 style) map.
            for t in tables:
                setattr(self, t.name, t)

        # Bootstrap the root node (its own parent) if it is missing.
        s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
                                           self.nodes.c.parent == ROOTNODE))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            s = self.nodes.insert(
            ).values(node=ROOTNODE, parent=ROOTNODE, path='')
            self.conn.execute(s)
216

    
217
    def node_create(self, parent, path):
        """Create a new node from the given properties.
           Return the node identifier of the new node.
        """
        #TODO catch IntegrityError?
        ins = self.nodes.insert().values(parent=parent, path=path)
        result = self.conn.execute(ins)
        node = result.inserted_primary_key[0]
        result.close()
        return node
227

    
228
    def node_lookup(self, path):
        """Lookup the current node of the given path.
           Return None if the path is not found.
        """
        # Compare with LIKE (escaped) to avoid MySQL problems with
        # trailing spaces.
        stmt = select([self.nodes.c.node],
                      self.nodes.c.path.like(self.escape_like(path),
                                             escape='\\'))
        result = self.conn.execute(stmt)
        row = result.fetchone()
        result.close()
        return row[0] if row else None
242

    
243
    def node_lookup_bulk(self, paths):
        """Lookup the current nodes for the given paths.
           Return an empty list if none of the paths are found.
        """
        stmt = select([self.nodes.c.node], self.nodes.c.path.in_(paths))
        result = self.conn.execute(stmt)
        nodes = [row[0] for row in result.fetchall()]
        result.close()
        return nodes
254

    
255
    def node_get_properties(self, node):
        """Return the node's (parent, path).
           Return None if the node is not found.
        """
        stmt = select([self.nodes.c.parent, self.nodes.c.path]).where(
            self.nodes.c.node == node)
        result = self.conn.execute(stmt)
        row = result.fetchone()
        result.close()
        return row
266

    
267
    def node_get_versions(self, node, keys=(), propnames=_propnames):
        """Return the properties of all versions at node.
           If keys is empty, return all properties in the order
           (serial, node, hash, size, type, source, mtime, muser, uuid,
            checksum, cluster).
        """
        c = self.versions.c
        stmt = select([c.serial, c.node, c.hash, c.size, c.type, c.source,
                       c.mtime, c.muser, c.uuid, c.checksum, c.cluster],
                      c.node == node).order_by(c.serial)
        result = self.conn.execute(stmt)
        rows = result.fetchall()
        result.close()
        if not rows or not keys:
            return rows
        # Project only the requested keys, silently skipping unknown ones.
        indexes = [propnames[k] for k in keys if k in propnames]
        return [[row[i] for i in indexes] for row in rows]
295

    
296
    def node_count_children(self, node):
        """Return node's child count."""
        # Exclude ROOTNODE, which is recorded as its own parent.
        stmt = select([func.count(self.nodes.c.node)]).where(
            and_(self.nodes.c.parent == node,
                 self.nodes.c.node != ROOTNODE))
        result = self.conn.execute(stmt)
        count = result.fetchone()[0]
        result.close()
        return count
306

    
307
    def node_purge_children(self, parent, before=inf, cluster=0):
        """Delete all versions with the specified
           parent and cluster, and return
           the hashes, size delta and serials of versions deleted.
           Clears out nodes with no remaining versions.
        """
        #update statistics
        c1 = select([self.nodes.c.node],
                    self.nodes.c.parent == parent)
        where_clause = and_(self.versions.c.node.in_(c1),
                            self.versions.c.cluster == cluster)
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        s = s.where(where_clause)
        if before != inf:
            # NOTE(review): 'before' narrows only this statistics query;
            # the selects/deletes below use where_clause alone -- confirm
            # this is intended for before != inf callers.
            s = s.where(self.versions.c.mtime <= before)
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        if not row:
            # BUGFIX: keep the return arity consistent with the success
            # path below (was a 2-tuple, breaking 3-value unpacking).
            return (), 0, ()
        # size is the negated total: a delta suitable for the statistics
        # updates below (and returned negated to callers as before).
        nr, size = row[0], -row[1] if row[1] else 0
        mtime = time()
        self.statistics_update(parent, -nr, size, mtime, cluster)
        self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)

        # Collect the hashes and serials about to be deleted.
        s = select([self.versions.c.hash, self.versions.c.serial])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        hashes = []
        serials = []
        for row in r.fetchall():
            hashes += [row[0]]
            serials += [row[1]]
        r.close()

        #delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)
        r.close()

        #delete nodes that have no versions left
        s = select([self.nodes.c.node],
                   and_(self.nodes.c.parent == parent,
                        select([func.count(self.versions.c.serial)],
                               self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
        rp = self.conn.execute(s)
        nodes = [r[0] for r in rp.fetchall()]
        rp.close()
        s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
        self.conn.execute(s).close()

        return hashes, size, serials
360

    
361
    def node_purge(self, node, before=inf, cluster=0):
        """Delete all versions with the specified
           node and cluster, and return
           the hashes, total size and serials of versions deleted.
           Clears out the node if it has no remaining versions.
        """
        #update statistics
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        where_clause = and_(self.versions.c.node == node,
                            self.versions.c.cluster == cluster)
        s = s.where(where_clause)
        if before != inf:
            # NOTE(review): 'before' narrows only this statistics query;
            # the selects/deletes below use where_clause alone -- confirm
            # this is intended for before != inf callers.
            s = s.where(self.versions.c.mtime <= before)
        r = self.conn.execute(s)
        row = r.fetchone()
        nr, size = row[0], row[1]
        r.close()
        if not nr:
            # BUGFIX: keep the return arity consistent with the success
            # path below (was a 2-tuple, breaking 3-value unpacking).
            return (), 0, ()
        mtime = time()
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)

        # Collect the hashes and serials about to be deleted.
        s = select([self.versions.c.hash, self.versions.c.serial])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        hashes = []
        serials = []
        for row in r.fetchall():
            hashes += [row[0]]
            serials += [row[1]]
        r.close()

        #delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)
        r.close()

        #delete node if it has no versions left
        s = select([self.nodes.c.node],
                   and_(self.nodes.c.node == node,
                        select([func.count(self.versions.c.serial)],
                               self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
        r = self.conn.execute(s)
        # BUGFIX: extract the scalar node ids (as node_purge_children
        # does) instead of passing whole row tuples to in_().
        nodes = [row[0] for row in r.fetchall()]
        r.close()
        s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
        self.conn.execute(s).close()

        return hashes, size, serials
412

    
413
    def node_remove(self, node):
        """Remove the node specified.
           Return false if the node has children or is not found.
        """
        if self.node_count_children(node):
            return False

        # Subtract this node's per-cluster totals from all ancestors
        # before deleting (versions go away via ON DELETE CASCADE).
        mtime = time()
        stmt = select([func.count(self.versions.c.serial),
                       func.sum(self.versions.c.size),
                       self.versions.c.cluster])
        stmt = stmt.where(self.versions.c.node == node)
        stmt = stmt.group_by(self.versions.c.cluster)
        result = self.conn.execute(stmt)
        for population, total, cluster in result.fetchall():
            self.statistics_update_ancestors(
                node, -population, -total, mtime, cluster)
        result.close()

        delete = self.nodes.delete().where(self.nodes.c.node == node)
        self.conn.execute(delete).close()
        return True
436

    
437
    def policy_get(self, node):
        """Return the node's policy as a {key: value} dict."""
        stmt = select([self.policy.c.key, self.policy.c.value],
                      self.policy.c.node == node)
        result = self.conn.execute(stmt)
        policy = dict(result.fetchall())
        result.close()
        return policy
444

    
445
    def policy_set(self, node, policy):
        """Insert or update the given {key: value} policy pairs for node."""
        #TODO better upsert
        for key, value in policy.iteritems():
            # Try an update first; fall back to insert when no row matched.
            u = self.policy.update().where(
                and_(self.policy.c.node == node,
                     self.policy.c.key == key)).values(value=value)
            rp = self.conn.execute(u)
            rp.close()
            if rp.rowcount == 0:
                ins = self.policy.insert()
                r = self.conn.execute(ins, {'node': node, 'key': key,
                                            'value': value})
                r.close()
458

    
459
    def statistics_get(self, node, cluster=0):
        """Return population, total size and last mtime
           for all versions under node that belong to the cluster.
        """
        stmt = select([self.statistics.c.population,
                       self.statistics.c.size,
                       self.statistics.c.mtime]).where(
            and_(self.statistics.c.node == node,
                 self.statistics.c.cluster == cluster))
        result = self.conn.execute(stmt)
        row = result.fetchone()
        result.close()
        return row
473

    
474
    def statistics_update(self, node, population, size, mtime, cluster=0):
        """Update the statistics of the given node.
           Statistics keep track the population, total
           size of objects and mtime in the node's namespace.
           May be zero or positive or negative numbers.
        """
        match = and_(self.statistics.c.node == node,
                     self.statistics.c.cluster == cluster)
        rp = self.conn.execute(
            select([self.statistics.c.population, self.statistics.c.size],
                   match))
        row = rp.fetchone()
        rp.close()
        # Accumulate on top of any existing counters.
        prepopulation, presize = row if row else (0, 0)
        population += prepopulation
        size += presize

        #insert or replace
        #TODO better upsert
        u = self.statistics.update().where(match)
        u = u.values(population=population, size=size, mtime=mtime)
        rp = self.conn.execute(u)
        rp.close()
        if rp.rowcount == 0:
            ins = self.statistics.insert().values(
                node=node, population=population, size=size,
                mtime=mtime, cluster=cluster)
            self.conn.execute(ins).close()
505

    
506
    def statistics_update_ancestors(self, node, population, size, mtime,
                                    cluster=0):
        """Update the statistics of the given node's parent.
           Then recursively update all parents up to the root.
           Population is not recursive.
        """
        while node != ROOTNODE:
            props = self.node_get_properties(node)
            if props is None:
                break
            parent, _path = props
            self.statistics_update(parent, population, size, mtime, cluster)
            node = parent
            population = 0  # Population isn't recursive
522

    
523
    def statistics_latest(self, node, before=inf, except_cluster=0):
        """Return population, total size and last mtime
           for all latest versions under node that
           do not belong to the cluster.
        """
        # The node.
        props = self.node_get_properties(node)
        if props is None:
            return None
        parent, path = props

        # The latest version of the node itself.
        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.hash,
                    self.versions.c.size,
                    self.versions.c.type,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.uuid,
                    self.versions.c.checksum,
                    self.versions.c.cluster])
        if before != inf:
            filtered = select([func.max(self.versions.c.serial)],
                              self.versions.c.node == node)
            filtered = filtered.where(self.versions.c.mtime < before)
        else:
            filtered = select([self.nodes.c.latest_version],
                              self.versions.c.node == node)
        s = s.where(and_(self.versions.c.cluster != except_cluster,
                         self.versions.c.serial == filtered))
        r = self.conn.execute(s)
        props = r.fetchone()
        r.close()
        if not props:
            return None
        mtime = props[MTIME]

        # First level, just under node (get population).
        v = self.versions.alias('v')
        s = select([func.count(v.c.serial),
                    func.sum(v.c.size),
                    func.max(v.c.mtime)])
        if before != inf:
            c1 = select([func.max(self.versions.c.serial)])
            c1 = c1.where(self.versions.c.mtime < before)
            # BUGFIX: .where() is generative (returns a new select); the
            # correlation filter was previously discarded because the
            # result was not assigned back.
            c1 = c1.where(self.versions.c.node == v.c.node)
        else:
            c1 = select([self.nodes.c.latest_version])
            # BUGFIX: same discarded-.where() issue as above.
            c1 = c1.where(self.nodes.c.node == v.c.node)
        c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
        s = s.where(and_(v.c.serial == c1,
                         v.c.cluster != except_cluster,
                         v.c.node.in_(c2)))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        count = r[0]
        mtime = max(mtime, r[2])
        if count == 0:
            return (0, 0, mtime)

        # All children (get size and mtime).
        # This is why the full path is stored.
        s = select([func.count(v.c.serial),
                    func.sum(v.c.size),
                    func.max(v.c.mtime)])
        if before != inf:
            c1 = select([func.max(self.versions.c.serial)],
                        self.versions.c.node == v.c.node)
            c1 = c1.where(self.versions.c.mtime < before)
        else:
            # BUGFIX: the nodes table has no 'serial' column; use
            # latest_version, consistent with version_lookup above.
            c1 = select([self.nodes.c.latest_version],
                        self.nodes.c.node == v.c.node)
        c2 = select([self.nodes.c.node], self.nodes.c.path.like(
            self.escape_like(path) + '%', escape='\\'))
        s = s.where(and_(v.c.serial == c1,
                         v.c.cluster != except_cluster,
                         v.c.node.in_(c2)))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        # Exclude the node's own latest version from the subtree total.
        size = r[1] - props[SIZE]
        mtime = max(mtime, r[2])
        return (count, size, mtime)
614

    
615
    def nodes_set_latest_version(self, node, serial):
        """Record serial as the latest version of the given node."""
        u = self.nodes.update().where(self.nodes.c.node == node).values(
            latest_version=serial)
        self.conn.execute(u).close()
619

    
620
    def version_create(self, node, hash, size, type, source, muser, uuid,
                       checksum, cluster=0):
        """Create a new version from the given properties.
           Return the (serial, mtime) of the new version.
        """
        mtime = time()
        ins = self.versions.insert().values(
            node=node, hash=hash, size=size, type=type, source=source,
            mtime=mtime, muser=muser, uuid=uuid, checksum=checksum,
            cluster=cluster)
        serial = self.conn.execute(ins).inserted_primary_key[0]
        # Account for the new version and mark it as the node's latest.
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)
        self.nodes_set_latest_version(node, serial)
        return serial, mtime
635

    
636
    def version_lookup(self, node, before=inf, cluster=0, all_props=True):
        """Lookup the current version of the given node.
           Return a list with its properties:
           (serial, node, hash, size, type, source, mtime,
            muser, uuid, checksum, cluster)
           or None if the current version is not found in the given cluster.
        """
        v = self.versions.alias('v')
        if all_props:
            cols = [v.c.serial, v.c.node, v.c.hash,
                    v.c.size, v.c.type, v.c.source,
                    v.c.mtime, v.c.muser, v.c.uuid,
                    v.c.checksum, v.c.cluster]
        else:
            cols = [v.c.serial]
        s = select(cols)
        if before != inf:
            # Most recent serial strictly before the cutoff.
            c = select([func.max(self.versions.c.serial)],
                       self.versions.c.node == node)
            c = c.where(self.versions.c.mtime < before)
        else:
            c = select([self.nodes.c.latest_version],
                       self.nodes.c.node == node)
        s = s.where(and_(v.c.serial == c,
                         v.c.cluster == cluster))
        r = self.conn.execute(s)
        props = r.fetchone()
        r.close()
        return props if props else None
667

    
668
    def version_lookup_bulk(self, nodes, before=inf, cluster=0,
                            all_props=True):
        """Lookup the current versions of the given nodes.
           Return a list with their properties:
           (serial, node, hash, size, type, source, mtime, muser, uuid,
            checksum, cluster).
        """
        if not nodes:
            return ()
        v = self.versions.alias('v')
        if all_props:
            cols = [v.c.serial, v.c.node, v.c.hash,
                    v.c.size, v.c.type, v.c.source,
                    v.c.mtime, v.c.muser, v.c.uuid,
                    v.c.checksum, v.c.cluster]
        else:
            cols = [v.c.serial]
        s = select(cols)
        if before != inf:
            # Per-node most recent serial strictly before the cutoff.
            c = select([func.max(self.versions.c.serial)],
                       self.versions.c.node.in_(nodes))
            c = c.where(self.versions.c.mtime < before)
            c = c.group_by(self.versions.c.node)
        else:
            c = select([self.nodes.c.latest_version],
                       self.nodes.c.node.in_(nodes))
        s = s.where(and_(v.c.serial.in_(c),
                         v.c.cluster == cluster))
        s = s.order_by(v.c.node)
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        return (tuple(row.values()) for row in rows)
698

    
699
    def version_get_properties(self, serial, keys=(), propnames=_propnames):
        """Return a sequence of values for the properties of
           the version specified by serial and the keys, in the order given.
           If keys is empty, return all properties in the order
           (serial, node, hash, size, type, source, mtime, muser, uuid,
            checksum, cluster).
        """
        v = self.versions.alias()
        s = select([v.c.serial, v.c.node, v.c.hash,
                    v.c.size, v.c.type, v.c.source,
                    v.c.mtime, v.c.muser, v.c.uuid,
                    v.c.checksum, v.c.cluster], v.c.serial == serial)
        rp = self.conn.execute(s)
        row = rp.fetchone()
        rp.close()
        if row is None or not keys:
            return row
        # Project only the requested keys, silently skipping unknown ones.
        return [row[propnames[k]] for k in keys if k in propnames]
720

    
721
    def version_put_property(self, serial, key, value):
        """Set value for the property of version specified by key."""
        if key not in _propnames:
            # Unknown property names are silently ignored.
            return
        u = self.versions.update().where(
            self.versions.c.serial == serial).values(**{key: value})
        self.conn.execute(u).close()
730

    
731
    def version_recluster(self, serial, cluster):
        """Move the version into another cluster."""
        props = self.version_get_properties(serial)
        if not props:
            return
        node, size, oldcluster = props[NODE], props[SIZE], props[CLUSTER]
        if cluster == oldcluster:
            return

        # Shift the accounted statistics from the old cluster to the new.
        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)

        u = self.versions.update().where(
            self.versions.c.serial == serial).values(cluster=cluster)
        self.conn.execute(u).close()
751

    
752
    def version_remove(self, serial):
        """Remove the version specified by serial.

           Update the ancestors' statistics for the removed version's
           cluster and, if another version of the node remains in that
           cluster, promote it to be the node's latest version.

           Return (hash, size) of the removed version, or None if the
           serial does not exist.
        """

        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        hash = props[HASH]
        size = props[SIZE]
        cluster = props[CLUSTER]

        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, cluster)

        s = self.versions.delete().where(self.versions.c.serial == serial)
        self.conn.execute(s).close()

        props = self.version_lookup(node, cluster=cluster, all_props=False)
        if props:
            # BUGFIX: the original referenced an undefined name (v.node) and
            # re-used the deleted serial. With all_props=False the lookup
            # returns just the serial column, so props[0] is the serial of
            # the remaining version that should become latest for the node.
            self.nodes_set_latest_version(node, props[0])

        return hash, size
    def attribute_get(self, serial, domain, keys=()):
        """Return a list of (key, value) pairs of the version specified by serial.
           If keys is empty, return all attributes.
           Othwerise, return only those specified.
        """

        attrs = self.attributes.alias()
        clauses = [attrs.c.serial == serial, attrs.c.domain == domain]
        if keys:
            # Restrict to the requested keys only.
            clauses.append(attrs.c.key.in_(keys))
        s = select([attrs.c.key, attrs.c.value]).where(and_(*clauses))
        rp = self.conn.execute(s)
        pairs = rp.fetchall()
        rp.close()
        return pairs
    def attribute_set(self, serial, domain, items):
        """Set the attributes of the version specified by serial.
           Receive attributes as an iterable of (key, value) pairs.
        """
        # Poor man's upsert: try UPDATE first, INSERT when nothing matched.
        # TODO: replace with a proper database-side upsert.
        for key, value in items:
            u = self.attributes.update()
            u = u.where(and_(self.attributes.c.serial == serial,
                             self.attributes.c.domain == domain,
                             self.attributes.c.key == key))
            u = u.values(value=value)
            rp = self.conn.execute(u)
            rp.close()
            if rp.rowcount == 0:
                ins = self.attributes.insert()
                ins = ins.values(serial=serial, domain=domain,
                                 key=key, value=value)
                self.conn.execute(ins).close()
    def attribute_del(self, serial, domain, keys=()):
        """Delete attributes of the version specified by serial.
           If keys is empty, delete all attributes.
           Otherwise delete those specified.
        """

        # Common restriction shared by both branches.
        base = and_(self.attributes.c.serial == serial,
                    self.attributes.c.domain == domain)
        if keys:
            # TODO: more efficient way to do this?
            for key in keys:
                d = self.attributes.delete().where(
                    and_(base, self.attributes.c.key == key))
                self.conn.execute(d).close()
        else:
            self.conn.execute(self.attributes.delete().where(base)).close()
    def attribute_copy(self, source, dest):
        """Copy every attribute of version source onto version dest."""

        # Select dest as a literal column so each fetched row carries the
        # target serial next to the (domain, key, value) triple to copy.
        s = select(
            [dest, self.attributes.c.domain,
                self.attributes.c.key, self.attributes.c.value],
            self.attributes.c.serial == source)
        rp = self.conn.execute(s)
        rows = rp.fetchall()
        rp.close()
        for target, domain, key, value in rows:
            # Insert or replace (no native upsert available).
            u = self.attributes.update().where(and_(
                self.attributes.c.serial == target,
                self.attributes.c.domain == domain,
                self.attributes.c.key == key))
            rp = self.conn.execute(u, value=value)
            rp.close()
            if rp.rowcount == 0:
                ins = self.attributes.insert()
                self.conn.execute(ins, {'serial': target, 'domain': domain,
                                        'key': key, 'value': value}).close()
    def latest_attribute_keys(self, parent, domain,
                              before=inf, except_cluster=0, pathq=[]):
        """Return the distinct attribute keys defined, in the given domain,
           for the latest versions under parent that do not belong to the
           excluded cluster.
        """

        # TODO: Use another table to store before=inf results.
        a = self.attributes.alias('a')
        v = self.versions.alias('v')
        n = self.nodes.alias('n')

        # Per node: the serial of the newest version before the cutoff,
        # or the recorded latest_version when no cutoff is given.
        if before != inf:
            latest = select([func.max(self.versions.c.serial)]).where(
                and_(self.versions.c.mtime < before,
                     self.versions.c.node == v.c.node))
        else:
            latest = select([self.nodes.c.latest_version]).where(
                self.nodes.c.node == v.c.node)

        s = select([a.c.key]).distinct()
        s = s.where(and_(
            v.c.serial == latest,
            v.c.cluster != except_cluster,
            v.c.node.in_(select([self.nodes.c.node],
                                self.nodes.c.parent == parent)),
            a.c.serial == v.c.serial,
            a.c.domain == domain,
            n.c.node == v.c.node))

        disjunction = []
        for path, match in pathq:
            if match == MATCH_PREFIX:
                disjunction.append(
                    n.c.path.like(self.escape_like(path) + '%', escape='\\'))
            elif match == MATCH_EXACT:
                disjunction.append(n.c.path == path)
        if disjunction:
            s = s.where(or_(*disjunction))

        rp = self.conn.execute(s)
        rows = rp.fetchall()
        rp.close()
        return [row[0] for row in rows]
    def latest_version_list(self, parent, prefix='', delimiter=None,
                            start='', limit=10000, before=inf,
                            except_cluster=0, pathq=[], domain=None,
                            filterq=[], sizeq=None, all_props=False):
        """Return a (list of (path, serial) tuples, list of common prefixes)
           for the current versions of the paths with the given parent,
           matching the following criteria.

           The property tuple for a version is returned if all
           of these conditions are true:

                a. parent matches

                b. path > start

                c. path starts with prefix (and paths in pathq)

                d. version is the max up to before

                e. version is not in cluster

                f. the path does not have the delimiter occuring
                   after the prefix, or ends with the delimiter

                g. serial matches the attribute filter query.

                   A filter query is a comma-separated list of
                   terms in one of these three forms:

                   key
                       an attribute with this key must exist

                   !key
                       an attribute with this key must not exist

                   key ?op value
                       the attribute with this key satisfies the value
                       where ?op is one of ==, != <=, >=, <, >.

                h. the size is in the range set by sizeq

           The list of common prefixes includes the prefixes
           matching up to the first delimiter after prefix,
           and are reported only once, as "virtual directories".
           The delimiter is included in the prefixes.

           If arguments are None, then the corresponding matching rule
           will always match.

           Limit applies to the first list of tuples returned.

           If all_props is True, return all properties after path, not just serial.
        """

        if not start or start < prefix:
            start = strprevling(prefix)
        nextling = strnextling(prefix)

        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        if not all_props:
            s = select([n.c.path, v.c.serial]).distinct()
        else:
            s = select([n.c.path,
                        v.c.serial, v.c.node, v.c.hash,
                        v.c.size, v.c.type, v.c.source,
                        v.c.mtime, v.c.muser, v.c.uuid,
                        v.c.checksum, v.c.cluster]).distinct()
        # Per node: newest serial before the cutoff, or the recorded
        # latest_version when unbounded.
        if before != inf:
            filtered = select([func.max(self.versions.c.serial)])
            filtered = filtered.where(self.versions.c.mtime < before)
        else:
            filtered = select([self.nodes.c.latest_version])
        s = s.where(
            v.c.serial == filtered.where(self.nodes.c.node == v.c.node))
        s = s.where(v.c.cluster != except_cluster)
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
                                        self.nodes.c.parent == parent)))

        s = s.where(n.c.node == v.c.node)
        # 'start' is a bind parameter so the same statement can be re-run
        # with a different start when skipping past a common prefix below.
        s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
        conj = []
        for path, match in pathq:
            if match == MATCH_PREFIX:
                conj.append(
                    n.c.path.like(self.escape_like(path) + '%', escape='\\'))
            elif match == MATCH_EXACT:
                conj.append(n.c.path == path)
        if conj:
            s = s.where(or_(*conj))

        if sizeq and len(sizeq) == 2:
            if sizeq[0]:
                s = s.where(v.c.size >= sizeq[0])
            if sizeq[1]:
                s = s.where(v.c.size < sizeq[1])

        if domain and filterq:
            a = self.attributes.alias('a')
            included, excluded, opers = parse_filters(filterq)
            if included:
                subs = select([1])
                subs = subs.where(a.c.serial == v.c.serial).correlate(v)
                subs = subs.where(a.c.domain == domain)
                subs = subs.where(or_(*[a.c.key.op('=')(x) for x in included]))
                s = s.where(exists(subs))
            if excluded:
                subs = select([1])
                subs = subs.where(a.c.serial == v.c.serial).correlate(v)
                subs = subs.where(a.c.domain == domain)
                subs = subs.where(or_(*[a.c.key.op('=')(x) for x in excluded]))
                s = s.where(not_(exists(subs)))
            if opers:
                for k, o, val in opers:
                    subs = select([1])
                    subs = subs.where(a.c.serial == v.c.serial).correlate(v)
                    subs = subs.where(a.c.domain == domain)
                    subs = subs.where(
                        and_(a.c.key.op('=')(k), a.c.value.op(o)(val)))
                    s = s.where(exists(subs))

        s = s.order_by(n.c.path)

        if not delimiter:
            s = s.limit(limit)
            rp = self.conn.execute(s, start=start)
            r = rp.fetchall()
            rp.close()
            return r, ()

        pfz = len(prefix)
        dz = len(delimiter)
        count = 0
        prefixes = []
        pappend = prefixes.append
        matches = []
        mappend = matches.append

        rp = self.conn.execute(s, start=start)
        while True:
            props = rp.fetchone()
            if props is None:
                break
            path = props[0]
            idx = path.find(delimiter, pfz)

            if idx < 0:
                # Plain match: no delimiter after the prefix.
                mappend(props)
                count += 1
                if count >= limit:
                    break
                continue

            if idx + dz == len(path):
                # Path ends with the delimiter: report it as a match.
                mappend(props)
                count += 1
                continue  # Get one more, in case there is a path.
            pf = path[:idx + dz]
            pappend(pf)
            if count >= limit:
                break

            # BUGFIX: close the current result proxy before re-running the
            # query past the common prefix; reassigning rp without closing
            # leaked the previous open cursor on every prefix skip.
            rp.close()
            rp = self.conn.execute(s, start=strnextling(pf))  # New start.
        rp.close()

        return matches, prefixes
    def latest_uuid(self, uuid):
        """Return a (path, serial) tuple, for the latest version of the given uuid."""

        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        # Highest serial recorded for this uuid.
        latest = select([func.max(self.versions.c.serial)]).where(
            self.versions.c.uuid == uuid)
        s = select([n.c.path, v.c.serial]).where(
            and_(v.c.serial == latest, n.c.node == v.c.node))

        rp = self.conn.execute(s)
        row = rp.fetchone()
        rp.close()
        return row