Statistics
| Branch: | Tag: | Revision:

root / pithos / backends / lib / sqlalchemy / node.py @ f897bea9

History | View | Annotate | Download (35.4 kB)

1 4f917833 Sofia Papagiannaki
# Copyright 2011 GRNET S.A. All rights reserved.
2 4f917833 Sofia Papagiannaki
# 
3 4f917833 Sofia Papagiannaki
# Redistribution and use in source and binary forms, with or
4 4f917833 Sofia Papagiannaki
# without modification, are permitted provided that the following
5 4f917833 Sofia Papagiannaki
# conditions are met:
6 4f917833 Sofia Papagiannaki
# 
7 4f917833 Sofia Papagiannaki
#   1. Redistributions of source code must retain the above
8 4f917833 Sofia Papagiannaki
#      copyright notice, this list of conditions and the following
9 4f917833 Sofia Papagiannaki
#      disclaimer.
10 4f917833 Sofia Papagiannaki
# 
11 4f917833 Sofia Papagiannaki
#   2. Redistributions in binary form must reproduce the above
12 4f917833 Sofia Papagiannaki
#      copyright notice, this list of conditions and the following
13 4f917833 Sofia Papagiannaki
#      disclaimer in the documentation and/or other materials
14 4f917833 Sofia Papagiannaki
#      provided with the distribution.
15 4f917833 Sofia Papagiannaki
# 
16 4f917833 Sofia Papagiannaki
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 4f917833 Sofia Papagiannaki
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 4f917833 Sofia Papagiannaki
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 4f917833 Sofia Papagiannaki
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 4f917833 Sofia Papagiannaki
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 4f917833 Sofia Papagiannaki
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 4f917833 Sofia Papagiannaki
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 4f917833 Sofia Papagiannaki
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 4f917833 Sofia Papagiannaki
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 4f917833 Sofia Papagiannaki
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 4f917833 Sofia Papagiannaki
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 4f917833 Sofia Papagiannaki
# POSSIBILITY OF SUCH DAMAGE.
28 4f917833 Sofia Papagiannaki
# 
29 4f917833 Sofia Papagiannaki
# The views and conclusions contained in the software and
30 4f917833 Sofia Papagiannaki
# documentation are those of the authors and should not be
31 4f917833 Sofia Papagiannaki
# interpreted as representing official policies, either expressed
32 4f917833 Sofia Papagiannaki
# or implied, of GRNET S.A.
33 4f917833 Sofia Papagiannaki
34 4f917833 Sofia Papagiannaki
from time import time
35 c48acbfd Sofia Papagiannaki
from sqlalchemy import Table, Integer, BigInteger, DECIMAL, Column, String, MetaData, ForeignKey
36 e414f54d Sofia Papagiannaki
from sqlalchemy.types import Text
37 18266c93 Sofia Papagiannaki
from sqlalchemy.schema import Index, Sequence
38 3d13f97a Sofia Papagiannaki
from sqlalchemy.sql import func, and_, or_, not_, null, select, bindparam, text
39 4f1bc0a6 Sofia Papagiannaki
from sqlalchemy.ext.compiler import compiles
40 e414f54d Sofia Papagiannaki
from sqlalchemy.engine.reflection import Inspector
41 4f1bc0a6 Sofia Papagiannaki
42 4f917833 Sofia Papagiannaki
from dbworker import DBWorker
43 4f917833 Sofia Papagiannaki
44 f897bea9 Sofia Papagiannaki
from pithos.lib.filter import parse_filters, OPERATORS
45 059857e2 Antony Chazapis
46 26f81fe9 Sofia Papagiannaki
ROOTNODE  = 0
47 4f917833 Sofia Papagiannaki
48 25ae8b75 Antony Chazapis
( SERIAL, NODE, HASH, SIZE, SOURCE, MTIME, MUSER, UUID, CLUSTER ) = range(9)
49 4f917833 Sofia Papagiannaki
50 4f917833 Sofia Papagiannaki
inf = float('inf')
51 4f917833 Sofia Papagiannaki
52 4f917833 Sofia Papagiannaki
53 4f917833 Sofia Papagiannaki
def strnextling(prefix):
54 4f917833 Sofia Papagiannaki
    """Return the first unicode string
55 4f917833 Sofia Papagiannaki
       greater than but not starting with given prefix.
56 4f917833 Sofia Papagiannaki
       strnextling('hello') -> 'hellp'
57 4f917833 Sofia Papagiannaki
    """
58 4f917833 Sofia Papagiannaki
    if not prefix:
59 4f917833 Sofia Papagiannaki
        ## all strings start with the null string,
60 4f917833 Sofia Papagiannaki
        ## therefore we have to approximate strnextling('')
61 4f917833 Sofia Papagiannaki
        ## with the last unicode character supported by python
62 4f917833 Sofia Papagiannaki
        ## 0x10ffff for wide (32-bit unicode) python builds
63 4f917833 Sofia Papagiannaki
        ## 0x00ffff for narrow (16-bit unicode) python builds
64 4f917833 Sofia Papagiannaki
        ## We will not autodetect. 0xffff is safe enough.
65 4f917833 Sofia Papagiannaki
        return unichr(0xffff)
66 4f917833 Sofia Papagiannaki
    s = prefix[:-1]
67 4f917833 Sofia Papagiannaki
    c = ord(prefix[-1])
68 4f917833 Sofia Papagiannaki
    if c >= 0xffff:
69 4f917833 Sofia Papagiannaki
        raise RuntimeError
70 4f917833 Sofia Papagiannaki
    s += unichr(c+1)
71 4f917833 Sofia Papagiannaki
    return s
72 4f917833 Sofia Papagiannaki
73 4f917833 Sofia Papagiannaki
def strprevling(prefix):
74 4f917833 Sofia Papagiannaki
    """Return an approximation of the last unicode string
75 4f917833 Sofia Papagiannaki
       less than but not starting with given prefix.
76 4f917833 Sofia Papagiannaki
       strprevling(u'hello') -> u'helln\\xffff'
77 4f917833 Sofia Papagiannaki
    """
78 4f917833 Sofia Papagiannaki
    if not prefix:
79 4f917833 Sofia Papagiannaki
        ## There is no prevling for the null string
80 4f917833 Sofia Papagiannaki
        return prefix
81 4f917833 Sofia Papagiannaki
    s = prefix[:-1]
82 4f917833 Sofia Papagiannaki
    c = ord(prefix[-1])
83 4f917833 Sofia Papagiannaki
    if c > 0:
84 2e2c7257 Sofia Papagiannaki
        s += unichr(c-1) + unichr(0xffff)
85 4f917833 Sofia Papagiannaki
    return s
86 4f917833 Sofia Papagiannaki
87 4f917833 Sofia Papagiannaki
_propnames = {
88 4f917833 Sofia Papagiannaki
    'serial'    : 0,
89 4f917833 Sofia Papagiannaki
    'node'      : 1,
90 1c2fc0ff Antony Chazapis
    'hash'      : 2,
91 1c2fc0ff Antony Chazapis
    'size'      : 3,
92 1c2fc0ff Antony Chazapis
    'source'    : 4,
93 1c2fc0ff Antony Chazapis
    'mtime'     : 5,
94 1c2fc0ff Antony Chazapis
    'muser'     : 6,
95 25ae8b75 Antony Chazapis
    'uuid'      : 7,
96 25ae8b75 Antony Chazapis
    'cluster'   : 8
97 4f917833 Sofia Papagiannaki
}
98 4f917833 Sofia Papagiannaki
99 4f917833 Sofia Papagiannaki
100 4f917833 Sofia Papagiannaki
class Node(DBWorker):
101 4f917833 Sofia Papagiannaki
    """Nodes store path organization and have multiple versions.
102 4f917833 Sofia Papagiannaki
       Versions store object history and have multiple attributes.
103 4f917833 Sofia Papagiannaki
       Attributes store metadata.
104 4f917833 Sofia Papagiannaki
    """
105 4f917833 Sofia Papagiannaki
    
106 4f917833 Sofia Papagiannaki
    # TODO: Provide an interface for included and excluded clusters.
107 4f917833 Sofia Papagiannaki
    
108 4f917833 Sofia Papagiannaki
    def __init__(self, **params):
109 4f917833 Sofia Papagiannaki
        DBWorker.__init__(self, **params)
110 4f917833 Sofia Papagiannaki
        metadata = MetaData()
111 4f917833 Sofia Papagiannaki
        
112 4f917833 Sofia Papagiannaki
        #create nodes table
113 4f917833 Sofia Papagiannaki
        columns=[]
114 4f917833 Sofia Papagiannaki
        columns.append(Column('node', Integer, primary_key=True))
115 4f917833 Sofia Papagiannaki
        columns.append(Column('parent', Integer,
116 4f917833 Sofia Papagiannaki
                              ForeignKey('nodes.node',
117 4f917833 Sofia Papagiannaki
                                         ondelete='CASCADE',
118 4f917833 Sofia Papagiannaki
                                         onupdate='CASCADE'),
119 18266c93 Sofia Papagiannaki
                              autoincrement=False))
120 e414f54d Sofia Papagiannaki
        path_length = 2048
