# Copyright 2011-2012 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from time import time
from operator import itemgetter
from itertools import groupby

from sqlalchemy import (Table, Integer, BigInteger, DECIMAL, Boolean,
                        Column, String, MetaData, ForeignKey)
from sqlalchemy.schema import Index
from sqlalchemy.sql import func, and_, or_, not_, select, bindparam, exists
from sqlalchemy.exc import NoSuchTableError

from dbworker import DBWorker, ESCAPE_CHAR

from pithos.backends.filter import parse_filters


ROOTNODE = 0

(SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM,
 CLUSTER) = range(11)

(MATCH_PREFIX, MATCH_EXACT) = range(2)

inf = float('inf')


def strnextling(prefix):
    """Return the first unicode string
       greater than but not starting with given prefix.
       strnextling('hello') -> 'hellp'
    """
    if not prefix:
        ## all strings start with the null string,
        ## therefore we have to approximate strnextling('')
        ## with the last unicode character supported by python
        ## 0x10ffff for wide (32-bit unicode) python builds
        ## 0x00ffff for narrow (16-bit unicode) python builds
        ## We will not autodetect. 0xffff is safe enough.
        return unichr(0xffff)
    s = prefix[:-1]
    c = ord(prefix[-1])
    if c >= 0xffff:
        raise RuntimeError
    s += unichr(c + 1)
    return s


def strprevling(prefix):
    """Return an approximation of the last unicode string
       less than but not starting with given prefix.
       strprevling(u'hello') -> u'helln\\xffff'
    """
    if not prefix:
        ## There is no prevling for the null string
        return prefix
    s = prefix[:-1]
    c = ord(prefix[-1])
    if c > 0:
        s += unichr(c - 1) + unichr(0xffff)
    return s
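
# Editorial examples (not part of the original module): these helpers bound
# prefix listings with plain string comparisons instead of LIKE, e.g.
#     strnextling('photos/')   -> 'photos0'          (chr(ord('/') + 1) == '0')
#     strprevling(u'photos/')  -> u'photos.\uffff'
# so a scan over "path > strprevling(prefix) AND path < strnextling(prefix)"
# returns (approximately) the paths that start with the prefix.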

_propnames = {
    'serial': 0,
    'node': 1,
    'hash': 2,
    'size': 3,
    'type': 4,
    'source': 5,
    'mtime': 6,
    'muser': 7,
    'uuid': 8,
    'checksum': 9,
    'cluster': 10,
}


def create_tables(engine):
    metadata = MetaData()

    #create nodes table
    columns = []
    columns.append(Column('node', Integer, primary_key=True))
    columns.append(Column('parent', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          autoincrement=False))
    columns.append(Column('latest_version', Integer))
    columns.append(Column('path', String(2048), default='', nullable=False))
    nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
    Index('idx_nodes_path', nodes.c.path, unique=True)
    Index('idx_nodes_parent', nodes.c.parent)

    #create policy table
    columns = []
    columns.append(Column('node', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          primary_key=True))
    columns.append(Column('key', String(128), primary_key=True))
    columns.append(Column('value', String(256)))
    Table('policy', metadata, *columns, mysql_engine='InnoDB')

    #create statistics table
    columns = []
    columns.append(Column('node', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          primary_key=True))
    columns.append(Column('population', Integer, nullable=False, default=0))
    columns.append(Column('size', BigInteger, nullable=False, default=0))
    columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
    columns.append(Column('cluster', Integer, nullable=False, default=0,
                          primary_key=True, autoincrement=False))
    Table('statistics', metadata, *columns, mysql_engine='InnoDB')

    #create versions table
    columns = []
    columns.append(Column('serial', Integer, primary_key=True))
    columns.append(Column('node', Integer,
                          ForeignKey('nodes.node',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE')))
    columns.append(Column('hash', String(256)))
    columns.append(Column('size', BigInteger, nullable=False, default=0))
    columns.append(Column('type', String(256), nullable=False, default=''))
    columns.append(Column('source', Integer))
    columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
    columns.append(Column('muser', String(256), nullable=False, default=''))
    columns.append(Column('uuid', String(64), nullable=False, default=''))
    columns.append(Column('checksum', String(256), nullable=False, default=''))
    columns.append(Column('cluster', Integer, nullable=False, default=0))
    versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
    Index('idx_versions_node_mtime', versions.c.node, versions.c.mtime)
    Index('idx_versions_node_uuid', versions.c.uuid)

    #create attributes table
    columns = []
    columns.append(Column('serial', Integer,
                          ForeignKey('versions.serial',
                                     ondelete='CASCADE',
                                     onupdate='CASCADE'),
                          primary_key=True))
    columns.append(Column('domain', String(256), primary_key=True))
    columns.append(Column('key', String(128), primary_key=True))
    columns.append(Column('value', String(256)))
    columns.append(Column('node', Integer, nullable=False, default=0))
    columns.append(Column('is_latest', Boolean, nullable=False, default=True))
    attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')
    Index('idx_attributes_domain', attributes.c.domain)
    Index('idx_attributes_serial_node', attributes.c.serial, attributes.c.node)

    metadata.create_all(engine)
    return metadata.sorted_tables
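
# Editorial usage sketch (assumes a throwaway SQLite URL; not part of the
# original module):
#
#     from sqlalchemy import create_engine
#     engine = create_engine('sqlite:///pithos.db')
#     tables = create_tables(engine)   # returns metadata.sorted_tables
#
# The Node worker below normally autoloads these tables and only calls
# create_tables() when they do not exist yet.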


class Node(DBWorker):
    """Nodes store path organization and have multiple versions.
       Versions store object history and have multiple attributes.
       Attributes store metadata.
    """

    # TODO: Provide an interface for included and excluded clusters.

    def __init__(self, **params):
        DBWorker.__init__(self, **params)
        try:
            metadata = MetaData(self.engine)
            self.nodes = Table('nodes', metadata, autoload=True)
            self.policy = Table('policy', metadata, autoload=True)
            self.statistics = Table('statistics', metadata, autoload=True)
            self.versions = Table('versions', metadata, autoload=True)
            self.attributes = Table('attributes', metadata, autoload=True)
        except NoSuchTableError:
            tables = create_tables(self.engine)
            map(lambda t: self.__setattr__(t.name, t), tables)

        s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
                                           self.nodes.c.parent == ROOTNODE))
        wrapper = self.wrapper
        wrapper.execute()
        try:
            rp = self.conn.execute(s)
            r = rp.fetchone()
            rp.close()
            if not r:
                s = self.nodes.insert(
                ).values(node=ROOTNODE, parent=ROOTNODE, path='')
                self.conn.execute(s)
        finally:
            wrapper.commit()

    def node_create(self, parent, path):
        """Create a new node from the given properties.
           Return the node identifier of the new node.
        """
        #TODO catch IntegrityError?
        s = self.nodes.insert().values(parent=parent, path=path)
        r = self.conn.execute(s)
        inserted_primary_key = r.inserted_primary_key[0]
        r.close()
        return inserted_primary_key
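
    # Editorial, hypothetical call sequence (names invented for illustration):
    #     account = node.node_create(ROOTNODE, 'user@example.com')
    #     container = node.node_create(account, 'user@example.com/pithos')
    # i.e. each node stores its full slash-separated path, with accounts
    # hanging directly under ROOTNODE.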

    def node_lookup(self, path, for_update=False):
        """Lookup the current node of the given path.
           Return None if the path is not found.
        """

        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
        s = select([self.nodes.c.node],
                   self.nodes.c.path.like(self.escape_like(path),
                                          escape=ESCAPE_CHAR),
                   for_update=for_update)
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        if row:
            return row[0]
        return None

    def node_lookup_bulk(self, paths):
        """Lookup the current nodes for the given paths.
           Return () if the path is not found.
        """

        if not paths:
            return ()
        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
        s = select([self.nodes.c.node], self.nodes.c.path.in_(paths))
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        return [row[0] for row in rows]

    def node_get_properties(self, node):
        """Return the node's (parent, path).
           Return None if the node is not found.
        """

        s = select([self.nodes.c.parent, self.nodes.c.path])
        s = s.where(self.nodes.c.node == node)
        r = self.conn.execute(s)
        l = r.fetchone()
        r.close()
        return l

    def node_get_versions(self, node, keys=(), propnames=_propnames):
        """Return the properties of all versions at node.
           If keys is empty, return all properties in the order
           (serial, node, hash, size, type, source, mtime, muser, uuid,
            checksum, cluster).
        """

        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.hash,
                    self.versions.c.size,
                    self.versions.c.type,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.uuid,
                    self.versions.c.checksum,
                    self.versions.c.cluster], self.versions.c.node == node)
        s = s.order_by(self.versions.c.serial)
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        if not rows:
            return rows

        if not keys:
            return rows

        return [[p[propnames[k]] for k in keys if k in propnames] for
                p in rows]

    def node_count_children(self, node):
        """Return node's child count."""

        s = select([func.count(self.nodes.c.node)])
        s = s.where(and_(self.nodes.c.parent == node,
                         self.nodes.c.node != ROOTNODE))
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        return row[0]

    def node_purge_children(self, parent, before=inf, cluster=0,
                            update_statistics_ancestors_depth=None):
        """Delete all versions with the specified
           parent and cluster, and return
           the hashes, the total size and the serials of versions deleted.
           Clears out nodes with no remaining versions.
        """
        #update statistics
        c1 = select([self.nodes.c.node],
                    self.nodes.c.parent == parent)
        where_clause = and_(self.versions.c.node.in_(c1),
                            self.versions.c.cluster == cluster)
        if before != inf:
            where_clause = and_(where_clause,
                                self.versions.c.mtime <= before)
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        if not row:
            return (), 0, ()
        nr, size = row[0], row[1] if row[1] else 0
        mtime = time()
        self.statistics_update(parent, -nr, -size, mtime, cluster)
        self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster,
                                         update_statistics_ancestors_depth)

        s = select([self.versions.c.hash, self.versions.c.serial])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        hashes = []
        serials = []
        for row in r.fetchall():
            hashes += [row[0]]
            serials += [row[1]]
        r.close()

        #delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)
        r.close()

        #delete nodes
        s = select([self.nodes.c.node],
                   and_(self.nodes.c.parent == parent,
                        select([func.count(self.versions.c.serial)],
                               self.versions.c.node == self.nodes.c.node).
                        as_scalar() == 0))
        rp = self.conn.execute(s)
        nodes = [row[0] for row in rp.fetchall()]
        rp.close()
        if nodes:
            s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
            self.conn.execute(s).close()

        return hashes, size, serials

    def node_purge(self, node, before=inf, cluster=0,
                   update_statistics_ancestors_depth=None):
        """Delete all versions with the specified
           node and cluster, and return
           the hashes and size of versions deleted.
           Clears out the node if it has no remaining versions.
        """

        #update statistics
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size)])
        where_clause = and_(self.versions.c.node == node,
                            self.versions.c.cluster == cluster)
        if before != inf:
            where_clause = and_(where_clause,
                                self.versions.c.mtime <= before)
        s = s.where(where_clause)
        r = self.conn.execute(s)
        row = r.fetchone()
        nr, size = row[0], row[1]
        r.close()
        if not nr:
            return (), 0, ()
        mtime = time()
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster,
                                         update_statistics_ancestors_depth)

        s = select([self.versions.c.hash, self.versions.c.serial])
        s = s.where(where_clause)
        r = self.conn.execute(s)
        hashes = []
        serials = []
        for row in r.fetchall():
            hashes += [row[0]]
            serials += [row[1]]
        r.close()

        #delete versions
        s = self.versions.delete().where(where_clause)
        r = self.conn.execute(s)
        r.close()

        #delete nodes
        s = select([self.nodes.c.node],
                   and_(self.nodes.c.node == node,
                        select([func.count(self.versions.c.serial)],
                               self.versions.c.node == self.nodes.c.node).
                        as_scalar() == 0))
        rp = self.conn.execute(s)
        nodes = [row[0] for row in rp.fetchall()]
        rp.close()
        if nodes:
            s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
            self.conn.execute(s).close()

        return hashes, size, serials

    def node_remove(self, node, update_statistics_ancestors_depth=None):
        """Remove the node specified.
           Return false if the node has children or is not found.
        """

        if self.node_count_children(node):
            return False

        mtime = time()
        s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size),
                    self.versions.c.cluster])
        s = s.where(self.versions.c.node == node)
        s = s.group_by(self.versions.c.cluster)
        r = self.conn.execute(s)
        for population, size, cluster in r.fetchall():
            self.statistics_update_ancestors(
                node, -population, -size, mtime, cluster,
                update_statistics_ancestors_depth)
        r.close()

        s = self.nodes.delete().where(self.nodes.c.node == node)
        self.conn.execute(s).close()
        return True

    def node_accounts(self, accounts=()):
        s = select([self.nodes.c.path, self.nodes.c.node])
        s = s.where(and_(self.nodes.c.node != 0,
                         self.nodes.c.parent == 0))
        if accounts:
            s = s.where(self.nodes.c.path.in_(accounts))
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        return rows

    def node_account_quotas(self):
        s = select([self.nodes.c.path, self.policy.c.value])
        s = s.where(and_(self.nodes.c.node != 0,
                         self.nodes.c.parent == 0))
        s = s.where(self.nodes.c.node == self.policy.c.node)
        s = s.where(self.policy.c.key == 'quota')
        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()
        return dict(rows)

    def node_account_usage(self, account=None, cluster=0):
        """Return usage for a specific account.

        Keyword arguments:
        account -- (default None: list usage for all the accounts)
        cluster -- list current, history or deleted usage (default 0: normal)
        """

        n1 = self.nodes.alias('n1')
        n2 = self.nodes.alias('n2')
        n3 = self.nodes.alias('n3')

        s = select([n3.c.path, func.sum(self.versions.c.size)])
        s = s.where(n1.c.node == self.versions.c.node)
        s = s.where(self.versions.c.cluster == cluster)
        s = s.where(n1.c.parent == n2.c.node)
        s = s.where(n2.c.parent == n3.c.node)
        s = s.where(n3.c.parent == 0)
        s = s.where(n3.c.node != 0)
        if account:
            s = s.where(n3.c.path == account)
        s = s.group_by(n3.c.path)
        r = self.conn.execute(s)
        usage = r.fetchall()
        r.close()
        return dict(usage)

    def policy_get(self, node):
        s = select([self.policy.c.key, self.policy.c.value],
                   self.policy.c.node == node)
        r = self.conn.execute(s)
        d = dict(r.fetchall())
        r.close()
        return d

    def policy_set(self, node, policy):
        #insert or replace
        for k, v in policy.iteritems():
            s = self.policy.update().where(and_(self.policy.c.node == node,
                                                self.policy.c.key == k))
            s = s.values(value=v)
            rp = self.conn.execute(s)
            rp.close()
            if rp.rowcount == 0:
                s = self.policy.insert()
                values = {'node': node, 'key': k, 'value': v}
                r = self.conn.execute(s, values)
                r.close()

    def statistics_get(self, node, cluster=0):
        """Return population, total size and last mtime
           for all versions under node that belong to the cluster.
        """

        s = select([self.statistics.c.population,
                    self.statistics.c.size,
                    self.statistics.c.mtime])
        s = s.where(and_(self.statistics.c.node == node,
                         self.statistics.c.cluster == cluster))
        r = self.conn.execute(s)
        row = r.fetchone()
        r.close()
        return row

    def statistics_update(self, node, population, size, mtime, cluster=0):
        """Update the statistics of the given node.
           Statistics keep track of the population, total
           size of objects and mtime in the node's namespace.
           The population and size deltas may be zero, positive or negative.
        """
        s = select([self.statistics.c.population, self.statistics.c.size],
                   and_(self.statistics.c.node == node,
                        self.statistics.c.cluster == cluster))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            prepopulation, presize = (0, 0)
        else:
            prepopulation, presize = r
        population += prepopulation
        population = max(population, 0)
        size += presize

        #insert or replace
        #TODO better upsert
        u = self.statistics.update().where(and_(
            self.statistics.c.node == node,
            self.statistics.c.cluster == cluster))
        u = u.values(population=population, size=size, mtime=mtime)
        rp = self.conn.execute(u)
        rp.close()
        if rp.rowcount == 0:
            ins = self.statistics.insert()
            ins = ins.values(node=node, population=population, size=size,
                             mtime=mtime, cluster=cluster)
            self.conn.execute(ins).close()
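
    # Editorial note: the UPDATE-then-INSERT sequence above (and the similar
    # ones in policy_set and attribute_set) emulates an upsert; rowcount == 0
    # after the UPDATE means no matching row existed, so a new row is inserted.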

    def statistics_update_ancestors(self, node, population, size, mtime,
                                    cluster=0, recursion_depth=None):
        """Update the statistics of the given node's parent.
           Then recursively update all parents up to the root
           or up to the ``recursion_depth`` (if not None).
           Population is not recursive.
        """

        i = 0
        while True:
            if node == ROOTNODE:
                break
            if recursion_depth and recursion_depth <= i:
                break
            props = self.node_get_properties(node)
            if props is None:
                break
            parent, path = props
            self.statistics_update(parent, population, size, mtime, cluster)
            node = parent
            population = 0  # Population isn't recursive
            i += 1

    def statistics_latest(self, node, before=inf, except_cluster=0):
        """Return population, total size and last mtime
           for all latest versions under node that
           do not belong to the cluster.
        """

        # The node.
        props = self.node_get_properties(node)
        if props is None:
            return None
        parent, path = props

        # The latest version.
        s = select([self.versions.c.serial,
                    self.versions.c.node,
                    self.versions.c.hash,
                    self.versions.c.size,
                    self.versions.c.type,
                    self.versions.c.source,
                    self.versions.c.mtime,
                    self.versions.c.muser,
                    self.versions.c.uuid,
                    self.versions.c.checksum,
                    self.versions.c.cluster])
        if before != inf:
            filtered = select([func.max(self.versions.c.serial)],
                              self.versions.c.node == node)
            filtered = filtered.where(self.versions.c.mtime < before)
        else:
            filtered = select([self.nodes.c.latest_version],
                              self.nodes.c.node == node)
        s = s.where(and_(self.versions.c.cluster != except_cluster,
                         self.versions.c.serial == filtered))
        r = self.conn.execute(s)
        props = r.fetchone()
        r.close()
        if not props:
            return None
        mtime = props[MTIME]

        # First level, just under node (get population).
        v = self.versions.alias('v')
        s = select([func.count(v.c.serial),
                    func.sum(v.c.size),
                    func.max(v.c.mtime)])
        if before != inf:
            c1 = select([func.max(self.versions.c.serial)],
                        and_(self.versions.c.mtime < before,
                             self.versions.c.node == v.c.node))
        else:
            c1 = select([self.nodes.c.latest_version])
            c1 = c1.where(self.nodes.c.node == v.c.node)
        c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
        s = s.where(and_(v.c.serial == c1,
                         v.c.cluster != except_cluster,
                         v.c.node.in_(c2)))
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        count = r[0]
        mtime = max(mtime, r[2])
        if count == 0:
            return (0, 0, mtime)

        # All children (get size and mtime).
        # This is why the full path is stored.
        if before != inf:
            s = select([func.count(v.c.serial),
                    func.sum(v.c.size),
                    func.max(v.c.mtime)])
            c1 = select([func.max(self.versions.c.serial)],
                        and_(self.versions.c.mtime < before,
                             self.versions.c.node == v.c.node))
        else:
            inner_join = \
                    self.versions.join(self.nodes, onclause=\
                    self.versions.c.serial == self.nodes.c.latest_version)
            s = select([func.count(self.versions.c.serial),
                    func.sum(self.versions.c.size),
                    func.max(self.versions.c.mtime)], from_obj=[inner_join])

        c2 = select([self.nodes.c.node],
                    self.nodes.c.path.like(self.escape_like(path) + '%',
                                           escape=ESCAPE_CHAR))
        if before != inf:
            s = s.where(and_(v.c.serial == c1,
                         v.c.cluster != except_cluster,
                         v.c.node.in_(c2)))
        else:
            s = s.where(and_(self.versions.c.cluster != except_cluster,
                        self.versions.c.node.in_(c2)))

        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if not r:
            return None
        size = long(r[1] - props[SIZE])
        mtime = max(mtime, r[2])
        return (count, size, mtime)

    def nodes_set_latest_version(self, node, serial):
        s = self.nodes.update().where(self.nodes.c.node == node)
        s = s.values(latest_version=serial)
        self.conn.execute(s).close()

    def version_create(self, node, hash, size, type, source, muser, uuid,
                       checksum, cluster=0,
                       update_statistics_ancestors_depth=None):
        """Create a new version from the given properties.
           Return the (serial, mtime) of the new version.
        """

        mtime = time()
        s = self.versions.insert().values(
            node=node, hash=hash, size=size, type=type, source=source,
            mtime=mtime, muser=muser, uuid=uuid, checksum=checksum,
            cluster=cluster)
        serial = self.conn.execute(s).inserted_primary_key[0]
        self.statistics_update_ancestors(node, 1, size, mtime, cluster,
                                         update_statistics_ancestors_depth)

        self.nodes_set_latest_version(node, serial)

        return serial, mtime

    def version_lookup(self, node, before=inf, cluster=0, all_props=True):
        """Lookup the current version of the given node.
           Return a list with its properties:
           (serial, node, hash, size, type, source, mtime,
            muser, uuid, checksum, cluster)
           or None if the current version is not found in the given cluster.
        """

        v = self.versions.alias('v')
        if not all_props:
            s = select([v.c.serial])
        else:
            s = select([v.c.serial, v.c.node, v.c.hash,
                        v.c.size, v.c.type, v.c.source,
                        v.c.mtime, v.c.muser, v.c.uuid,
                        v.c.checksum, v.c.cluster])
        if before != inf:
            c = select([func.max(self.versions.c.serial)],
                       self.versions.c.node == node)
            c = c.where(self.versions.c.mtime < before)
        else:
            c = select([self.nodes.c.latest_version],
                       self.nodes.c.node == node)
        s = s.where(and_(v.c.serial == c,
                         v.c.cluster == cluster))
        r = self.conn.execute(s)
        props = r.fetchone()
        r.close()
        if props:
            return props
        return None

    def version_lookup_bulk(self, nodes, before=inf, cluster=0,
                            all_props=True, order_by_path=False):
        """Lookup the current versions of the given nodes.
           Return a list with their properties:
           (serial, node, hash, size, type, source, mtime, muser, uuid,
            checksum, cluster).
        """
        if not nodes:
            return ()
        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        if not all_props:
            s = select([v.c.serial])
        else:
            s = select([v.c.serial, v.c.node, v.c.hash,
                        v.c.size, v.c.type, v.c.source,
                        v.c.mtime, v.c.muser, v.c.uuid,
                        v.c.checksum, v.c.cluster])
        if before != inf:
            c = select([func.max(self.versions.c.serial)],
                       self.versions.c.node.in_(nodes))
            c = c.where(self.versions.c.mtime < before)
            c = c.group_by(self.versions.c.node)
        else:
            c = select([self.nodes.c.latest_version],
                       self.nodes.c.node.in_(nodes))
        s = s.where(and_(v.c.serial.in_(c),
                         v.c.cluster == cluster))
        if order_by_path:
            s = s.where(v.c.node == n.c.node)
            s = s.order_by(n.c.path)
        else:
            s = s.order_by(v.c.node)
        r = self.conn.execute(s)
        rproxy = r.fetchall()
        r.close()
        return (tuple(row.values()) for row in rproxy)

    def version_get_properties(self, serial, keys=(), propnames=_propnames,
                               node=None):
        """Return a sequence of values for the properties of
           the version specified by serial and the keys, in the order given.
           If keys is empty, return all properties in the order
           (serial, node, hash, size, type, source, mtime, muser, uuid,
            checksum, cluster).
        """

        v = self.versions.alias()
        s = select([v.c.serial, v.c.node, v.c.hash,
                    v.c.size, v.c.type, v.c.source,
                    v.c.mtime, v.c.muser, v.c.uuid,
                    v.c.checksum, v.c.cluster], v.c.serial == serial)
        if node is not None:
            s = s.where(v.c.node == node)
        rp = self.conn.execute(s)
        r = rp.fetchone()
        rp.close()
        if r is None:
            return r

        if not keys:
            return r
        return [r[propnames[k]] for k in keys if k in propnames]

    def version_put_property(self, serial, key, value):
        """Set value for the property of version specified by key."""

        if key not in _propnames:
            return
        s = self.versions.update()
        s = s.where(self.versions.c.serial == serial)
        s = s.values(**{key: value})
        self.conn.execute(s).close()

    def version_recluster(self, serial, cluster,
                          update_statistics_ancestors_depth=None):
        """Move the version into another cluster."""

        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        size = props[SIZE]
        oldcluster = props[CLUSTER]
        if cluster == oldcluster:
            return

        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster,
                                         update_statistics_ancestors_depth)
        self.statistics_update_ancestors(node, 1, size, mtime, cluster,
                                         update_statistics_ancestors_depth)

        s = self.versions.update()
        s = s.where(self.versions.c.serial == serial)
        s = s.values(cluster=cluster)
        self.conn.execute(s).close()

    def version_remove(self, serial, update_statistics_ancestors_depth=None):
        """Remove the serial specified."""

        props = self.version_get_properties(serial)
        if not props:
            return
        node = props[NODE]
        hash = props[HASH]
        size = props[SIZE]
        cluster = props[CLUSTER]

        mtime = time()
        self.statistics_update_ancestors(node, -1, -size, mtime, cluster,
                                         update_statistics_ancestors_depth)

        s = self.versions.delete().where(self.versions.c.serial == serial)
        self.conn.execute(s).close()

        props = self.version_lookup(node, cluster=cluster, all_props=False)
        if props:
            self.nodes_set_latest_version(node, props[0])

        return hash, size

    def attribute_get(self, serial, domain, keys=()):
        """Return a list of (key, value) pairs of the specific version.

        If keys is empty, return all attributes.
        Otherwise, return only those specified.
        """

        if keys:
            attrs = self.attributes.alias()
            s = select([attrs.c.key, attrs.c.value])
            s = s.where(and_(attrs.c.key.in_(keys),
                             attrs.c.serial == serial,
                             attrs.c.domain == domain))
        else:
            attrs = self.attributes.alias()
            s = select([attrs.c.key, attrs.c.value])
            s = s.where(and_(attrs.c.serial == serial,
                             attrs.c.domain == domain))
        r = self.conn.execute(s)
        l = r.fetchall()
        r.close()
        return l

    def attribute_set(self, serial, domain, node, items, is_latest=True):
        """Set the attributes of the version specified by serial.
           Receive attributes as an iterable of (key, value) pairs.
        """
        #insert or replace
        #TODO better upsert
        for k, v in items:
            s = self.attributes.update()
            s = s.where(and_(self.attributes.c.serial == serial,
                             self.attributes.c.domain == domain,
                             self.attributes.c.key == k))
            s = s.values(value=v)
            rp = self.conn.execute(s)
            rp.close()
            if rp.rowcount == 0:
                s = self.attributes.insert()
                s = s.values(serial=serial, domain=domain, node=node,
                             is_latest=is_latest, key=k, value=v)
                self.conn.execute(s).close()

    def attribute_del(self, serial, domain, keys=()):
        """Delete attributes of the version specified by serial.
           If keys is empty, delete all attributes.
           Otherwise delete those specified.
        """

        if keys:
            #TODO more efficient way to do this?
            for key in keys:
                s = self.attributes.delete()
                s = s.where(and_(self.attributes.c.serial == serial,
                                 self.attributes.c.domain == domain,
                                 self.attributes.c.key == key))
                self.conn.execute(s).close()
        else:
            s = self.attributes.delete()
            s = s.where(and_(self.attributes.c.serial == serial,
                             self.attributes.c.domain == domain))
            self.conn.execute(s).close()

    def attribute_copy(self, source, dest):
        s = select(
            [dest, self.attributes.c.domain, self.attributes.c.node,
             self.attributes.c.key, self.attributes.c.value],
            self.attributes.c.serial == source)
        rp = self.conn.execute(s)
        attributes = rp.fetchall()
        rp.close()
        for dest, domain, node, k, v in attributes:
            select_src_node = select(
                [self.versions.c.node],
                self.versions.c.serial == dest)
            # insert or replace
            s = self.attributes.update().where(and_(
                self.attributes.c.serial == dest,
                self.attributes.c.domain == domain,
                self.attributes.c.key == k))
            s = s.values(node=select_src_node, value=v)
            rp = self.conn.execute(s)
            rp.close()
            if rp.rowcount == 0:
                s = self.attributes.insert()
                s = s.values(serial=dest, domain=domain, node=select_src_node,
                             is_latest=True, key=k, value=v)
            self.conn.execute(s).close()

    def attribute_unset_is_latest(self, node, exclude):
        u = self.attributes.update().where(and_(
            self.attributes.c.node == node,
            self.attributes.c.serial != exclude)).values({'is_latest': False})
        self.conn.execute(u)

    def latest_attribute_keys(self, parent, domain, before=inf,
                              except_cluster=0, pathq=None):
988
           do not belong to the cluster.
989
        """
990

    
991
        pathq = pathq or []
992

    
993
        # TODO: Use another table to store before=inf results.
994
        v = self.versions.alias('v')
995
        s = select([self.attributes.c.key]).distinct()
996
        if before != inf:
997
            filtered = select([func.max(v.c.serial)],
998
                              and_(v.c.mtime < before,
999
                                   v.c.node == self.versions.c.node))
1000
        else:
1001
            filtered = select([self.nodes.c.latest_version])
1002
            filtered = filtered.where(self.nodes.c.node == \
1003
                            self.versions.c.node).correlate(self.versions)
1004
        s = s.where(self.versions.c.serial == filtered)
1005
        s = s.where(self.versions.c.cluster != except_cluster)
1006
        s = s.where(self.versions.c.node.in_(select([self.nodes.c.node],
1007
                                        self.nodes.c.parent == parent)))
1008
        s = s.where(self.attributes.c.serial == self.versions.c.serial)
1009
        s = s.where(self.attributes.c.domain == domain)
1010
        s = s.where(self.nodes.c.node == self.versions.c.node)
1011
        s = s.order_by(self.attributes.c.key)
1012
        conja = []
1013
        conjb = []
1014
        for path, match in pathq:
1015
            if match == MATCH_PREFIX:
1016
                conja.append(self.nodes.c.path.like(self.escape_like(path) + '%',
1017
                                          escape=ESCAPE_CHAR))
1018
            elif match == MATCH_EXACT:
1019
                conjb.append(path)
1020
        if conja or conjb:
1021
            s = s.where(or_(self.nodes.c.path.in_(conjb),*conja))
1022
        rp = self.conn.execute(s)
1023
        rows = rp.fetchall()
1024
        rp.close()
1025
        return [r[0] for r in rows]
1026

    
1027
    def latest_version_list(self, parent, prefix='', delimiter=None,
1028
                            start='', limit=10000, before=inf,
1029
                            except_cluster=0, pathq=[], domain=None,
1030
                            filterq=[], sizeq=None, all_props=False):
1031
        """Return a (list of (path, serial) tuples, list of common prefixes)
1032
           for the current versions of the paths with the given parent,
1033
           matching the following criteria.
1034

1035
           The property tuple for a version is returned if all
1036
           of these conditions are true:
1037

1038
                a. parent matches
1039

1040
                b. path > start
1041

1042
                c. path starts with prefix (and paths in pathq)
1043

1044
                d. version is the max up to before
1045

1046
                e. version is not in cluster
1047

1048
                f. the path does not have the delimiter occurring
                   after the prefix, or ends with the delimiter

                g. serial matches the attribute filter query.

                   A filter query is a comma-separated list of
                   terms in one of these three forms:

                   key
                       an attribute with this key must exist

                   !key
                       an attribute with this key must not exist

                   key ?op value
                       the attribute with this key satisfies the value
                       where ?op is one of ==, !=, <=, >=, <, >.

                h. the size is in the range set by sizeq

           The list of common prefixes includes the prefixes
           matching up to the first delimiter after prefix,
           and are reported only once, as "virtual directories".
           The delimiter is included in the prefixes.

           If arguments are None, then the corresponding matching rule
           will always match.

           Limit applies to the first list of tuples returned.

           If all_props is True, return all properties after path,
           not just serial.
        """

        if not start or start < prefix:
            start = strprevling(prefix)
        nextling = strnextling(prefix)

        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        if before != inf:
            filtered = select([func.max(v.c.serial)],
                              and_(v.c.mtime < before,
                                   v.c.node == self.versions.c.node))
            inner_join = \
                    self.nodes.join(self.versions,
                            onclause=self.versions.c.serial==filtered)
        else:
            filtered = select([self.nodes.c.latest_version])
            filtered = filtered.where(self.nodes.c.node == self.versions.c.node).correlate(self.versions)
            inner_join = \
                    self.nodes.join(self.versions,
                            onclause=\
                            self.versions.c.serial==filtered)
        if not all_props:
            s = select([self.nodes.c.path,
                self.versions.c.serial],from_obj=[inner_join]).distinct()
        else:
            s = select([self.nodes.c.path,
                        self.versions.c.serial, self.versions.c.node,
                        self.versions.c.hash,
                        self.versions.c.size, self.versions.c.type,
                        self.versions.c.source,
                        self.versions.c.mtime, self.versions.c.muser,
                        self.versions.c.uuid,
                        self.versions.c.checksum,
                        self.versions.c.cluster],from_obj=[inner_join]).distinct()

        s = s.where(self.versions.c.cluster != except_cluster)
        s = s.where(self.versions.c.node.in_(select([self.nodes.c.node],
                                             self.nodes.c.parent == parent)))

        s = s.where(self.versions.c.node == self.nodes.c.node)
        s = s.where(and_(self.nodes.c.path > bindparam('start'),
                         self.nodes.c.path < nextling))
        conja = []
        conjb = []
        for path, match in pathq:
            if match == MATCH_PREFIX:
                conja.append(self.nodes.c.path.like(self.escape_like(path) + '%',
                             escape=ESCAPE_CHAR))
            elif match == MATCH_EXACT:
                conjb.append(path)
        if conja or conjb:
            s = s.where(or_(self.nodes.c.path.in_(conjb),*conja))

        if sizeq and len(sizeq) == 2:
            if sizeq[0]:
                s = s.where(self.versions.c.size >= sizeq[0])
            if sizeq[1]:
                s = s.where(self.versions.c.size < sizeq[1])

        if domain and filterq:
            a = self.attributes.alias('a')
            included, excluded, opers = parse_filters(filterq)
            if included:
                subs = select([1])
                subs = subs.where(self.attributes.c.serial == self.versions.c.serial).correlate(self.versions)
                subs = subs.where(self.attributes.c.domain == domain)
                subs = subs.where(or_(*[self.attributes.c.key.op('=')(x) for x in included]))
                s = s.where(exists(subs))
            if excluded:
                subs = select([1])
                subs = subs.where(self.attributes.c.serial == self.versions.c.serial).correlate(self.versions)
                subs = subs.where(self.attributes.c.domain == domain)
                subs = subs.where(or_(*[self.attributes.c.key.op('=')(x) for x in excluded]))
                s = s.where(not_(exists(subs)))
            if opers:
                for k, o, val in opers:
                    subs = select([1])
                    subs = subs.where(self.attributes.c.serial == self.versions.c.serial).correlate(self.versions)
                    subs = subs.where(self.attributes.c.domain == domain)
                    subs = subs.where(
                        and_(self.attributes.c.key.op('=')(k), self.attributes.c.value.op(o)(val)))
                    s = s.where(exists(subs))

        s = s.order_by(self.nodes.c.path)

        if not delimiter:
            s = s.limit(limit)
            rp = self.conn.execute(s, start=start)
            r = rp.fetchall()
            rp.close()
            return r, ()

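        # Editorial note: with a delimiter, any path that still contains the
        # delimiter after the prefix is collapsed into a single "virtual
        # directory" entry in `prefixes`, and the scan restarts just past that
        # common prefix via strnextling(pf) so each prefix is reported once.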
        pfz = len(prefix)
        dz = len(delimiter)
        count = 0
        prefixes = []
        pappend = prefixes.append
        matches = []
        mappend = matches.append

        rp = self.conn.execute(s, start=start)
        while True:
            props = rp.fetchone()
            if props is None:
                break
            path = props[0]
            idx = path.find(delimiter, pfz)

            if idx < 0:
                mappend(props)
                count += 1
                if count >= limit:
                    break
                continue

            if idx + dz == len(path):
                mappend(props)
                count += 1
                continue  # Get one more, in case there is a path.
            pf = path[:idx + dz]
            pappend(pf)
            if count >= limit:
                break

            rp = self.conn.execute(s, start=strnextling(pf))  # New start.
        rp.close()

        return matches, prefixes

    def latest_uuid(self, uuid, cluster):
        """Return the latest version of the given uuid and cluster.

        Return a (path, serial) tuple.
        If cluster is None, all clusters are considered.

        """

        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        s = select([n.c.path, v.c.serial])
        filtered = select([func.max(self.versions.c.serial)])
        filtered = filtered.where(self.versions.c.uuid == uuid)
        if cluster is not None:
            filtered = filtered.where(self.versions.c.cluster == cluster)
        s = s.where(v.c.serial == filtered)
        s = s.where(n.c.node == v.c.node)

        r = self.conn.execute(s)
        l = r.fetchone()
        r.close()
        return l

    def domain_object_list(self, domain, paths, cluster=None):
        """Return a list of (path, property list, attribute dictionary)
           for the objects in the specific domain and cluster.
        """

        v = self.versions.alias('v')
        n = self.nodes.alias('n')
        a = self.attributes.alias('a')

        s = select([n.c.path, v.c.serial, v.c.node, v.c.hash, v.c.size,
                    v.c.type, v.c.source, v.c.mtime, v.c.muser, v.c.uuid,
                    v.c.checksum, v.c.cluster, a.c.key, a.c.value])
        if cluster:
            s = s.where(v.c.cluster == cluster)
        s = s.where(v.c.serial == a.c.serial)
        s = s.where(a.c.domain == domain)
        s = s.where(a.c.node == n.c.node)
        s = s.where(a.c.is_latest == True)
        if paths:
            s = s.where(n.c.path.in_(paths))

        r = self.conn.execute(s)
        rows = r.fetchall()
        r.close()

1258
        group_by = itemgetter(slice(12))
1259
        rows.sort(key=group_by)
1260
        groups = groupby(rows, group_by)
1261
        return [(k[0], k[1:], dict([i[12:] for i in data])) for
1262
                (k, data) in groups]
1263

    
1264
    def get_props(self, paths):
1265
        inner_join = \
1266
            self.nodes.join(self.versions,
1267
                onclause=self.versions.c.serial == self.nodes.c.latest_version)
1268
        cc = self.nodes.c.path.in_(paths)
1269
        s = select([self.nodes.c.path, self.versions.c.type],
1270
                    from_obj=[inner_join]).where(cc).distinct()
1271
        r = self.conn.execute(s)
1272
        rows = r.fetchall()
1273
        r.close()
1274
        if rows:
1275
            return rows
1276
        return None