root / snf-pithos-backend / pithos / backends / lib / sqlalchemy / node.py @ eecad161

# Copyright 2011-2012 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from time import time
from sqlalchemy import Table, Integer, BigInteger, DECIMAL, Column, String, MetaData, ForeignKey
from sqlalchemy.types import Text
from sqlalchemy.schema import Index, Sequence
from sqlalchemy.sql import func, and_, or_, not_, null, select, bindparam, text, exists
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.exc import NoSuchTableError

from dbworker import DBWorker

from pithos.backends.filter import parse_filters


ROOTNODE = 0

(SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM,
 CLUSTER) = range(11)

(MATCH_PREFIX, MATCH_EXACT) = range(2)

inf = float('inf')


def strnextling(prefix):
    """Return the first unicode string
       greater than but not starting with given prefix.
       strnextling('hello') -> 'hellp'
    """
    if not prefix:
        ## all strings start with the null string,
        ## therefore we have to approximate strnextling('')
        ## with the last unicode character supported by python
        ## 0x10ffff for wide (32-bit unicode) python builds
        ## 0x00ffff for narrow (16-bit unicode) python builds
        ## We will not autodetect. 0xffff is safe enough.
        return unichr(0xffff)
    s = prefix[:-1]
    c = ord(prefix[-1])
    if c >= 0xffff:
        raise RuntimeError
    s += unichr(c + 1)
    return s


def strprevling(prefix):
    """Return an approximation of the last unicode string
       less than but not starting with given prefix.
       strprevling(u'hello') -> u'helln\\xffff'
    """
    if not prefix:
        ## There is no prevling for the null string
        return prefix
    s = prefix[:-1]
    c = ord(prefix[-1])
    if c > 0:
        s += unichr(c - 1) + unichr(0xffff)
    return s
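

# Example (illustrative): the two helpers above bound a prefix range so that,
# for any path starting with a prefix p, strprevling(p) < path < strnextling(p).
# For instance strnextling('photos/') == 'photos0' and
# strprevling(u'photos/') == u'photos.' + u'\uffff', which is how
# latest_version_list below turns a prefix into (start, nextling) bounds for an
# indexed range scan on nodes.path.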

_propnames = {
    'serial': 0,
    'node': 1,
    'hash': 2,
    'size': 3,
    'type': 4,
    'source': 5,
    'mtime': 6,
    'muser': 7,
    'uuid': 8,
    'checksum': 9,
    'cluster': 10
}
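
# Note (illustrative): _propnames mirrors the (SERIAL, ..., CLUSTER) index
# constants above and the column order used by the version queries below, so a
# property row can be indexed either way, e.g.
#
#     props = node.version_get_properties(serial)   # node: a Node instance
#     props[SIZE] == props[_propnames['size']]
#
# Both mappings must stay in sync with that column order.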


def create_tables(engine):
    metadata = MetaData()

    #create nodes table
    columns = []
    columns.append(Column('node', Integer, primary_key=True))
    columns.append(Column('parent', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          autoincrement=False))
    columns.append(Column('latest_version', Integer))
    columns.append(Column('path', String(2048), default='', nullable=False))
    nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
    Index('idx_nodes_path', nodes.c.path, unique=True)
    Index('idx_nodes_parent', nodes.c.parent)

    #create policy table
    columns = []
    columns.append(Column('node', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          primary_key=True))
    columns.append(Column('key', String(128), primary_key=True))
    columns.append(Column('value', String(256)))
    policy = Table('policy', metadata, *columns, mysql_engine='InnoDB')

    #create statistics table
    columns = []
    columns.append(Column('node', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          primary_key=True))
    columns.append(Column('population', Integer, nullable=False, default=0))
    columns.append(Column('size', BigInteger, nullable=False, default=0))
    columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
    columns.append(Column('cluster', Integer, nullable=False, default=0,
                          primary_key=True, autoincrement=False))
    statistics = Table('statistics', metadata, *columns, mysql_engine='InnoDB')

    #create versions table
    columns = []
    columns.append(Column('serial', Integer, primary_key=True))
    columns.append(Column('node', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE')))
    columns.append(Column('hash', String(256)))
    columns.append(Column('size', BigInteger, nullable=False, default=0))
    columns.append(Column('type', String(256), nullable=False, default=''))
    columns.append(Column('source', Integer))
    columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
    columns.append(Column('muser', String(256), nullable=False, default=''))
    columns.append(Column('uuid', String(64), nullable=False, default=''))
    columns.append(Column('checksum', String(256), nullable=False, default=''))
    columns.append(Column('cluster', Integer, nullable=False, default=0))
    versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
    Index('idx_versions_node_mtime', versions.c.node, versions.c.mtime)
    Index('idx_versions_node_uuid', versions.c.uuid)

    #create attributes table
    columns = []
    columns.append(Column('serial', Integer,
                          ForeignKey('versions.serial',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          primary_key=True))
    columns.append(Column('domain', String(256), primary_key=True))
    columns.append(Column('key', String(128), primary_key=True))
    columns.append(Column('value', String(256)))
    attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')

    metadata.create_all(engine)
    return metadata.sorted_tables
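

# Example (illustrative sketch): bootstrapping the schema on a scratch engine.
# The in-memory SQLite URL is only an assumption for demonstration; deployments
# point the engine at their configured database instead.
#
#     from sqlalchemy import create_engine
#     engine = create_engine('sqlite:///:memory:')
#     tables = create_tables(engine)   # nodes, policy, statistics, versions,
#                                      # attributes, in dependency order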


class Node(DBWorker):
    """Nodes store path organization and have multiple versions.
       Versions store object history and have multiple attributes.
       Attributes store metadata.
    """

    # TODO: Provide an interface for included and excluded clusters.

    def __init__(self, **params):
        DBWorker.__init__(self, **params)
        try:
            metadata = MetaData(self.engine)
            self.nodes = Table('nodes', metadata, autoload=True)
            self.policy = Table('policy', metadata, autoload=True)
            self.statistics = Table('statistics', metadata, autoload=True)
            self.versions = Table('versions', metadata, autoload=True)
            self.attributes = Table('attributes', metadata, autoload=True)
        except NoSuchTableError:
            tables = create_tables(self.engine)
            map(lambda t: self.__setattr__(t.name, t), tables)

        s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
                                           self.nodes.c.parent == ROOTNODE))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            s = self.nodes.insert(
            ).values(node=ROOTNODE, parent=ROOTNODE, path='')
            self.conn.execute(s)

    def node_create(self, parent, path):
        """Create a new node from the given properties.
           Return the node identifier of the new node.
        """
        #TODO catch IntegrityError?
        s = self.nodes.insert().values(parent=parent, path=path)
        r = self.conn.execute(s)
        inserted_primary_key = r.inserted_primary_key[0]
        r.close()
        return inserted_primary_key

    def node_lookup(self, path):
        """Lookup the current node of the given path.
           Return None if the path is not found.
        """

        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
        s = select([self.nodes.c.node], self.nodes.c.path.like(
            self.escape_like(path), escape='\\'))
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        if row:
            return row[0]
        return None

    def node_lookup_bulk(self, paths):
        """Lookup the current nodes for the given paths.
           Return () if no paths are found.
        """

        if not paths:
            return ()
        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
        s = select([self.nodes.c.node], self.nodes.c.path.in_(paths))
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        return [row[0] for row in rows]

    def node_get_properties(self, node):
        """Return the node's (parent, path).
           Return None if the node is not found.
        """

        s = select([self.nodes.c.parent, self.nodes.c.path])
        s = s.where(self.nodes.c.node == node)
        r = self.conn.execute(s)
        l = r.fetchone()
        r.close()
        return l

    def node_get_versions(self, node, keys=(), propnames=_propnames):
        """Return the properties of all versions at node.
           If keys is empty, return all properties in the order
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
        """

        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.hash,
                    self.versions.c.size,
                    self.versions.c.type,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.uuid,
                    self.versions.c.checksum,
                    self.versions.c.cluster], self.versions.c.node == node)
        s = s.order_by(self.versions.c.serial)
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        if not rows:
            return rows

        if not keys:
            return rows

        return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]

    def node_count_children(self, node):
        """Return node's child count."""

        s = select([func.count(self.nodes.c.node)])
        s = s.where(and_(self.nodes.c.parent == node,
                         self.nodes.c.node != ROOTNODE))
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        return row[0]

    def node_purge_children(self, parent, before=inf, cluster=0):
        """Delete all versions with the specified
           parent and cluster, and return
           the hashes, total size and serials of the versions deleted.
           Clears out nodes with no remaining versions.
        """
        #update statistics
        c1 = select([self.nodes.c.node],
                    self.nodes.c.parent == parent)
        where_clause = and_(self.versions.c.node.in_(c1),
                            self.versions.c.cluster == cluster)
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        s = s.where(where_clause)
        if before != inf:
            s = s.where(self.versions.c.mtime <= before)
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        if not row:
            return (), 0, ()
        nr, size = row[0], -row[1] if row[1] else 0
        mtime = time()
        self.statistics_update(parent, -nr, size, mtime, cluster)
        self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)

        s = select([self.versions.c.hash, self.versions.c.serial])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        hashes = []
        serials = []
        for row in r.fetchall():
            hashes += [row[0]]
            serials += [row[1]]
        r.close()

        #delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)
        r.close()

        #delete nodes
        s = select([self.nodes.c.node],
                   and_(self.nodes.c.parent == parent,
                        select([func.count(self.versions.c.serial)],
                               self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
        rp = self.conn.execute(s)
        nodes = [r[0] for r in rp.fetchall()]
        rp.close()
        if nodes:
            s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
            self.conn.execute(s).close()

        return hashes, size, serials

    def node_purge(self, node, before=inf, cluster=0):
        """Delete all versions with the specified
           node and cluster, and return
           the hashes, total size and serials of the versions deleted.
           Clears out the node if it has no remaining versions.
        """

        #update statistics
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        where_clause = and_(self.versions.c.node == node,
                            self.versions.c.cluster == cluster)
        s = s.where(where_clause)
        if before != inf:
            s = s.where(self.versions.c.mtime <= before)
        r = self.conn.execute(s)
        row = r.fetchone()
        nr, size = row[0], row[1]
        r.close()
        if not nr:
            return (), 0, ()
        mtime = time()
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)

        s = select([self.versions.c.hash, self.versions.c.serial])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        hashes = []
        serials = []
        for row in r.fetchall():
            hashes += [row[0]]
            serials += [row[1]]
        r.close()

        #delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)
        r.close()

        #delete nodes
        s = select([self.nodes.c.node],
                   and_(self.nodes.c.node == node,
                        select([func.count(self.versions.c.serial)],
                               self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
        r = self.conn.execute(s)
        nodes = [row[0] for row in r.fetchall()]
        r.close()
        if nodes:
            s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
            self.conn.execute(s).close()

        return hashes, size, serials

    def node_remove(self, node):
        """Remove the node specified.
           Return false if the node has children or is not found.
        """

        if self.node_count_children(node):
            return False

        mtime = time()
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size),
                    self.versions.c.cluster])
        s = s.where(self.versions.c.node == node)
        s = s.group_by(self.versions.c.cluster)
        r = self.conn.execute(s)
        for population, size, cluster in r.fetchall():
            self.statistics_update_ancestors(
                node, -population, -size, mtime, cluster)
        r.close()

        s = self.nodes.delete().where(self.nodes.c.node == node)
        self.conn.execute(s).close()
        return True

    def policy_get(self, node):
        s = select([self.policy.c.key, self.policy.c.value],
                   self.policy.c.node == node)
        r = self.conn.execute(s)
        d = dict(r.fetchall())
        r.close()
        return d
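
    # Example (illustrative; the key names shown are assumptions): policy is
    # stored as plain key/value strings per node, e.g.
    #
    #     self.policy_set(node, {'quota': '1073741824', 'versioning': 'auto'})
    #     self.policy_get(node)   # {'quota': '1073741824', 'versioning': 'auto'}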

    def policy_set(self, node, policy):
        #insert or replace
        for k, v in policy.iteritems():
            s = self.policy.update().where(and_(self.policy.c.node == node,
                                                self.policy.c.key == k))
            s = s.values(value=v)
            rp = self.conn.execute(s)
            rp.close()
            if rp.rowcount == 0:
                s = self.policy.insert()
                values = {'node': node, 'key': k, 'value': v}
                r = self.conn.execute(s, values)
                r.close()

    def statistics_get(self, node, cluster=0):
        """Return population, total size and last mtime
           for all versions under node that belong to the cluster.
        """

        s = select([self.statistics.c.population,
                    self.statistics.c.size,
                    self.statistics.c.mtime])
        s = s.where(and_(self.statistics.c.node == node,
                         self.statistics.c.cluster == cluster))
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        return row

    def statistics_update(self, node, population, size, mtime, cluster=0):
        """Update the statistics of the given node.
           Statistics keep track of the population, the total
           size of objects and the mtime in the node's namespace.
           Population and size deltas may be zero, positive or negative.
        """
        s = select([self.statistics.c.population, self.statistics.c.size],
                   and_(self.statistics.c.node == node,
                        self.statistics.c.cluster == cluster))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            prepopulation, presize = (0, 0)
        else:
            prepopulation, presize = r
        population += prepopulation
        size += presize

        #insert or replace
        #TODO better upsert
        u = self.statistics.update().where(and_(self.statistics.c.node == node,
                                           self.statistics.c.cluster == cluster))
        u = u.values(population=population, size=size, mtime=mtime)
        rp = self.conn.execute(u)
        rp.close()
        if rp.rowcount == 0:
            ins = self.statistics.insert()
            ins = ins.values(node=node, population=population, size=size,
                             mtime=mtime, cluster=cluster)
            self.conn.execute(ins).close()

    def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
        """Update the statistics of the given node's parent.
           Then recursively update all parents up to the root.
           Population is not recursive.
        """

        while True:
            if node == ROOTNODE:
                break
            props = self.node_get_properties(node)
            if props is None:
                break
            parent, path = props
            self.statistics_update(parent, population, size, mtime, cluster)
            node = parent
            population = 0  # Population isn't recursive
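
    # Example (illustrative): creating a 100-byte version under an object node
    # whose parent is its container adds 100 to the size of the container, the
    # account and the root statistics rows, but adds 1 to the population of the
    # container only, since population counts direct children and is not
    # propagated further up.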

    def statistics_latest(self, node, before=inf, except_cluster=0):
        """Return population, total size and last mtime
           for all latest versions under node that
           do not belong to the cluster.
        """

        # The node.
        props = self.node_get_properties(node)
        if props is None:
            return None
        parent, path = props

        # The latest version.
        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.hash,
                    self.versions.c.size,
                    self.versions.c.type,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.uuid,
                    self.versions.c.checksum,
                    self.versions.c.cluster])
        if before != inf:
            filtered = select([func.max(self.versions.c.serial)],
                              self.versions.c.node == node)
            filtered = filtered.where(self.versions.c.mtime < before)
        else:
            filtered = select([self.nodes.c.latest_version],
                              self.nodes.c.node == node)
        s = s.where(and_(self.versions.c.cluster != except_cluster,
                         self.versions.c.serial == filtered))
        r = self.conn.execute(s)
        props = r.fetchone()
        r.close()
        if not props:
            return None
        mtime = props[MTIME]

        # First level, just under node (get population).
        v = self.versions.alias('v')
        s = select([func.count(v.c.serial),
                    func.sum(v.c.size),
                    func.max(v.c.mtime)])
        if before != inf:
            c1 = select([func.max(self.versions.c.serial)])
            c1 = c1.where(self.versions.c.mtime < before)
            c1 = c1.where(self.versions.c.node == v.c.node)
        else:
            c1 = select([self.nodes.c.latest_version])
            c1 = c1.where(self.nodes.c.node == v.c.node)
        c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
        s = s.where(and_(v.c.serial == c1,
                         v.c.cluster != except_cluster,
                         v.c.node.in_(c2)))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        count = r[0]
        mtime = max(mtime, r[2])
        if count == 0:
            return (0, 0, mtime)

        # All children (get size and mtime).
        # This is why the full path is stored.
        s = select([func.count(v.c.serial),
                    func.sum(v.c.size),
                    func.max(v.c.mtime)])
        if before != inf:
            c1 = select([func.max(self.versions.c.serial)],
                        self.versions.c.node == v.c.node)
            c1 = c1.where(self.versions.c.mtime < before)
        else:
            c1 = select([self.nodes.c.latest_version],
                        self.nodes.c.node == v.c.node)
        c2 = select([self.nodes.c.node], self.nodes.c.path.like(
            self.escape_like(path) + '%', escape='\\'))
        s = s.where(and_(v.c.serial == c1,
                         v.c.cluster != except_cluster,
                         v.c.node.in_(c2)))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        size = r[1] - props[SIZE]
        mtime = max(mtime, r[2])
        return (count, size, mtime)

    def nodes_set_latest_version(self, node, serial):
        s = self.nodes.update().where(self.nodes.c.node == node)
        s = s.values(latest_version=serial)
        self.conn.execute(s).close()

    def version_create(self, node, hash, size, type, source, muser, uuid, checksum, cluster=0):
        """Create a new version from the given properties.
           Return the (serial, mtime) of the new version.
        """

        mtime = time()
        s = self.versions.insert(
        ).values(node=node, hash=hash, size=size, type=type, source=source,
                 mtime=mtime, muser=muser, uuid=uuid, checksum=checksum, cluster=cluster)
        serial = self.conn.execute(s).inserted_primary_key[0]
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)

        self.nodes_set_latest_version(node, serial)

        return serial, mtime
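
    # Example (illustrative sketch; the path and property values are assumed):
    #
    #     node = self.node_lookup('account/container/x.jpg')
    #     serial, mtime = self.version_create(node, hash='...', size=1024,
    #                                         type='image/jpeg', source=None,
    #                                         muser='user@example.org', uuid='...',
    #                                         checksum='', cluster=0)
    #
    # The new serial becomes the node's latest_version and the ancestors'
    # statistics grow by (1, size) as a side effect.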

    def version_lookup(self, node, before=inf, cluster=0, all_props=True):
        """Lookup the current version of the given node.
           Return a list with its properties:
           (serial, node, hash, size, type, source, mtime,
            muser, uuid, checksum, cluster)
           or None if the current version is not found in the given cluster.
        """

        v = self.versions.alias('v')
        if not all_props:
            s = select([v.c.serial])
        else:
            s = select([v.c.serial, v.c.node, v.c.hash,
                        v.c.size, v.c.type, v.c.source,
                        v.c.mtime, v.c.muser, v.c.uuid,
                        v.c.checksum, v.c.cluster])
        if before != inf:
            c = select([func.max(self.versions.c.serial)],
                       self.versions.c.node == node)
            c = c.where(self.versions.c.mtime < before)
        else:
            c = select([self.nodes.c.latest_version],
                       self.nodes.c.node == node)
        s = s.where(and_(v.c.serial == c,
                         v.c.cluster == cluster))
        r = self.conn.execute(s)
        props = r.fetchone()
        r.close()
        if props:
            return props
        return None

    def version_lookup_bulk(self, nodes, before=inf, cluster=0, all_props=True):
        """Lookup the current versions of the given nodes.
           Return a list with their properties:
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
        """
        if not nodes:
            return ()
        v = self.versions.alias('v')
        if not all_props:
            s = select([v.c.serial])
        else:
            s = select([v.c.serial, v.c.node, v.c.hash,
                        v.c.size, v.c.type, v.c.source,
                        v.c.mtime, v.c.muser, v.c.uuid,
                        v.c.checksum, v.c.cluster])
        if before != inf:
            c = select([func.max(self.versions.c.serial)],
                       self.versions.c.node.in_(nodes))
            c = c.where(self.versions.c.mtime < before)
            c = c.group_by(self.versions.c.node)
        else:
            c = select([self.nodes.c.latest_version],
                       self.nodes.c.node.in_(nodes))
        s = s.where(and_(v.c.serial.in_(c),
                         v.c.cluster == cluster))
        s = s.order_by(v.c.node)
        r = self.conn.execute(s)
        rproxy = r.fetchall()
        r.close()
        return (tuple(row.values()) for row in rproxy)

    def version_get_properties(self, serial, keys=(), propnames=_propnames):
        """Return a sequence of values for the properties of
           the version specified by serial and the keys, in the order given.
           If keys is empty, return all properties in the order
           (serial, node, hash, size, type, source, mtime, muser, uuid, checksum, cluster).
        """

        v = self.versions.alias()
        s = select([v.c.serial, v.c.node, v.c.hash,
                    v.c.size, v.c.type, v.c.source,
                    v.c.mtime, v.c.muser, v.c.uuid,
                    v.c.checksum, v.c.cluster], v.c.serial == serial)
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if r is None:
            return r

        if not keys:
            return r
        return [r[propnames[k]] for k in keys if k in propnames]

    def version_put_property(self, serial, key, value):
        """Set the value of the property specified by key for the version specified by serial."""

        if key not in _propnames:
            return
        s = self.versions.update()
        s = s.where(self.versions.c.serial == serial)
        s = s.values(**{key: value})
        self.conn.execute(s).close()

    def version_recluster(self, serial, cluster):
        """Move the version into another cluster."""

        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        size = props[SIZE]
        oldcluster = props[CLUSTER]
        if cluster == oldcluster:
            return

        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)

        s = self.versions.update()
        s = s.where(self.versions.c.serial == serial)
        s = s.values(cluster=cluster)
        self.conn.execute(s).close()

    def version_remove(self, serial):
        """Remove the serial specified."""

        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        hash = props[HASH]
        size = props[SIZE]
        cluster = props[CLUSTER]

        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, cluster)

        s = self.versions.delete().where(self.versions.c.serial == serial)
        self.conn.execute(s).close()

        props = self.version_lookup(node, cluster=cluster, all_props=False)
        if props:
            self.nodes_set_latest_version(node, props[0])

        return hash, size

    def attribute_get(self, serial, domain, keys=()):
        """Return a list of (key, value) pairs of the version specified by serial.
           If keys is empty, return all attributes.
           Otherwise, return only those specified.
        """

        if keys:
            attrs = self.attributes.alias()
            s = select([attrs.c.key, attrs.c.value])
            s = s.where(and_(attrs.c.key.in_(keys),
                             attrs.c.serial == serial,
                             attrs.c.domain == domain))
        else:
            attrs = self.attributes.alias()
            s = select([attrs.c.key, attrs.c.value])
            s = s.where(and_(attrs.c.serial == serial,
                             attrs.c.domain == domain))
        r = self.conn.execute(s)
        l = r.fetchall()
        r.close()
        return l

    def attribute_set(self, serial, domain, items):
        """Set the attributes of the version specified by serial.
           Receive attributes as an iterable of (key, value) pairs.
        """
        #insert or replace
        #TODO better upsert
        for k, v in items:
            s = self.attributes.update()
            s = s.where(and_(self.attributes.c.serial == serial,
                             self.attributes.c.domain == domain,
                             self.attributes.c.key == k))
            s = s.values(value=v)
            rp = self.conn.execute(s)
            rp.close()
            if rp.rowcount == 0:
                s = self.attributes.insert()
                s = s.values(serial=serial, domain=domain, key=k, value=v)
                self.conn.execute(s).close()
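
    # Example (illustrative sketch; the domain and key names are assumptions):
    #
    #     self.attribute_set(serial, 'pithos', [('color', 'blue')])
    #     self.attribute_get(serial, 'pithos', keys=('color',))  # [('color', 'blue')]
    #
    # Each pair is written with an UPDATE first and an INSERT only when no row
    # was updated (the "insert or replace" fallback noted in the TODO above).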

    def attribute_del(self, serial, domain, keys=()):
        """Delete attributes of the version specified by serial.
           If keys is empty, delete all attributes.
           Otherwise delete those specified.
        """

        if keys:
            #TODO more efficient way to do this?
            for key in keys:
                s = self.attributes.delete()
                s = s.where(and_(self.attributes.c.serial == serial,
                                 self.attributes.c.domain == domain,
                                 self.attributes.c.key == key))
                self.conn.execute(s).close()
        else:
            s = self.attributes.delete()
            s = s.where(and_(self.attributes.c.serial == serial,
                             self.attributes.c.domain == domain))
            self.conn.execute(s).close()

    def attribute_copy(self, source, dest):
        s = select(
            [dest, self.attributes.c.domain,
                self.attributes.c.key, self.attributes.c.value],
            self.attributes.c.serial == source)
        rp = self.conn.execute(s)
        attributes = rp.fetchall()
        rp.close()
        for dest, domain, k, v in attributes:
            #insert or replace
            s = self.attributes.update().where(and_(
                self.attributes.c.serial == dest,
                self.attributes.c.domain == domain,
                self.attributes.c.key == k))
            rp = self.conn.execute(s, value=v)
            rp.close()
            if rp.rowcount == 0:
                s = self.attributes.insert()
                values = {'serial': dest, 'domain': domain,
                          'key': k, 'value': v}
                self.conn.execute(s, values).close()

    def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=[]):
        """Return a list with all attribute keys defined
           for all latest versions under parent that
           do not belong to the cluster.
        """

        # TODO: Use another table to store before=inf results.
        a = self.attributes.alias('a')
        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        s = select([a.c.key]).distinct()
        if before != inf:
            filtered = select([func.max(self.versions.c.serial)])
            filtered = filtered.where(self.versions.c.mtime < before)
            filtered = filtered.where(self.versions.c.node == v.c.node)
        else:
            filtered = select([self.nodes.c.latest_version])
            filtered = filtered.where(self.nodes.c.node == v.c.node)
        s = s.where(v.c.serial == filtered)
        s = s.where(v.c.cluster != except_cluster)
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
                                        self.nodes.c.parent == parent)))
        s = s.where(a.c.serial == v.c.serial)
        s = s.where(a.c.domain == domain)
        s = s.where(n.c.node == v.c.node)
        conj = []
        for path, match in pathq:
            if match == MATCH_PREFIX:
                conj.append(
                    n.c.path.like(self.escape_like(path) + '%', escape='\\'))
            elif match == MATCH_EXACT:
                conj.append(n.c.path == path)
        if conj:
            s = s.where(or_(*conj))
        rp = self.conn.execute(s)
        rows = rp.fetchall()
        rp.close()
        return [r[0] for r in rows]

    def latest_version_list(self, parent, prefix='', delimiter=None,
                            start='', limit=10000, before=inf,
                            except_cluster=0, pathq=[], domain=None,
                            filterq=[], sizeq=None, all_props=False):
        """Return a (list of (path, serial) tuples, list of common prefixes)
           for the current versions of the paths with the given parent,
           matching the following criteria.

           The property tuple for a version is returned if all
           of these conditions are true:

                a. parent matches

                b. path > start

                c. path starts with prefix (and paths in pathq)

                d. version is the max up to before

                e. version is not in cluster

                f. the path does not have the delimiter occurring
                   after the prefix, or ends with the delimiter

                g. serial matches the attribute filter query.

                   A filter query is a comma-separated list of
                   terms in one of these three forms:

                   key
                       an attribute with this key must exist

                   !key
                       an attribute with this key must not exist

                   key ?op value
                       the attribute with this key satisfies the value
                       where ?op is one of ==, !=, <=, >=, <, >.

                h. the size is in the range set by sizeq

           The list of common prefixes includes the prefixes
           matching up to the first delimiter after prefix;
           these are reported only once, as "virtual directories".
           The delimiter is included in the prefixes.

           If arguments are None, then the corresponding matching rule
           will always match.

           Limit applies to the first list of tuples returned.

           If all_props is True, return all properties after path, not just serial.
        """

        if not start or start < prefix:
            start = strprevling(prefix)
        nextling = strnextling(prefix)

        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        if not all_props:
            s = select([n.c.path, v.c.serial]).distinct()
        else:
            s = select([n.c.path,
                        v.c.serial, v.c.node, v.c.hash,
                        v.c.size, v.c.type, v.c.source,
                        v.c.mtime, v.c.muser, v.c.uuid,
                        v.c.checksum, v.c.cluster]).distinct()
        if before != inf:
            filtered = select([func.max(self.versions.c.serial)])
            filtered = filtered.where(self.versions.c.mtime < before)
        else:
            filtered = select([self.nodes.c.latest_version])
        s = s.where(
            v.c.serial == filtered.where(self.nodes.c.node == v.c.node))
        s = s.where(v.c.cluster != except_cluster)
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
                                        self.nodes.c.parent == parent)))

        s = s.where(n.c.node == v.c.node)
        s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
        conj = []
        for path, match in pathq:
            if match == MATCH_PREFIX:
                conj.append(
                    n.c.path.like(self.escape_like(path) + '%', escape='\\'))
            elif match == MATCH_EXACT:
                conj.append(n.c.path == path)
        if conj:
            s = s.where(or_(*conj))

        if sizeq and len(sizeq) == 2:
            if sizeq[0]:
                s = s.where(v.c.size >= sizeq[0])
            if sizeq[1]:
                s = s.where(v.c.size < sizeq[1])

        if domain and filterq:
            a = self.attributes.alias('a')
            included, excluded, opers = parse_filters(filterq)
            if included:
                subs = select([1])
                subs = subs.where(a.c.serial == v.c.serial).correlate(v)
                subs = subs.where(a.c.domain == domain)
                subs = subs.where(or_(*[a.c.key.op('=')(x) for x in included]))
                s = s.where(exists(subs))
            if excluded:
                subs = select([1])
                subs = subs.where(a.c.serial == v.c.serial).correlate(v)
                subs = subs.where(a.c.domain == domain)
                subs = subs.where(or_(*[a.c.key.op('=')(x) for x in excluded]))
                s = s.where(not_(exists(subs)))
            if opers:
                for k, o, val in opers:
                    subs = select([1])
                    subs = subs.where(a.c.serial == v.c.serial).correlate(v)
                    subs = subs.where(a.c.domain == domain)
                    subs = subs.where(
                        and_(a.c.key.op('=')(k), a.c.value.op(o)(val)))
                    s = s.where(exists(subs))

        s = s.order_by(n.c.path)

        if not delimiter:
            s = s.limit(limit)
            rp = self.conn.execute(s, start=start)
            r = rp.fetchall()
            rp.close()
            return r, ()

        pfz = len(prefix)
        dz = len(delimiter)
        count = 0
        prefixes = []
        pappend = prefixes.append
        matches = []
        mappend = matches.append

        rp = self.conn.execute(s, start=start)
        while True:
            props = rp.fetchone()
            if props is None:
                break
            path = props[0]
            serial = props[1]
            idx = path.find(delimiter, pfz)

            if idx < 0:
                mappend(props)
                count += 1
                if count >= limit:
                    break
                continue

            if idx + dz == len(path):
                mappend(props)
                count += 1
                continue  # Get one more, in case there is a path.
            pf = path[:idx + dz]
            pappend(pf)
            if count >= limit:
                break

            rp = self.conn.execute(s, start=strnextling(pf))  # New start.
        rp.close()

        return matches, prefixes

    def latest_uuid(self, uuid):
        """Return a (path, serial) tuple, for the latest version of the given uuid."""

        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        s = select([n.c.path, v.c.serial])
        filtered = select([func.max(self.versions.c.serial)])
        s = s.where(v.c.serial == filtered.where(self.versions.c.uuid == uuid))
        s = s.where(n.c.node == v.c.node)

        r = self.conn.execute(s)
        l = r.fetchone()
        r.close()
        return l