121 3a4a6892 Sofia Papagiannaki
        columns.append(Column('path', String(path_length), default='', nullable=False))
122 8af4c26d Antony Chazapis
        self.nodes = Table('nodes', metadata, *columns, mysql_engine='InnoDB')
123 4f917833 Sofia Papagiannaki
        
124 5e7485da Antony Chazapis
        #create policy table
125 5e7485da Antony Chazapis
        columns=[]
126 5e7485da Antony Chazapis
        columns.append(Column('node', Integer,
127 5e7485da Antony Chazapis
                              ForeignKey('nodes.node',
128 5e7485da Antony Chazapis
                                         ondelete='CASCADE',
129 5e7485da Antony Chazapis
                                         onupdate='CASCADE'),
130 5e7485da Antony Chazapis
                              primary_key=True))
131 5e7485da Antony Chazapis
        columns.append(Column('key', String(255), primary_key=True))
132 5e7485da Antony Chazapis
        columns.append(Column('value', String(255)))
133 5e7485da Antony Chazapis
        self.policies = Table('policy', metadata, *columns, mysql_engine='InnoDB')
134 5e7485da Antony Chazapis
        
135 4f917833 Sofia Papagiannaki
        #create statistics table
136 4f917833 Sofia Papagiannaki
        columns=[]
137 4f917833 Sofia Papagiannaki
        columns.append(Column('node', Integer,
138 4f917833 Sofia Papagiannaki
                              ForeignKey('nodes.node',
139 4f917833 Sofia Papagiannaki
                                         ondelete='CASCADE',
140 4f917833 Sofia Papagiannaki
                                         onupdate='CASCADE'),
141 4f917833 Sofia Papagiannaki
                              primary_key=True))
142 18266c93 Sofia Papagiannaki
        columns.append(Column('population', Integer, nullable=False, default=0))
143 5bf54048 Sofia Papagiannaki
        columns.append(Column('size', BigInteger, nullable=False, default=0))
144 be266a11 Sofia Papagiannaki
        columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
145 18266c93 Sofia Papagiannaki
        columns.append(Column('cluster', Integer, nullable=False, default=0,
146 8ed4d90d Antony Chazapis
                              primary_key=True, autoincrement=False))
147 8af4c26d Antony Chazapis
        self.statistics = Table('statistics', metadata, *columns, mysql_engine='InnoDB')
148 4f917833 Sofia Papagiannaki
        
149 4f917833 Sofia Papagiannaki
        #create versions table
150 4f917833 Sofia Papagiannaki
        columns=[]
151 18266c93 Sofia Papagiannaki
        columns.append(Column('serial', Integer, primary_key=True))
152 4f917833 Sofia Papagiannaki
        columns.append(Column('node', Integer,
153 4f917833 Sofia Papagiannaki
                              ForeignKey('nodes.node',
154 4f917833 Sofia Papagiannaki
                                         ondelete='CASCADE',
155 18266c93 Sofia Papagiannaki
                                         onupdate='CASCADE')))
156 1c2fc0ff Antony Chazapis
        columns.append(Column('hash', String(255)))
157 5bf54048 Sofia Papagiannaki
        columns.append(Column('size', BigInteger, nullable=False, default=0))
158 18266c93 Sofia Papagiannaki
        columns.append(Column('source', Integer))
159 be266a11 Sofia Papagiannaki
        columns.append(Column('mtime', DECIMAL(precision=16, scale=6)))
160 4f917833 Sofia Papagiannaki
        columns.append(Column('muser', String(255), nullable=False, default=''))
161 25ae8b75 Antony Chazapis
        columns.append(Column('uuid', String(64), nullable=False, default=''))
162 18266c93 Sofia Papagiannaki
        columns.append(Column('cluster', Integer, nullable=False, default=0))
163 8af4c26d Antony Chazapis
        self.versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
164 25ae8b75 Antony Chazapis
        Index('idx_versions_node_mtime', self.versions.c.node, self.versions.c.mtime)
165 25ae8b75 Antony Chazapis
        Index('idx_versions_node_uuid', self.versions.c.uuid)
166 4f917833 Sofia Papagiannaki
        
167 4f917833 Sofia Papagiannaki
        #create attributes table
168 4f917833 Sofia Papagiannaki
        columns = []
169 4f917833 Sofia Papagiannaki
        columns.append(Column('serial', Integer,
170 4f917833 Sofia Papagiannaki
                              ForeignKey('versions.serial',
171 4f917833 Sofia Papagiannaki
                                         ondelete='CASCADE',
172 4f917833 Sofia Papagiannaki
                                         onupdate='CASCADE'),
173 4f917833 Sofia Papagiannaki
                              primary_key=True))
174 059857e2 Antony Chazapis
        columns.append(Column('domain', String(255), primary_key=True))
175 4f917833 Sofia Papagiannaki
        columns.append(Column('key', String(255), primary_key=True))
176 4f917833 Sofia Papagiannaki
        columns.append(Column('value', String(255)))
177 8af4c26d Antony Chazapis
        self.attributes = Table('attributes', metadata, *columns, mysql_engine='InnoDB')
178 4f917833 Sofia Papagiannaki
        
179 4f917833 Sofia Papagiannaki
        metadata.create_all(self.engine)
180 4f917833 Sofia Papagiannaki
        
181 e414f54d Sofia Papagiannaki
        # the following code creates an index of specific length
182 e414f54d Sofia Papagiannaki
        # this can be accompliced in sqlalchemy >= 0.7.3
183 e414f54d Sofia Papagiannaki
        # providing mysql_length option during index creation
184 e414f54d Sofia Papagiannaki
        insp = Inspector.from_engine(self.engine)
185 e414f54d Sofia Papagiannaki
        indexes = [elem['name'] for elem in insp.get_indexes('nodes')]
186 e414f54d Sofia Papagiannaki
        if 'idx_nodes_path' not in indexes:
187 3a4a6892 Sofia Papagiannaki
            explicit_length = '(%s)' %path_length if self.engine.name == 'mysql' else ''
188 12cd0417 Sofia Papagiannaki
            s = text('CREATE INDEX idx_nodes_path ON nodes (path%s)' %explicit_length)
189 e414f54d Sofia Papagiannaki
            self.conn.execute(s).close()
190 e414f54d Sofia Papagiannaki
        
191 d3be972a Sofia Papagiannaki
        s = self.nodes.select().where(and_(self.nodes.c.node == ROOTNODE,
192 d3be972a Sofia Papagiannaki
                                           self.nodes.c.parent == ROOTNODE))
193 9eb713e1 Sofia Papagiannaki
        rp = self.conn.execute(s)
194 9eb713e1 Sofia Papagiannaki
        r = rp.fetchone()
195 9eb713e1 Sofia Papagiannaki
        rp.close()
196 18266c93 Sofia Papagiannaki
        if not r:
197 18266c93 Sofia Papagiannaki
            s = self.nodes.insert().values(node=ROOTNODE, parent=ROOTNODE)
198 18266c93 Sofia Papagiannaki
            self.conn.execute(s)
199 4f917833 Sofia Papagiannaki
    
200 4f917833 Sofia Papagiannaki
    def node_create(self, parent, path):
201 4f917833 Sofia Papagiannaki
        """Create a new node from the given properties.
202 4f917833 Sofia Papagiannaki
           Return the node identifier of the new node.
203 4f917833 Sofia Papagiannaki
        """
204 18266c93 Sofia Papagiannaki
        #TODO catch IntegrityError?
205 18266c93 Sofia Papagiannaki
        s = self.nodes.insert().values(parent=parent, path=path)
206 18266c93 Sofia Papagiannaki
        r = self.conn.execute(s)
207 18266c93 Sofia Papagiannaki
        inserted_primary_key = r.inserted_primary_key[0]
208 18266c93 Sofia Papagiannaki
        r.close()
209 18266c93 Sofia Papagiannaki
        return inserted_primary_key
210 4f917833 Sofia Papagiannaki
    
211 4f917833 Sofia Papagiannaki
    def node_lookup(self, path):
212 4f917833 Sofia Papagiannaki
        """Lookup the current node of the given path.
213 4f917833 Sofia Papagiannaki
           Return None if the path is not found.
214 4f917833 Sofia Papagiannaki
        """
215 4f917833 Sofia Papagiannaki
        
216 6b20cfbc Antony Chazapis
        # Use LIKE for comparison to avoid MySQL problems with trailing spaces.
217 7759260d Antony Chazapis
        s = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path), escape='\\'))
218 18266c93 Sofia Papagiannaki
        r = self.conn.execute(s)
219 18266c93 Sofia Papagiannaki
        row = r.fetchone()
220 18266c93 Sofia Papagiannaki
        r.close()
221 18266c93 Sofia Papagiannaki
        if row:
222 18266c93 Sofia Papagiannaki
            return row[0]
223 4f917833 Sofia Papagiannaki
        return None
224 4f917833 Sofia Papagiannaki
    
225 4f917833 Sofia Papagiannaki
    def node_get_properties(self, node):
226 4f917833 Sofia Papagiannaki
        """Return the node's (parent, path).
227 4f917833 Sofia Papagiannaki
           Return None if the node is not found.
228 4f917833 Sofia Papagiannaki
        """
229 4f917833 Sofia Papagiannaki
        
