root / pithos / lib / hashfiler / filer.py @ 5bd53e3b
History | View | Annotate | Download (9.4 kB)
1 |
from sqlite3 import connect |
---|---|
2 |
from os import makedirs |
3 |
from os.path import realpath, dirname, exists, isdir |
4 |
from pithos.lib.hashfiler import (Blocker, Mapper, Noder, AccessController, |
5 |
inf, SERIAL, PARENT, PATH, SIZE, |
6 |
POPULATION, POPSIZE, SOURCE, MTIME, CLUSTER) |
7 |
|
8 |
def packversion(serial, mtime):
    """Encode serial and mtime as the string '<hex-serial>:<mtime-8dp>'."""
    hexserial = "%x" % serial
    timestamp = "%.8f" % mtime
    return hexserial + ":" + timestamp
11 |
|
12 |
def unpackversion(strver):
    """Decode a packed version string back into a (serial, mtime) tuple."""
    hexserial, timestamp = strver.split(':')
    return int(hexserial, 16), float(timestamp)
16 |
|
17 |
class Filer(Noder, AccessController, Mapper, Blocker):
    """SQLite-backed file store combining the hashfiler layers.

    Filer wires the Noder, AccessController, Mapper and Blocker mixins
    to a single shared sqlite connection/cursor and exposes a file-level
    API (file_*) built on the node-level (node_*), map-level (map_*) and
    block-level (block_*) calls provided by the mixins.
    """

    # Format/schema version marker; never assigned by this class.
    version = None

    def __init__(self, **params):
        """Open (creating directories as needed) the sqlite database at
        params['filepath'] and initialize every mixin layer with the
        shared connection and cursor.

        Raises ValueError if the parent of filepath exists but is not a
        directory.
        """
        filepath = realpath(params['filepath'])
        basepath = dirname(filepath)
        if not exists(basepath):
            makedirs(basepath)
        elif not isdir(basepath):
            raise ValueError("%s is not a directory" % (basepath,))

        # check_same_thread=0 permits use of this connection from other
        # threads; serializing access is left to the caller.
        conn = connect(filepath, check_same_thread=0)
        cur = conn.cursor()
        self.filepath = filepath
        self.conn = conn
        self.cur = cur
        # Shortcut bindings to the shared cursor, used by the mixins.
        self.execute = cur.execute
        self.executemany = cur.executemany
        self.fetchone = cur.fetchone
        self.fetchall = cur.fetchall

        # Local serial pre-allocation state (see alloc_serial()).
        self.serial = 0
        self.limit_serial = 0
        self.serial_step = 256

        self.filer_init()

        params['connection'] = conn
        params['cursor'] = cur
        # Blocker must come first: it establishes self.hashlen, which the
        # remaining layers receive as 'namelen'.
        Blocker.__init__(self, **params)
        params['namelen'] = self.hashlen
        Noder.__init__(self, **params)
        AccessController.__init__(self, **params)
        Mapper.__init__(self, **params)

    def filer_init(self):
        """Create the conf table if missing and seed the serial counter."""
        execute = self.execute

        execute(""" create table if not exists conf
                          ( param text,
                            value text,
                            primary key(param) )""")

        execute(""" select value from conf where param = 'serial' """)
        r = self.fetchone()
        if r is None:
            # NOTE(review): seeded as decimal 100, but alloc_serial()
            # reads it back with int(value, 16), i.e. as 0x100 == 256 on
            # first use -- confirm this mismatch is intentional.
            execute(""" insert into conf values('serial', 100) """)

    def alloc_serial(self, nr=1):
        """Allocate global serial identifiers.

        Pre-allocate a buffer of identifiers locally,
        to reduce load to the database.
        """
        serial = self.serial + nr
        if serial >= self.limit_serial:
            # Local buffer exhausted: advance the persistent counter by a
            # full step and hand out serials from the old limit onward.
            db_limit_serial = int(self.get_parameter('serial'), 16)
            limit_serial = db_limit_serial + self.serial_step
            self.set_parameter('serial', hex(limit_serial))
            self.limit_serial = limit_serial
            serial = db_limit_serial
        self.serial = serial
        return serial

    def quit(self):
        """Shutdown hook; currently a no-op."""
        return

    def begin(self):
        """Start a new deferred transaction on the shared cursor."""
        self.cur.execute("begin deferred")

    def commit(self):
        """Commit the current transaction (if any)
        but do not start a new one.
        """
        self.conn.commit()

    def rollback(self):
        """Abort the current transaction and undo the changes it made."""
        self.conn.rollback()

    def __enter__(self):
        """Enter a transactional context: begin and return self."""
        self.begin()
        return self

    def __exit__(self, exc, val, trace):
        # Commit on clean exit, roll back if the with-body raised.
        if not exc:
            self.commit()
        else:
            self.rollback()

    def get_parameter(self, name):
        """Return the conf value stored for name, or None if unset."""
        c = self.cur.execute("select value from conf where param = ?", (name,))
        r = c.fetchone()
        if r is None:
            return r
        return r[0]

    def set_parameter(self, name, value):
        """Insert or overwrite the conf value for name."""
        q = "insert or replace into conf values (?, ?)"
        self.cur.execute(q, (name, value))

    def list_parameters(self):
        """Return all (param, value) rows in the conf table."""
        c = self.cur.execute("select * from conf")
        return c.fetchall()

    def file_create(self, parent, path, size, cluster=0):
        """Create a new file on path in parent's namespace with the given size.
        Return the (serial, timestamp) of the created file.
        """
        return self.node_create(parent, path, size, None, cluster=cluster)

    def file_list(self, parent, prefix='',
                  start='', delimiter=None,
                  after=0.0, before=inf,
                  filterq=None, versions=0,
                  cluster=0, limit=10000):
        """List files under parent. Alias for node_list()."""
        return self.node_list(parent, prefix=prefix,
                              start=start, delimiter=delimiter,
                              after=after, before=before,
                              filterq=filterq, versions=versions,
                              cluster=cluster, limit=limit)

    def file_get_properties(self, serial):
        """Alias for node_get_properties()"""
        return self.node_get_properties(serial)

    def file_set_attributes(self, serial, items):
        """Alias for node_attr_set()."""
        return self.node_attr_set(serial, items)

    def file_increment(self, serial, size=None):
        """Alias for node_increment()."""
        # BUG FIX: the size argument was previously dropped
        # (node_increment was always called with size=None).
        return self.node_increment(serial, size=size)

    def file_list_attributes(self, parent):
        """List file attributes. Alias for node_attr_list_keys()."""
        return self.node_attr_list_keys(parent)

    def file_get_attributes(self, serial, keys=()):
        """Alias for node_attr_get()."""
        return self.node_attr_get(serial, keys=keys)

    def file_del_attributes(self, serial):
        """Alias for node_attr_del()."""
        return self.node_attr_del(serial)

    def file_remove(self, serial, recursive=0):
        """Remove the file given.
        If the file's namespace is not empty,
        remove nothing and return false.
        Return true if the file was removed.
        """
        r = self.node_remove(serial, recursive=recursive)
        if not r:
            return 0

        # Drop the removed node's block map as well.
        self.map_remv(serial)
        return 1

    def file_purge_cluster(self, parent, cluster=0):
        """Purge a whole cluster from the namespace of a node.
        Return the serials of the purged nodes.
        """
        # BUG FIX: the cluster argument was previously ignored
        # (node_delete was always called with cluster=0).
        purged = self.node_delete(parent, cluster=cluster)
        purged = [props[0] for props in purged]
        map_remv = self.map_remv
        for serial in purged:
            # XXX: victim caches?
            map_remv(serial)
        return purged

    def file_purge_path(self, parent, path, after=0, before=inf, cluster=0):
        """Purge all versions in (after, before)
        for the given parent and path and cluster.
        Return the list of serials deleted.
        """
        # NOTE(review): `before` is accepted but never forwarded to
        # node_purge -- confirm whether node_purge should receive it.
        serials = self.node_purge(parent, path, after, cluster=cluster)
        map_remv = self.map_remv
        for serial in serials:
            map_remv(serial)
        return serials

    def file_lookup(self, parent, path, before=inf,
                    create=0, size=0, cluster=0):
        """Lookup the latest node in parent and path.
        If the object is not found and create is true,
        it is created with the given size.
        Also, accept a before time limit and a cluster.
        """
        # BUG FIX: the before argument was previously ignored
        # (node_lookup was always called with before=inf).
        props = self.node_lookup(parent, path, before=before, cluster=cluster)
        if props or not create:
            return props

        # Freshly created nodes have no source and an empty population.
        source = None
        population = 0
        popsize = 0
        serial, mtime = self.node_create(parent, path, size, source,
                                         cluster=cluster)
        return (serial, parent, path, size,
                population, popsize, source, mtime)

    def file_copy(self, source, parent=None, path=None):
        """Create a file as a copy (data and attributes) of another.
        If parent or path is not specified, it remains the same.
        """
        serial, mtime = self.node_copy(source, parent, path)
        self.node_attr_copy(source, serial)
        return serial, mtime

    def file_read(self, serial, nr, blkoff=0):
        """Read a number of blocks at block offset from a file."""
        hashes = self.map_retr(serial, blkoff=blkoff, nr=nr)
        blocks = self.block_retr(hashes)
        return blocks

    def file_write(self, serial, blocks, blkoff=0, fork=0):
        """Write blocks at block offset and create a new version.
        If fork is true, the old version is preserved.
        Return (serial, mtime, newsize, hashes, missing).
        """
        blocksize = self.blocksize
        # All blocks except the last are assumed full-sized; guard the
        # empty list (previously an IndexError on blocks[-1]).
        if blocks:
            datasize = (len(blocks) - 1) * blocksize + len(blocks[-1])
        else:
            datasize = 0
        props = self.node_get_properties(serial)
        oldsize = props[SIZE]
        newsize = blkoff * blocksize + datasize

        if fork:
            # NOTE(review): file_clone is not defined in this class --
            # possibly file_copy was intended; confirm against the mixins.
            serial, mtime = self.file_clone(serial, props[PARENT], props[PATH])

        hashes, missing = self.block_stor(blocks)
        self.map_stor(serial, hashes, blkoff=blkoff)

        node_increment = self.node_increment
        if newsize > oldsize:
            mtime = node_increment(serial, newsize)
        elif not fork:
            mtime = node_increment(serial, oldsize)

        return serial, mtime, newsize, hashes, missing

    def file_move(self, serial, parent, path=None):
        """Move a file, possibly to another parent and path."""
        if path is None:
            # Keep the node's current path when none is given.
            props = self.node_get_properties(serial)
            path = props[PATH]

        return self.node_move(serial, parent, path)

    def file_recluster(self, serial, cluster):
        """Send file to the given cluster."""
        self.node_set_properties(serial, (('cluster', cluster),))
|
272 |
|
273 |
if __name__ == "__main__":
    # Smoke-test setup: build a Filer over a throwaway .hf-test tree.
    hf = Filer(
        filepath=".hf-test/nodes.sqlite3",
        mappath=".hf-test/maps",
        blockpath=".hf-test/blocks",
        blocksize=64 * 1024,
        hashtype='sha256',
    )

    #hf.quit()
|
282 |
|