230 18266c93 Sofia Papagiannaki
        s = select([self.nodes.c.parent, self.nodes.c.path])
231 18266c93 Sofia Papagiannaki
        s = s.where(self.nodes.c.node == node)
232 18266c93 Sofia Papagiannaki
        r = self.conn.execute(s)
233 18266c93 Sofia Papagiannaki
        l = r.fetchone()
234 18266c93 Sofia Papagiannaki
        r.close()
235 18266c93 Sofia Papagiannaki
        return l
236 4f917833 Sofia Papagiannaki
    
237 4f917833 Sofia Papagiannaki
    def node_get_versions(self, node, keys=(), propnames=_propnames):
238 4f917833 Sofia Papagiannaki
        """Return the properties of all versions at node.
239 4f917833 Sofia Papagiannaki
           If keys is empty, return all properties in the order
240 25ae8b75 Antony Chazapis
           (serial, node, hash, size, source, mtime, muser, uuid, cluster).
241 4f917833 Sofia Papagiannaki
        """
242 4f917833 Sofia Papagiannaki
        
243 1c2fc0ff Antony Chazapis
        s = select([self.versions.c.serial,
244 1c2fc0ff Antony Chazapis
                    self.versions.c.node,
245 1c2fc0ff Antony Chazapis
                    self.versions.c.hash,
246 1c2fc0ff Antony Chazapis
                    self.versions.c.size,
247 1c2fc0ff Antony Chazapis
                    self.versions.c.source,
248 1c2fc0ff Antony Chazapis
                    self.versions.c.mtime,
249 1c2fc0ff Antony Chazapis
                    self.versions.c.muser,
250 25ae8b75 Antony Chazapis
                    self.versions.c.uuid,
251 1c2fc0ff Antony Chazapis
                    self.versions.c.cluster], self.versions.c.node == node)
252 cc28f894 Sofia Papagiannaki
        s = s.order_by(self.versions.c.serial)
253 18266c93 Sofia Papagiannaki
        r = self.conn.execute(s)
254 18266c93 Sofia Papagiannaki
        rows = r.fetchall()
255 9eb713e1 Sofia Papagiannaki
        r.close()
256 18266c93 Sofia Papagiannaki
        if not rows:
257 18266c93 Sofia Papagiannaki
            return rows
258 4f917833 Sofia Papagiannaki
        
259 4f917833 Sofia Papagiannaki
        if not keys:
260 18266c93 Sofia Papagiannaki
            return rows
261 18266c93 Sofia Papagiannaki
        
262 18266c93 Sofia Papagiannaki
        return [[p[propnames[k]] for k in keys if k in propnames] for p in rows]
263 4f917833 Sofia Papagiannaki
    
264 4f917833 Sofia Papagiannaki
    def node_count_children(self, node):
265 4f917833 Sofia Papagiannaki
        """Return node's child count."""
266 4f917833 Sofia Papagiannaki
        
267 18266c93 Sofia Papagiannaki
        s = select([func.count(self.nodes.c.node)])
268 18266c93 Sofia Papagiannaki
        s = s.where(and_(self.nodes.c.parent == node,
269 18266c93 Sofia Papagiannaki
                         self.nodes.c.node != ROOTNODE))
270 18266c93 Sofia Papagiannaki
        r = self.conn.execute(s)
271 18266c93 Sofia Papagiannaki
        row = r.fetchone()
272 18266c93 Sofia Papagiannaki
        r.close()
273 18266c93 Sofia Papagiannaki
        return row[0]
274 4f917833 Sofia Papagiannaki
    
275 4f917833 Sofia Papagiannaki
    def node_purge_children(self, parent, before=inf, cluster=0):
276 4f917833 Sofia Papagiannaki
        """Delete all versions with the specified
277 4f917833 Sofia Papagiannaki
           parent and cluster, and return
278 04230536 Antony Chazapis
           the hashes of versions deleted.
279 4f917833 Sofia Papagiannaki
           Clears out nodes with no remaining versions.
280 4f917833 Sofia Papagiannaki
        """
281 b43d44ad Sofia Papagiannaki
        #update statistics
282 b43d44ad Sofia Papagiannaki
        c1 = select([self.nodes.c.node],
283 b43d44ad Sofia Papagiannaki
            self.nodes.c.parent == parent)
284 b43d44ad Sofia Papagiannaki
        where_clause = and_(self.versions.c.node.in_(c1),
285 62d938dc Sofia Papagiannaki
                            self.versions.c.cluster == cluster)
286 18266c93 Sofia Papagiannaki
        s = select([func.count(self.versions.c.serial),
287 18266c93 Sofia Papagiannaki
                    func.sum(self.versions.c.size)])
288 18266c93 Sofia Papagiannaki
        s = s.where(where_clause)
289 62d938dc Sofia Papagiannaki
        if before != inf:
290 62d938dc Sofia Papagiannaki
            s = s.where(self.versions.c.mtime <= before)
291 18266c93 Sofia Papagiannaki
        r = self.conn.execute(s)
292 18266c93 Sofia Papagiannaki
        row = r.fetchone()
293 18266c93 Sofia Papagiannaki
        r.close()
294 18266c93 Sofia Papagiannaki
        if not row:
295 4f917833 Sofia Papagiannaki
            return ()
296 18266c93 Sofia Papagiannaki
        nr, size = row[0], -row[1] if row[1] else 0
297 4f917833 Sofia Papagiannaki
        mtime = time()
298 18266c93 Sofia Papagiannaki
        self.statistics_update(parent, -nr, size, mtime, cluster)
299 18266c93 Sofia Papagiannaki
        self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)
300 4f917833 Sofia Papagiannaki
        
301 e1f09a6e Antony Chazapis
        s = select([self.versions.c.hash])
302 18266c93 Sofia Papagiannaki
        s = s.where(where_clause)
303 18266c93 Sofia Papagiannaki
        r = self.conn.execute(s)
304 e1f09a6e Antony Chazapis
        hashes = [row[0] for row in r.fetchall()]
305 18266c93 Sofia Papagiannaki
        r.close()
306 18266c93 Sofia Papagiannaki
        
307 b43d44ad Sofia Papagiannaki
        #delete versions
308 18266c93 Sofia Papagiannaki
        s = self.versions.delete().where(where_clause)
309 18266c93 Sofia Papagiannaki
        r = self.conn.execute(s)
310 18266c93 Sofia Papagiannaki
        r.close()
311 18266c93 Sofia Papagiannaki
        
312 18266c93 Sofia Papagiannaki
        #delete nodes
313 b43d44ad Sofia Papagiannaki
        s = select([self.nodes.c.node],
314 b43d44ad Sofia Papagiannaki
            and_(self.nodes.c.parent == parent,
315 b43d44ad Sofia Papagiannaki
                 select([func.count(self.versions.c.serial)],
316 b43d44ad Sofia Papagiannaki
                    self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
317 70516d86 Sofia Papagiannaki
        rp = self.conn.execute(s)
318 70516d86 Sofia Papagiannaki
        nodes = [r[0] for r in rp.fetchall()]
319 70516d86 Sofia Papagiannaki
        rp.close()
320 b43d44ad Sofia Papagiannaki
        s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
321 18266c93 Sofia Papagiannaki
        self.conn.execute(s).close()
322 b43d44ad Sofia Papagiannaki
        
323 04230536 Antony Chazapis
        return hashes
324 4f917833 Sofia Papagiannaki
    
325 4f917833 Sofia Papagiannaki
    def node_purge(self, node, before=inf, cluster=0):
326 4f917833 Sofia Papagiannaki
        """Delete all versions with the specified
327 4f917833 Sofia Papagiannaki
           node and cluster, and return
328 04230536 Antony Chazapis
           the hashes of versions deleted.
329 4f917833 Sofia Papagiannaki
           Clears out the node if it has no remaining versions.
330 4f917833 Sofia Papagiannaki
        """
331 4f917833 Sofia Papagiannaki
        
332 b43d44ad Sofia Papagiannaki
        #update statistics
333 b43d44ad Sofia Papagiannaki
        s = select([func.count(self.versions.c.serial),
334 b43d44ad Sofia Papagiannaki
                    func.sum(self.versions.c.size)])
335 b43d44ad Sofia Papagiannaki
        where_clause = and_(self.versions.c.node == node,
336 62d938dc Sofia Papagiannaki
                         self.versions.c.cluster == cluster)
337 b43d44ad Sofia Papagiannaki
        s = s.where(where_clause)
338 62d938dc Sofia Papagiannaki
        if before != inf:
339 62d938dc Sofia Papagiannaki
            s = s.where(self.versions.c.mtime <= before)
340 b43d44ad Sofia Papagiannaki
        r = self.conn.execute(s)
341 b43d44ad Sofia Papagiannaki
        row = r.fetchone()
342 b43d44ad Sofia Papagiannaki
        nr, size = row[0], row[1]
343 b43d44ad Sofia Papagiannaki
        r.close()
344 4f917833 Sofia Papagiannaki
        if not nr:
345 4f917833 Sofia Papagiannaki
            return ()
346 4f917833 Sofia Papagiannaki
        mtime = time()
347 4f917833 Sofia Papagiannaki
        self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
348 4f917833 Sofia Papagiannaki
        
349 e1f09a6e Antony Chazapis
        s = select([self.versions.c.hash])
350 b43d44ad Sofia Papagiannaki
        s = s.where(where_clause)
351 b43d44ad Sofia Papagiannaki
        r = self.conn.execute(s)
352 e1f09a6e Antony Chazapis
        hashes = [r[0] for r in r.fetchall()]
353 9eb713e1 Sofia Papagiannaki
        r.close()
354 b43d44ad Sofia Papagiannaki
        
355 b43d44ad Sofia Papagiannaki
        #delete versions
356 b43d44ad Sofia Papagiannaki
        s = self.versions.delete().where(where_clause)
357 b43d44ad Sofia Papagiannaki
        r = self.conn.execute(s)
358 b43d44ad Sofia Papagiannaki
        r.close()
359 b43d44ad Sofia Papagiannaki
        
360 b43d44ad Sofia Papagiannaki
        #delete nodes
361 b43d44ad Sofia Papagiannaki
        s = select([self.nodes.c.node],
362 b43d44ad Sofia Papagiannaki
            and_(self.nodes.c.node == node,
363 b43d44ad Sofia Papagiannaki
                 select([func.count(self.versions.c.serial)],
364 b43d44ad Sofia Papagiannaki
                    self.versions.c.node == self.nodes.c.node).as_scalar() == 0))
365 b43d44ad Sofia Papagiannaki
        r = self.conn.execute(s)
366 b43d44ad Sofia Papagiannaki
        nodes = r.fetchall()
367 b43d44ad Sofia Papagiannaki
        r.close()
368 b43d44ad Sofia Papagiannaki
        s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
369 b43d44ad Sofia Papagiannaki
        self.conn.execute(s).close()
370 b43d44ad Sofia Papagiannaki
        
371 04230536 Antony Chazapis
        return hashes
372 4f917833 Sofia Papagiannaki
    
373 4f917833 Sofia Papagiannaki
    def node_remove(self, node):
374 4f917833 Sofia Papagiannaki
        """Remove the node specified.
375 4f917833 Sofia Papagiannaki
           Return false if the node has children or is not found.
376 4f917833 Sofia Papagiannaki
        """
377 4f917833 Sofia Papagiannaki
        
378 4f917833 Sofia Papagiannaki
        if self.node_count_children(node):
379 4f917833 Sofia Papagiannaki
            return False
380 4f917833 Sofia Papagiannaki
        
381 4f917833 Sofia Papagiannaki
        mtime = time()
382 b43d44ad Sofia Papagiannaki
        s = select([func.count(self.versions.c.serial),
383 b43d44ad Sofia Papagiannaki
                    func.sum(self.versions.c.size),
384 b43d44ad Sofia Papagiannaki
                    self.versions.c.cluster])
385 b43d44ad Sofia Papagiannaki
        s = s.where(self.versions.c.node == node)
386 b43d44ad Sofia Papagiannaki
        s = s.group_by(self.versions.c.cluster)
387 b43d44ad Sofia Papagiannaki
        r = self.conn.execute(s)
388 b43d44ad Sofia Papagiannaki
        for population, size, cluster in r.fetchall():
389 4f917833 Sofia Papagiannaki
            self.statistics_update_ancestors(node, -population, -size, mtime, cluster)
390 b43d44ad Sofia Papagiannaki
        r.close()
391 4f917833 Sofia Papagiannaki
        
392 b43d44ad Sofia Papagiannaki
        s = self.nodes.delete().where(self.nodes.c.node == node)
393 b43d44ad Sofia Papagiannaki
        self.conn.execute(s).close()
394 4f917833 Sofia Papagiannaki
        return True
395 4f917833 Sofia Papagiannaki
    
396 5e7485da Antony Chazapis
    def policy_get(self, node):
397 5e7485da Antony Chazapis
        s = select([self.policies.c.key, self.policies.c.value],
398 5e7485da Antony Chazapis
            self.policies.c.node==node)
399 5e7485da Antony Chazapis
        r = self.conn.execute(s)
400 5e7485da Antony Chazapis
        d = dict(r.fetchall())
401 5e7485da Antony Chazapis
        r.close()
402 5e7485da Antony Chazapis
        return d
403 5e7485da Antony Chazapis
    
404 5e7485da Antony Chazapis
    def policy_set(self, node, policy):
405 5e7485da Antony Chazapis
        #insert or replace
406 5e7485da Antony Chazapis
        for k, v in policy.iteritems():
407 5e7485da Antony Chazapis
            s = self.policies.update().where(and_(self.policies.c.node == node,
408 5e7485da Antony Chazapis
                                                  self.policies.c.key == k))
409 5e7485da Antony Chazapis
            s = s.values(value = v)
410 5e7485da Antony Chazapis
            rp = self.conn.execute(s)
411 5e7485da Antony Chazapis
            rp.close()
412 5e7485da Antony Chazapis
            if rp.rowcount == 0:
413 5e7485da Antony Chazapis
                s = self.policies.insert()
414 5e7485da Antony Chazapis
                values = {'node':node, 'key':k, 'value':v}
415 5e7485da Antony Chazapis
                r = self.conn.execute(s, values)
416 5e7485da Antony Chazapis
                r.close()
417 5e7485da Antony Chazapis
    
418 4f917833 Sofia Papagiannaki
    def statistics_get(self, node, cluster=0):
419 4f917833 Sofia Papagiannaki
        """Return population, total size and last mtime
420 4f917833 Sofia Papagiannaki
           for all versions under node that belong to the cluster.
421 4f917833 Sofia Papagiannaki
        """
422 4f917833 Sofia Papagiannaki
        
423 b43d44ad Sofia Papagiannaki
        s = select([self.statistics.c.population,
424 b43d44ad Sofia Papagiannaki
                    self.statistics.c.size,
425 b43d44ad Sofia Papagiannaki
                    self.statistics.c.mtime])
426 b43d44ad Sofia Papagiannaki
        s = s.where(and_(self.statistics.c.node == node,
427 b43d44ad Sofia Papagiannaki
                         self.statistics.c.cluster == cluster))
428 b43d44ad Sofia Papagiannaki
        r = self.conn.execute(s)
429 b43d44ad Sofia Papagiannaki
        row = r.fetchone()
430 b43d44ad Sofia Papagiannaki
        r.close()
431 b43d44ad Sofia Papagiannaki
        return row
432 4f917833 Sofia Papagiannaki
    
433 4f917833 Sofia Papagiannaki
    def statistics_update(self, node, population, size, mtime, cluster=0):
434 4f917833 Sofia Papagiannaki
        """Update the statistics of the given node.
435 4f917833 Sofia Papagiannaki
           Statistics keep track the population, total
436 4f917833 Sofia Papagiannaki
           size of objects and mtime in the node's namespace.
437 4f917833 Sofia Papagiannaki
           May be zero or positive or negative numbers.
438 4f917833 Sofia Papagiannaki
        """
439 18266c93 Sofia Papagiannaki
        s = select([self.statistics.c.population, self.statistics.c.size],
440 18266c93 Sofia Papagiannaki
            and_(self.statistics.c.node == node,
441 18266c93 Sofia Papagiannaki
                 self.statistics.c.cluster == cluster))
442 b43d44ad Sofia Papagiannaki
        rp = self.conn.execute(s)
443 b43d44ad Sofia Papagiannaki
        r = rp.fetchone()
444 b43d44ad Sofia Papagiannaki
        rp.close()
445 18266c93 Sofia Papagiannaki
        if not r:
446 4f917833 Sofia Papagiannaki
            prepopulation, presize = (0, 0)
447 4f917833 Sofia Papagiannaki
        else:
448 4f917833 Sofia Papagiannaki
            prepopulation, presize = r
449 4f917833 Sofia Papagiannaki
        population += prepopulation
450 4f917833 Sofia Papagiannaki
        size += presize
451 18266c93 Sofia Papagiannaki
        
452 d3be972a Sofia Papagiannaki
        #insert or replace
453 70516d86 Sofia Papagiannaki
        #TODO better upsert
454 d3be972a Sofia Papagiannaki
        u = self.statistics.update().where(and_(self.statistics.c.node==node,
455 d3be972a Sofia Papagiannaki
                                           self.statistics.c.cluster==cluster))
456 d3be972a Sofia Papagiannaki
        u = u.values(population=population, size=size, mtime=mtime)
457 d3be972a Sofia Papagiannaki
        rp = self.conn.execute(u)
458 d3be972a Sofia Papagiannaki
        rp.close()
459 d3be972a Sofia Papagiannaki
        if rp.rowcount == 0:
460 d3be972a Sofia Papagiannaki
            ins = self.statistics.insert()
461 d3be972a Sofia Papagiannaki
            ins = ins.values(node=node, population=population, size=size,
462 d3be972a Sofia Papagiannaki
                             mtime=mtime, cluster=cluster)
463 d3be972a Sofia Papagiannaki
            self.conn.execute(ins).close()
464 4f917833 Sofia Papagiannaki
    
465 4f917833 Sofia Papagiannaki
    def statistics_update_ancestors(self, node, population, size, mtime, cluster=0):
466 4f917833 Sofia Papagiannaki
        """Update the statistics of the given node's parent.
467 4f917833 Sofia Papagiannaki
           Then recursively update all parents up to the root.
468 4f917833 Sofia Papagiannaki
           Population is not recursive.
469 4f917833 Sofia Papagiannaki
        """
470 4f917833 Sofia Papagiannaki
        
471 4f917833 Sofia Papagiannaki
        while True:
472 18266c93 Sofia Papagiannaki
            if node == ROOTNODE:
473 4f917833 Sofia Papagiannaki
                break
474 4f917833 Sofia Papagiannaki
            props = self.node_get_properties(node)
475 4f917833 Sofia Papagiannaki
            if props is None:
476 4f917833 Sofia Papagiannaki
                break
477 4f917833 Sofia Papagiannaki
            parent, path = props
478 4f917833 Sofia Papagiannaki
            self.statistics_update(parent, population, size, mtime, cluster)
479 4f917833 Sofia Papagiannaki
            node = parent
480 4f917833 Sofia Papagiannaki
            population = 0 # Population isn't recursive
481 4f917833 Sofia Papagiannaki
    
482 4f917833 Sofia Papagiannaki
    def statistics_latest(self, node, before=inf, except_cluster=0):
483 4f917833 Sofia Papagiannaki
        """Return population, total size and last mtime
484 4f917833 Sofia Papagiannaki
           for all latest versions under node that
485 4f917833 Sofia Papagiannaki
           do not belong to the cluster.
486 4f917833 Sofia Papagiannaki
        """
487 4f917833 Sofia Papagiannaki
        
488 4f917833 Sofia Papagiannaki
        # The node.
489 4f917833 Sofia Papagiannaki
        props = self.node_get_properties(node)
490 4f917833 Sofia Papagiannaki
        if props is None:
491 4f917833 Sofia Papagiannaki
            return None
492 4f917833 Sofia Papagiannaki
        parent, path = props
493 4f917833 Sofia Papagiannaki
        
494 4f917833 Sofia Papagiannaki
        # The latest version.
495 b43d44ad Sofia Papagiannaki
        s = select([self.versions.c.serial,
496 b43d44ad Sofia Papagiannaki
                    self.versions.c.node,
497 1c2fc0ff Antony Chazapis
                    self.versions.c.hash,
498 b43d44ad Sofia Papagiannaki
                    self.versions.c.size,
499 d0aacf54 Sofia Papagiannaki
                    self.versions.c.source,
500 b43d44ad Sofia Papagiannaki
                    self.versions.c.mtime,
501 b43d44ad Sofia Papagiannaki
                    self.versions.c.muser,
502 25ae8b75 Antony Chazapis
                    self.versions.c.uuid,
503 b43d44ad Sofia Papagiannaki
                    self.versions.c.cluster])
504 62d938dc Sofia Papagiannaki
        filtered = select([func.max(self.versions.c.serial)],
505 62d938dc Sofia Papagiannaki
                            self.versions.c.node == node)
506 62d938dc Sofia Papagiannaki
        if before != inf:
507 62d938dc Sofia Papagiannaki
            filtered = filtered.where(self.versions.c.mtime < before)
508 b43d44ad Sofia Papagiannaki
        s = s.where(and_(self.versions.c.cluster != except_cluster,
509 62d938dc Sofia Papagiannaki
                         self.versions.c.serial == filtered))
510 b43d44ad Sofia Papagiannaki
        r = self.conn.execute(s)
511 b43d44ad Sofia Papagiannaki
        props = r.fetchone()
512 b43d44ad Sofia Papagiannaki
        r.close()
513 b43d44ad Sofia Papagiannaki
        if not props:
514 4f917833 Sofia Papagiannaki
            return None
515 4f917833 Sofia Papagiannaki
        mtime = props[MTIME]
516 4f917833 Sofia Papagiannaki
        
517 4f917833 Sofia Papagiannaki
        # First level, just under node (get population).
518 b43d44ad Sofia Papagiannaki
        v = self.versions.alias('v')
519 b43d44ad Sofia Papagiannaki
        s = select([func.count(v.c.serial),
520 b43d44ad Sofia Papagiannaki
                    func.sum(v.c.size),
521 b43d44ad Sofia Papagiannaki
                    func.max(v.c.mtime)])
522 62d938dc Sofia Papagiannaki
        c1 = select([func.max(self.versions.c.serial)])
523 62d938dc Sofia Papagiannaki
        if before != inf:
524 62d938dc Sofia Papagiannaki
            c1 = c1.where(self.versions.c.mtime < before)
525 b43d44ad Sofia Papagiannaki
        c2 = select([self.nodes.c.node], self.nodes.c.parent == node)
526 62d938dc Sofia Papagiannaki
        s = s.where(and_(v.c.serial == c1.where(self.versions.c.node == v.c.node),
527 b43d44ad Sofia Papagiannaki
                         v.c.cluster != except_cluster,
528 b43d44ad Sofia Papagiannaki
                         v.c.node.in_(c2)))
529 b43d44ad Sofia Papagiannaki
        rp = self.conn.execute(s)
530 b43d44ad Sofia Papagiannaki
        r = rp.fetchone()
531 b43d44ad Sofia Papagiannaki
        rp.close()
532 b43d44ad Sofia Papagiannaki
        if not r:
533 4f917833 Sofia Papagiannaki
            return None
534 4f917833 Sofia Papagiannaki
        count = r[0]
535 4f917833 Sofia Papagiannaki
        mtime = max(mtime, r[2])
536 4f917833 Sofia Papagiannaki
        if count == 0:
537 4f917833 Sofia Papagiannaki
            return (0, 0, mtime)
538 4f917833 Sofia Papagiannaki
        
539 4f917833 Sofia Papagiannaki
        # All children (get size and mtime).
540 4f917833 Sofia Papagiannaki
        # XXX: This is why the full path is stored.
541 b43d44ad Sofia Papagiannaki
        s = select([func.count(v.c.serial),
542 b43d44ad Sofia Papagiannaki
                    func.sum(v.c.size),
543 b43d44ad Sofia Papagiannaki
                    func.max(v.c.mtime)])
544 b43d44ad Sofia Papagiannaki
        c1 = select([func.max(self.versions.c.serial)],
545 62d938dc Sofia Papagiannaki
            self.versions.c.node == v.c.node)
546 62d938dc Sofia Papagiannaki
        if before != inf:
547 62d938dc Sofia Papagiannaki
            c1 = c1.where(self.versions.c.mtime < before)
548 7759260d Antony Chazapis
        c2 = select([self.nodes.c.node], self.nodes.c.path.like(self.escape_like(path) + '%', escape='\\'))
549 b43d44ad Sofia Papagiannaki
        s = s.where(and_(v.c.serial == c1,
550 b43d44ad Sofia Papagiannaki
                         v.c.cluster != except_cluster,
551 b43d44ad Sofia Papagiannaki
                         v.c.node.in_(c2)))
552 b43d44ad Sofia Papagiannaki
        rp = self.conn.execute(s)
553 b43d44ad Sofia Papagiannaki
        r = rp.fetchone()
554 b43d44ad Sofia Papagiannaki
        rp.close()
555 b43d44ad Sofia Papagiannaki
        if not r:
556 4f917833 Sofia Papagiannaki
            return None
557 4f917833 Sofia Papagiannaki
        size = r[1] - props[SIZE]
558 4f917833 Sofia Papagiannaki
        mtime = max(mtime, r[2])
559 4f917833 Sofia Papagiannaki
        return (count, size, mtime)
560 4f917833 Sofia Papagiannaki
    
561 25ae8b75 Antony Chazapis
    def version_create(self, node, hash, size, source, muser, uuid, cluster=0):
562 4f917833 Sofia Papagiannaki
        """Create a new version from the given properties.
563 4f917833 Sofia Papagiannaki
           Return the (serial, mtime) of the new version.
564 4f917833 Sofia Papagiannaki
        """
565 4f917833 Sofia Papagiannaki
        
566 4f917833 Sofia Papagiannaki
        mtime = time()
567 1c2fc0ff Antony Chazapis
        s = self.versions.insert().values(node=node, hash=hash, size=size, source=source,
568 25ae8b75 Antony Chazapis
                                          mtime=mtime, muser=muser, uuid=uuid, cluster=cluster)
569 b43d44ad Sofia Papagiannaki
        serial = self.conn.execute(s).inserted_primary_key[0]
570 4f917833 Sofia Papagiannaki
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)
571 4f917833 Sofia Papagiannaki
        return serial, mtime
572 4f917833 Sofia Papagiannaki
    
573 4f917833 Sofia Papagiannaki
    def version_lookup(self, node, before=inf, cluster=0):
574 4f917833 Sofia Papagiannaki
        """Lookup the current version of the given node.
575 4f917833 Sofia Papagiannaki
           Return a list with its properties:
576 25ae8b75 Antony Chazapis
           (serial, node, hash, size, source, mtime, muser, uuid, cluster)
577 4f917833 Sofia Papagiannaki
           or None if the current version is not found in the given cluster.
578 4f917833 Sofia Papagiannaki
        """
579 4f917833 Sofia Papagiannaki
        
580 b43d44ad Sofia Papagiannaki
        v = self.versions.alias('v')
581 25ae8b75 Antony Chazapis
        s = select([v.c.serial, v.c.node, v.c.hash,
582 25ae8b75 Antony Chazapis
                    v.c.size, v.c.source, v.c.mtime,
583 25ae8b75 Antony Chazapis
                    v.c.muser, v.c.uuid, v.c.cluster])
584 b43d44ad Sofia Papagiannaki
        c = select([func.max(self.versions.c.serial)],
585 62d938dc Sofia Papagiannaki
            self.versions.c.node == node)
586 62d938dc Sofia Papagiannaki
        if before != inf:
587 62d938dc Sofia Papagiannaki
            c = c.where(self.versions.c.mtime < before)
588 b43d44ad Sofia Papagiannaki
        s = s.where(and_(v.c.serial == c,
589 b43d44ad Sofia Papagiannaki
                         v.c.cluster == cluster))
590 b43d44ad Sofia Papagiannaki
        r = self.conn.execute(s)
591 b43d44ad Sofia Papagiannaki
        props = r.fetchone()
592 b43d44ad Sofia Papagiannaki
        r.close()
593 70516d86 Sofia Papagiannaki
        if props:
594 4f917833 Sofia Papagiannaki
            return props
595 4f917833 Sofia Papagiannaki
        return None
596 4f917833 Sofia Papagiannaki
    
597 4f917833 Sofia Papagiannaki
    def version_get_properties(self, serial, keys=(), propnames=_propnames):
598 4f917833 Sofia Papagiannaki
        """Return a sequence of values for the properties of
599 4f917833 Sofia Papagiannaki
           the version specified by serial and the keys, in the order given.
600 4f917833 Sofia Papagiannaki
           If keys is empty, return all properties in the order
601 37bee317 Antony Chazapis
           (serial, node, hash, size, source, mtime, muser, uuid, cluster).
602 4f917833 Sofia Papagiannaki
        """
603 4f917833 Sofia Papagiannaki
        
604 b43d44ad Sofia Papagiannaki
        v = self.versions.alias()
605 37bee317 Antony Chazapis
        s = select([v.c.serial, v.c.node, v.c.hash,
606 37bee317 Antony Chazapis
                    v.c.size, v.c.source, v.c.mtime,
607 37bee317 Antony Chazapis
                    v.c.muser, v.c.uuid, v.c.cluster], v.c.serial == serial)
608 b43d44ad Sofia Papagiannaki
        rp = self.conn.execute(s)
609 b43d44ad Sofia Papagiannaki
        r = rp.fetchone()
610 b43d44ad Sofia Papagiannaki
        rp.close()
611 4f917833 Sofia Papagiannaki
        if r is None:
612 4f917833 Sofia Papagiannaki
            return r
613 4f917833 Sofia Papagiannaki
        
614 4f917833 Sofia Papagiannaki
        if not keys:
615 4f917833 Sofia Papagiannaki
            return r
616 4f917833 Sofia Papagiannaki
        return [r[propnames[k]] for k in keys if k in propnames]
617 4f917833 Sofia Papagiannaki
    
618 4f917833 Sofia Papagiannaki
    def version_recluster(self, serial, cluster):
619 4f917833 Sofia Papagiannaki
        """Move the version into another cluster."""
620 4f917833 Sofia Papagiannaki
        
621 4f917833 Sofia Papagiannaki
        props = self.version_get_properties(serial)
622 4f917833 Sofia Papagiannaki
        if not props:
623 4f917833 Sofia Papagiannaki
            return
624 4f917833 Sofia Papagiannaki
        node = props[NODE]
625 4f917833 Sofia Papagiannaki
        size = props[SIZE]
626 4f917833 Sofia Papagiannaki
        oldcluster = props[CLUSTER]
627 4f917833 Sofia Papagiannaki
        if cluster == oldcluster:
628 4f917833 Sofia Papagiannaki
            return
629 4f917833 Sofia Papagiannaki
        
630 4f917833 Sofia Papagiannaki
        mtime = time()
631 4f917833 Sofia Papagiannaki
        self.statistics_update_ancestors(node, -1, -size, mtime, oldcluster)
632 4f917833 Sofia Papagiannaki
        self.statistics_update_ancestors(node, 1, size, mtime, cluster)
633 4f917833 Sofia Papagiannaki
        
634 b43d44ad Sofia Papagiannaki
        s = self.versions.update()
635 b43d44ad Sofia Papagiannaki
        s = s.where(self.versions.c.serial == serial)
636 b43d44ad Sofia Papagiannaki
        s = s.values(cluster = cluster)
637 b43d44ad Sofia Papagiannaki
        self.conn.execute(s).close()
638 4f917833 Sofia Papagiannaki
    
639 4f917833 Sofia Papagiannaki
    def version_remove(self, serial):
640 4f917833 Sofia Papagiannaki
        """Remove the serial specified."""
641 4f917833 Sofia Papagiannaki
        
642 5161c672 Antony Chazapis
        props = self.version_get_properties(serial)
643 4f917833 Sofia Papagiannaki
        if not props:
644 4f917833 Sofia Papagiannaki
            return
645 4f917833 Sofia Papagiannaki
        node = props[NODE]
646 5161c672 Antony Chazapis
        hash = props[HASH]
647 4f917833 Sofia Papagiannaki
        size = props[SIZE]
648 4f917833 Sofia Papagiannaki
        cluster = props[CLUSTER]
649 4f917833 Sofia Papagiannaki
        
650 4f917833 Sofia Papagiannaki
        mtime = time()
651 4f917833 Sofia Papagiannaki
        self.statistics_update_ancestors(node, -1, -size, mtime, cluster)
652 4f917833 Sofia Papagiannaki
        
653 b43d44ad Sofia Papagiannaki
        s = self.versions.delete().where(self.versions.c.serial == serial)
654 b43d44ad Sofia Papagiannaki
        self.conn.execute(s).close()
655 5161c672 Antony Chazapis
        return hash
656 4f917833 Sofia Papagiannaki
    
657 059857e2 Antony Chazapis
    def attribute_get(self, serial, domain, keys=()):
658 4f917833 Sofia Papagiannaki
        """Return a list of (key, value) pairs of the version specified by serial.
659 4f917833 Sofia Papagiannaki
           If keys is empty, return all attributes.
660 4f917833 Sofia Papagiannaki
           Othwerise, return only those specified.
661 4f917833 Sofia Papagiannaki
        """
662 4f917833 Sofia Papagiannaki
        
663 4f917833 Sofia Papagiannaki
        if keys:
664 b43d44ad Sofia Papagiannaki
            attrs = self.attributes.alias()
665 b43d44ad Sofia Papagiannaki
            s = select([attrs.c.key, attrs.c.value])
666 b43d44ad Sofia Papagiannaki
            s = s.where(and_(attrs.c.key.in_(keys),
667 059857e2 Antony Chazapis
                             attrs.c.serial == serial,
668 059857e2 Antony Chazapis
                             attrs.c.domain == domain))
669 4f917833 Sofia Papagiannaki
        else:
670 b43d44ad Sofia Papagiannaki
            attrs = self.attributes.alias()
671 b43d44ad Sofia Papagiannaki
            s = select([attrs.c.key, attrs.c.value])
672 059857e2 Antony Chazapis
            s = s.where(and_(attrs.c.serial == serial,
673 059857e2 Antony Chazapis
                             attrs.c.domain == domain))
674 b43d44ad Sofia Papagiannaki
        r = self.conn.execute(s)
675 b43d44ad Sofia Papagiannaki
        l = r.fetchall()
676 b43d44ad Sofia Papagiannaki
        r.close()
677 b43d44ad Sofia Papagiannaki
        return l
678 4f917833 Sofia Papagiannaki
    
679 059857e2 Antony Chazapis
    def attribute_set(self, serial, domain, items):
680 4f917833 Sofia Papagiannaki
        """Set the attributes of the version specified by serial.
681 4f917833 Sofia Papagiannaki
           Receive attributes as an iterable of (key, value) pairs.
682 4f917833 Sofia Papagiannaki
        """
683 70516d86 Sofia Papagiannaki
        #insert or replace
684 70516d86 Sofia Papagiannaki
        #TODO better upsert
685 70516d86 Sofia Papagiannaki
        for k, v in items:
686 70516d86 Sofia Papagiannaki
            s = self.attributes.update()
687 70516d86 Sofia Papagiannaki
            s = s.where(and_(self.attributes.c.serial == serial,
688 059857e2 Antony Chazapis
                             self.attributes.c.domain == domain,
689 70516d86 Sofia Papagiannaki
                             self.attributes.c.key == k))
690 70516d86 Sofia Papagiannaki
            s = s.values(value = v)
691 70516d86 Sofia Papagiannaki
            rp = self.conn.execute(s)
692 70516d86 Sofia Papagiannaki
            rp.close()
693 70516d86 Sofia Papagiannaki
            if rp.rowcount == 0:
694 70516d86 Sofia Papagiannaki
                s = self.attributes.insert()
695 059857e2 Antony Chazapis
                s = s.values(serial=serial, domain=domain, key=k, value=v)
696 70516d86 Sofia Papagiannaki
                self.conn.execute(s).close()
697 4f917833 Sofia Papagiannaki
    
698 059857e2 Antony Chazapis
    def attribute_del(self, serial, domain, keys=()):
699 4f917833 Sofia Papagiannaki
        """Delete attributes of the version specified by serial.
700 4f917833 Sofia Papagiannaki
           If keys is empty, delete all attributes.
701 4f917833 Sofia Papagiannaki
           Otherwise delete those specified.
702 4f917833 Sofia Papagiannaki
        """
703 4f917833 Sofia Papagiannaki
        
704 4f917833 Sofia Papagiannaki
        if keys:
705 b43d44ad Sofia Papagiannaki
            #TODO more efficient way to do this?
706 b43d44ad Sofia Papagiannaki
            for key in keys:
707 b43d44ad Sofia Papagiannaki
                s = self.attributes.delete()
708 b43d44ad Sofia Papagiannaki
                s = s.where(and_(self.attributes.c.serial == serial,
709 059857e2 Antony Chazapis
                                 self.attributes.c.domain == domain,
710 b43d44ad Sofia Papagiannaki
                                 self.attributes.c.key == key))
711 b43d44ad Sofia Papagiannaki
                self.conn.execute(s).close()
712 4f917833 Sofia Papagiannaki
        else:
713 b43d44ad Sofia Papagiannaki
            s = self.attributes.delete()
714 059857e2 Antony Chazapis
            s = s.where(and_(self.attributes.c.serial == serial,
715 059857e2 Antony Chazapis
                             self.attributes.c.domain == domain))
716 b43d44ad Sofia Papagiannaki
            self.conn.execute(s).close()
717 4f917833 Sofia Papagiannaki
    
718 4f917833 Sofia Papagiannaki
    def attribute_copy(self, source, dest):
719 059857e2 Antony Chazapis
        s = select([dest, self.attributes.c.domain, self.attributes.c.key, self.attributes.c.value],
720 b43d44ad Sofia Papagiannaki
            self.attributes.c.serial == source)
721 1b61dbc0 Sofia Papagiannaki
        rp = self.conn.execute(s)
722 1b61dbc0 Sofia Papagiannaki
        attributes = rp.fetchall()
723 1b61dbc0 Sofia Papagiannaki
        rp.close()
724 059857e2 Antony Chazapis
        for dest, domain, k, v in attributes:
725 dc9e6086 Sofia Papagiannaki
            #insert or replace
726 1b61dbc0 Sofia Papagiannaki
            s = self.attributes.update().where(and_(
727 1b61dbc0 Sofia Papagiannaki
                self.attributes.c.serial == dest,
728 059857e2 Antony Chazapis
                self.attributes.c.domain == domain,
729 1b61dbc0 Sofia Papagiannaki
                self.attributes.c.key == k))
730 1b61dbc0 Sofia Papagiannaki
            rp = self.conn.execute(s, value=v)
731 1b61dbc0 Sofia Papagiannaki
            rp.close()
732 1b61dbc0 Sofia Papagiannaki
            if rp.rowcount == 0:
733 1b61dbc0 Sofia Papagiannaki
                s = self.attributes.insert()
734 059857e2 Antony Chazapis
                values = {'serial':dest, 'domain':domain, 'key':k, 'value':v}
735 1b61dbc0 Sofia Papagiannaki
                self.conn.execute(s, values).close()
736 4f917833 Sofia Papagiannaki
    
737 059857e2 Antony Chazapis
    def latest_attribute_keys(self, parent, domain, before=inf, except_cluster=0, pathq=[]):
738 4f917833 Sofia Papagiannaki
        """Return a list with all keys pairs defined
739 4f917833 Sofia Papagiannaki
           for all latest versions under parent that
740 4f917833 Sofia Papagiannaki
           do not belong to the cluster.
741 4f917833 Sofia Papagiannaki
        """
742 4f917833 Sofia Papagiannaki
        
743 4f917833 Sofia Papagiannaki
        # TODO: Use another table to store before=inf results.
744 b43d44ad Sofia Papagiannaki
        a = self.attributes.alias('a')
745 b43d44ad Sofia Papagiannaki
        v = self.versions.alias('v')
746 b43d44ad Sofia Papagiannaki
        n = self.nodes.alias('n')
747 b43d44ad Sofia Papagiannaki
        s = select([a.c.key]).distinct()
748 62d938dc Sofia Papagiannaki
        filtered = select([func.max(self.versions.c.serial)])
749 62d938dc Sofia Papagiannaki
        if before != inf:
750 62d938dc Sofia Papagiannaki
            filtered = filtered.where(self.versions.c.mtime < before)
751 62d938dc Sofia Papagiannaki
        s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
752 b43d44ad Sofia Papagiannaki
        s = s.where(v.c.cluster != except_cluster)
753 b43d44ad Sofia Papagiannaki
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
754 b43d44ad Sofia Papagiannaki
            self.nodes.c.parent == parent)))
755 b43d44ad Sofia Papagiannaki
        s = s.where(a.c.serial == v.c.serial)
756 059857e2 Antony Chazapis
        s = s.where(a.c.domain == domain)
757 b43d44ad Sofia Papagiannaki
        s = s.where(n.c.node == v.c.node)
758 b43d44ad Sofia Papagiannaki
        conj = []
759 b43d44ad Sofia Papagiannaki
        for x in pathq:
760 7759260d Antony Chazapis
            conj.append(n.c.path.like(self.escape_like(x) + '%', escape='\\'))
761 b43d44ad Sofia Papagiannaki
        if conj:
762 b43d44ad Sofia Papagiannaki
            s = s.where(or_(*conj))
763 b43d44ad Sofia Papagiannaki
        rp = self.conn.execute(s)
764 70516d86 Sofia Papagiannaki
        rows = rp.fetchall()
765 b43d44ad Sofia Papagiannaki
        rp.close()
766 70516d86 Sofia Papagiannaki
        return [r[0] for r in rows]
767 4f917833 Sofia Papagiannaki
    
768 4f917833 Sofia Papagiannaki
    def latest_version_list(self, parent, prefix='', delimiter=None,
769 4f917833 Sofia Papagiannaki
                            start='', limit=10000, before=inf,
770 059857e2 Antony Chazapis
                            except_cluster=0, pathq=[], domain=None, filterq=[]):
771 4f917833 Sofia Papagiannaki
        """Return a (list of (path, serial) tuples, list of common prefixes)
772 4f917833 Sofia Papagiannaki
           for the current versions of the paths with the given parent,
773 4f917833 Sofia Papagiannaki
           matching the following criteria.
774 4f917833 Sofia Papagiannaki
           
775 4f917833 Sofia Papagiannaki
           The property tuple for a version is returned if all
776 4f917833 Sofia Papagiannaki
           of these conditions are true:
777 4f917833 Sofia Papagiannaki
                
778 4f917833 Sofia Papagiannaki
                a. parent matches
779 4f917833 Sofia Papagiannaki
                
780 4f917833 Sofia Papagiannaki
                b. path > start
781 4f917833 Sofia Papagiannaki
                
782 4f917833 Sofia Papagiannaki
                c. path starts with prefix (and paths in pathq)
783 4f917833 Sofia Papagiannaki
                
784 4f917833 Sofia Papagiannaki
                d. version is the max up to before
785 4f917833 Sofia Papagiannaki
                
786 4f917833 Sofia Papagiannaki
                e. version is not in cluster
787 4f917833 Sofia Papagiannaki
                
788 4f917833 Sofia Papagiannaki
                f. the path does not have the delimiter occuring
789 4f917833 Sofia Papagiannaki
                   after the prefix, or ends with the delimiter
790 4f917833 Sofia Papagiannaki
                
791 4f917833 Sofia Papagiannaki
                g. serial matches the attribute filter query.
792 4f917833 Sofia Papagiannaki
                   
793 4f917833 Sofia Papagiannaki
                   A filter query is a comma-separated list of
794 4f917833 Sofia Papagiannaki
                   terms in one of these three forms:
795 4f917833 Sofia Papagiannaki
                   
796 4f917833 Sofia Papagiannaki
                   key
797 4f917833 Sofia Papagiannaki
                       an attribute with this key must exist
798 4f917833 Sofia Papagiannaki
                   
799 4f917833 Sofia Papagiannaki
                   !key
800 4f917833 Sofia Papagiannaki
                       an attribute with this key must not exist
801 4f917833 Sofia Papagiannaki
                   
802 4f917833 Sofia Papagiannaki
                   key ?op value
803 4f917833 Sofia Papagiannaki
                       the attribute with this key satisfies the value
804 4f917833 Sofia Papagiannaki
                       where ?op is one of ==, != <=, >=, <, >.
805 4f917833 Sofia Papagiannaki
           
806 4f917833 Sofia Papagiannaki
           The list of common prefixes includes the prefixes
807 4f917833 Sofia Papagiannaki
           matching up to the first delimiter after prefix,
808 4f917833 Sofia Papagiannaki
           and are reported only once, as "virtual directories".
809 4f917833 Sofia Papagiannaki
           The delimiter is included in the prefixes.
810 4f917833 Sofia Papagiannaki
           
811 4f917833 Sofia Papagiannaki
           If arguments are None, then the corresponding matching rule
812 4f917833 Sofia Papagiannaki
           will always match.
813 4f917833 Sofia Papagiannaki
           
814 4f917833 Sofia Papagiannaki
           Limit applies to the first list of tuples returned.
815 4f917833 Sofia Papagiannaki
        """
816 4f917833 Sofia Papagiannaki
        
817 4f917833 Sofia Papagiannaki
        if not start or start < prefix:
818 4f917833 Sofia Papagiannaki
            start = strprevling(prefix)
819 4f917833 Sofia Papagiannaki
        nextling = strnextling(prefix)
820 4f917833 Sofia Papagiannaki
        
821 b43d44ad Sofia Papagiannaki
        a = self.attributes.alias('a')
822 b43d44ad Sofia Papagiannaki
        v = self.versions.alias('v')
823 b43d44ad Sofia Papagiannaki
        n = self.nodes.alias('n')
824 b43d44ad Sofia Papagiannaki
        s = select([n.c.path, v.c.serial]).distinct()
825 62d938dc Sofia Papagiannaki
        filtered = select([func.max(self.versions.c.serial)])
826 62d938dc Sofia Papagiannaki
        if before != inf:
827 62d938dc Sofia Papagiannaki
            filtered = filtered.where(self.versions.c.mtime < before)
828 62d938dc Sofia Papagiannaki
        s = s.where(v.c.serial == filtered.where(self.versions.c.node == v.c.node))
829 b43d44ad Sofia Papagiannaki
        s = s.where(v.c.cluster != except_cluster)
830 b43d44ad Sofia Papagiannaki
        s = s.where(v.c.node.in_(select([self.nodes.c.node],
831 b43d44ad Sofia Papagiannaki
            self.nodes.c.parent == parent)))
832 b43d44ad Sofia Papagiannaki
        
833 b43d44ad Sofia Papagiannaki
        s = s.where(n.c.node == v.c.node)
834 b43d44ad Sofia Papagiannaki
        s = s.where(and_(n.c.path > bindparam('start'), n.c.path < nextling))
835 b43d44ad Sofia Papagiannaki
        conj = []
836 b43d44ad Sofia Papagiannaki
        for x in pathq:
837 7759260d Antony Chazapis
            conj.append(n.c.path.like(self.escape_like(x) + '%', escape='\\'))
838 b43d44ad Sofia Papagiannaki
        
839 b43d44ad Sofia Papagiannaki
        if conj:
840 b43d44ad Sofia Papagiannaki
            s = s.where(or_(*conj))
841 b43d44ad Sofia Papagiannaki
        
842 f897bea9 Sofia Papagiannaki
        s = s.order_by(n.c.path)
843 f897bea9 Sofia Papagiannaki
        
844 f897bea9 Sofia Papagiannaki
        def filterout(r):
845 f897bea9 Sofia Papagiannaki
            if not filterq:
846 f897bea9 Sofia Papagiannaki
                return False
847 f897bea9 Sofia Papagiannaki
            path, serial = r
848 3d13f97a Sofia Papagiannaki
            included, excluded, opers = parse_filters(filterq)
849 f897bea9 Sofia Papagiannaki
            
850 f897bea9 Sofia Papagiannaki
            #retrieve metadata
851 f897bea9 Sofia Papagiannaki
            s = select([a.c.key, a.c.value])
852 f897bea9 Sofia Papagiannaki
            s = s.where(a.c.domain == domain)
853 f897bea9 Sofia Papagiannaki
            s = s.where(a.c.serial == serial)
854 f897bea9 Sofia Papagiannaki
            rp = self.conn.execute(s)
855 f897bea9 Sofia Papagiannaki
            meta = dict(rp.fetchall())
856 f897bea9 Sofia Papagiannaki
            keyset= set([k.encode('utf8') for k in meta.keys()])
857 f897bea9 Sofia Papagiannaki
            
858 3d13f97a Sofia Papagiannaki
            if included:
859 f897bea9 Sofia Papagiannaki
                if not set(included) & keyset:
860 f897bea9 Sofia Papagiannaki
                    return True
861 3d13f97a Sofia Papagiannaki
            if excluded:
862 f897bea9 Sofia Papagiannaki
                if set(excluded) & keyset:
863 f897bea9 Sofia Papagiannaki
                    return True
864 f897bea9 Sofia Papagiannaki
            for k, op, v in opers:
865 f897bea9 Sofia Papagiannaki
                k = k.decode('utf8')
866 f897bea9 Sofia Papagiannaki
                v = v.decode('utf8')
867 f897bea9 Sofia Papagiannaki
                if k not in meta:
868 f897bea9 Sofia Papagiannaki
                    return True
869 f897bea9 Sofia Papagiannaki
                operation = OPERATORS[op]
870 f897bea9 Sofia Papagiannaki
                if not operation(meta[k], v):
871 f897bea9 Sofia Papagiannaki
                    return True
872 f897bea9 Sofia Papagiannaki
            return False
873 f897bea9 Sofia Papagiannaki
    
874 4f917833 Sofia Papagiannaki
        if not delimiter:
875 b43d44ad Sofia Papagiannaki
            s = s.limit(limit)
876 b43d44ad Sofia Papagiannaki
            rp = self.conn.execute(s, start=start)
877 f897bea9 Sofia Papagiannaki
            filter_ = lambda r : [t for t in r if not filterout(t)]
878 f897bea9 Sofia Papagiannaki
            r = filter_(rp.fetchall())
879 b43d44ad Sofia Papagiannaki
            rp.close()
880 b43d44ad Sofia Papagiannaki
            return r, ()
881 4f917833 Sofia Papagiannaki
        
882 4f917833 Sofia Papagiannaki
        pfz = len(prefix)
883 4f917833 Sofia Papagiannaki
        dz = len(delimiter)
884 4f917833 Sofia Papagiannaki
        count = 0
885 4f917833 Sofia Papagiannaki
        prefixes = []
886 4f917833 Sofia Papagiannaki
        pappend = prefixes.append
887 4f917833 Sofia Papagiannaki
        matches = []
888 4f917833 Sofia Papagiannaki
        mappend = matches.append
889 4f917833 Sofia Papagiannaki
        
890 b43d44ad Sofia Papagiannaki
        rp = self.conn.execute(s, start=start)
891 4f917833 Sofia Papagiannaki
        while True:
892 b43d44ad Sofia Papagiannaki
            props = rp.fetchone()
893 4f917833 Sofia Papagiannaki
            if props is None:
894 4f917833 Sofia Papagiannaki
                break
895 f897bea9 Sofia Papagiannaki
            if filterout(props):
896 f897bea9 Sofia Papagiannaki
                continue
897 4f917833 Sofia Papagiannaki
            path, serial = props
898 4f917833 Sofia Papagiannaki
            idx = path.find(delimiter, pfz)
899 4f917833 Sofia Papagiannaki
            
900 4f917833 Sofia Papagiannaki
            if idx < 0:
901 4f917833 Sofia Papagiannaki
                mappend(props)
902 4f917833 Sofia Papagiannaki
                count += 1
903 4f917833 Sofia Papagiannaki
                if count >= limit:
904 4f917833 Sofia Papagiannaki
                    break
905 4f917833 Sofia Papagiannaki
                continue
906 4f917833 Sofia Papagiannaki
            
907 4f917833 Sofia Papagiannaki
            if idx + dz == len(path):
908 4f917833 Sofia Papagiannaki
                mappend(props)
909 4f917833 Sofia Papagiannaki
                count += 1
910 d3be972a Sofia Papagiannaki
                continue # Get one more, in case there is a path.
911 d3be972a Sofia Papagiannaki
            pf = path[:idx + dz]
912 d3be972a Sofia Papagiannaki
            pappend(pf)
913 4f917833 Sofia Papagiannaki
            if count >= limit: 
914 4f917833 Sofia Papagiannaki
                break
915 4f917833 Sofia Papagiannaki
            
916 b43d44ad Sofia Papagiannaki
            rp = self.conn.execute(s, start=strnextling(pf)) # New start.
917 9eb713e1 Sofia Papagiannaki
        rp.close()
918 4f917833 Sofia Papagiannaki
        
919 4f917833 Sofia Papagiannaki
        return matches, prefixes
920 37bee317 Antony Chazapis
    
921 37bee317 Antony Chazapis
    def latest_uuid(self, uuid):
922 37bee317 Antony Chazapis
        """Return a (path, serial) tuple, for the latest version of the given uuid."""
923 37bee317 Antony Chazapis
        
924 37bee317 Antony Chazapis
        v = self.versions.alias('v')
925 37bee317 Antony Chazapis
        n = self.nodes.alias('n')
926 37bee317 Antony Chazapis
        s = select([n.c.path, v.c.serial])
927 37bee317 Antony Chazapis
        filtered = select([func.max(self.versions.c.serial)])
928 37bee317 Antony Chazapis
        s = s.where(v.c.serial == filtered.where(self.versions.c.uuid == uuid))
929 37bee317 Antony Chazapis
        s = s.where(n.c.node == v.c.node)
930 37bee317 Antony Chazapis
        
931 37bee317 Antony Chazapis
        r = self.conn.execute(s)
932 37bee317 Antony Chazapis
        l = r.fetchone()
933 37bee317 Antony Chazapis
        r.close()
934 37bee317 Antony Chazapis
        return l