Revision f3525003
b/snf-pithos-backend/pithos/backends/lib/hashfiler/archipelagoblocker.py | ||
---|---|---|
1 |
# Copyright 2013 GRNET S.A. All rights reserved. |
|
2 |
# |
|
3 |
# Redistribution and use in source and binary forms, with or |
|
4 |
# without modification, are permitted provided that the following |
|
5 |
# conditions are met: |
|
6 |
# |
|
7 |
# 1. Redistributions of source code must retain the above |
|
8 |
# copyright notice, this list of conditions and the following |
|
9 |
# disclaimer. |
|
10 |
# |
|
11 |
# 2. Redistributions in binary form must reproduce the above |
|
12 |
# copyright notice, this list of conditions and the following |
|
13 |
# disclaimer in the documentation and/or other materials |
|
14 |
# provided with the distribution. |
|
15 |
# |
|
16 |
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS |
|
17 |
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
|
18 |
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
|
19 |
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR |
|
20 |
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
|
21 |
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
|
22 |
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF |
|
23 |
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED |
|
24 |
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
|
25 |
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN |
|
26 |
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
|
27 |
# POSSIBILITY OF SUCH DAMAGE. |
|
28 |
# |
|
29 |
# The views and conclusions contained in the software and |
|
30 |
# documentation are those of the authors and should not be |
|
31 |
# interpreted as representing official policies, either expressed |
|
32 |
# or implied, of GRNET S.A. |
|
33 |
|
|
34 |
from hashlib import new as newhasher |
|
35 |
from binascii import hexlify |
|
36 |
import os, re |
|
37 |
|
|
38 |
from context_archipelago import ArchipelagoObject, file_sync_read_chunks |
|
39 |
from archipelago.common import ( |
|
40 |
Request, |
|
41 |
xseg_reply_info, |
|
42 |
string_at, |
|
43 |
) |
|
44 |
|
|
45 |
from pithos.workers import glue, monkey |
|
46 |
|
|
47 |
# Patch archipelago's Request class before any requests are created
# (see pithos.workers.monkey for what the patch changes).
monkey.patch_Request()
|
48 |
|
|
49 |
from pithos.api.settings import BACKEND_ARCHIPELAGO_CONF |
|
50 |
|
|
51 |
class ArchipelagoBlocker(object):
    """Blocker.

    Content-addressed block store on top of Archipelago: each block is
    kept as an Archipelago object named after the hex form of its hash.
    Trailing zero bytes are stripped before hashing, and blocks are
    zero-padded back to blocksize on retrieval.

    Required constructor parameters: blocksize, hashtype.
    """

    blocksize = None
    blockpool = None
    hashtype = None

    def __init__(self, **params):
        """Read the blocker port from the Archipelago config and set up
        hashing.

        Raises ValueError if params['hashtype'] is unknown to hashlib.
        """
        cfg = {}
        bcfg = open(BACKEND_ARCHIPELAGO_CONF).read()
        # The Archipelago config is scanned textually for the
        # "'blockerb_port': <num>" entry; it is not imported as Python.
        cfg['blockerb'] = re.search(r"'blockerb_port'\s*:\s*\d+",
                                    bcfg).group(0).split(':')[1]
        blocksize = params['blocksize']
        hashtype = params['hashtype']
        try:
            hasher = newhasher(hashtype)
        except ValueError:
            msg = "Variable hashtype '%s' is not available from hashlib"
            raise ValueError(msg % (hashtype,))

        hasher.update("")
        emptyhash = hasher.digest()

        self.blocksize = blocksize
        self.ioctx_pool = glue.WorkerGlue().ioctx_pool
        self.dst_port = int(cfg['blockerb'])
        self.hashtype = hashtype
        self.hashlen = len(emptyhash)
        # Hash of the empty string; all-zero blocks are represented by
        # this hash and never actually stored.
        self.emptyhash = emptyhash

    def _pad(self, block):
        """Zero-pad block up to self.blocksize bytes."""
        return block + ('\x00' * (self.blocksize - len(block)))

    def _get_rear_block(self, blkhash, create=0):
        """Return an ArchipelagoObject wrapping the block named by the
        hex form of blkhash."""
        name = hexlify(blkhash)
        return ArchipelagoObject(name, self.ioctx_pool, self.dst_port,
                                 create)

    def _check_rear_block(self, blkhash):
        """Return True if the block named by blkhash exists in storage."""
        filename = hexlify(blkhash)
        ioctx = self.ioctx_pool.pool_get()
        req = Request.get_info_request(ioctx, self.dst_port, filename)
        req.submit()
        req.wait()
        ret = req.success()
        req.put()
        self.ioctx_pool.pool_put(ioctx)
        return bool(ret)

    def block_hash(self, data):
        """Hash a block of data.

        Trailing zero bytes are stripped first, so a padded and an
        unpadded block hash identically.
        """
        hasher = newhasher(self.hashtype)
        hasher.update(data.rstrip('\x00'))
        return hasher.digest()

    def block_ping(self, hashes):
        """Check hashes for existence and
        return those missing from block storage.
        """
        notfound = []
        append = notfound.append

        for h in hashes:
            # 'h not in notfound' keeps the result free of duplicates.
            if h not in notfound and not self._check_rear_block(h):
                append(h)

        return notfound

    def block_retr(self, hashes):
        """Retrieve blocks from storage by their hashes.

        NOTE: retrieval stops at the first missing/unreadable block, so
        the returned list may be shorter than `hashes`.
        """
        blocksize = self.blocksize
        blocks = []
        append = blocks.append
        block = None

        for h in hashes:
            if h == self.emptyhash:
                # All-zero blocks are never stored; synthesize one.
                append(self._pad(''))
                continue
            with self._get_rear_block(h, 0) as rbl:
                if not rbl:
                    break
                for block in rbl.sync_read_chunks(blocksize, 1, 0):
                    break  # there should be just one block there
            if not block:
                break
            append(self._pad(block))

        return blocks

    def block_retr_archipelago(self, hashes):
        """Retrieve blocks from storage by their (already hex) hashes."""
        blocks = []
        append = blocks.append
        block = None

        ioctx = self.ioctx_pool.pool_get()
        archip_emptyhash = hexlify(self.emptyhash)

        for h in hashes:
            if h == archip_emptyhash:
                append(self._pad(''))
                continue
            # First stat the object to learn its stored size...
            req = Request.get_info_request(ioctx, self.dst_port, h)
            req.submit()
            req.wait()
            ret = req.success()
            if ret:
                info = req.get_data(_type=xseg_reply_info)
                size = info.contents.size
                req.put()
                # ...then read exactly that many bytes.
                req_data = Request.get_read_request(ioctx, self.dst_port,
                                                    h, size=size)
                req_data.submit()
                req_data.wait()
                ret_data = req_data.success()
                if ret_data:
                    append(self._pad(string_at(req_data.get_data(),
                                               size)))
                    req_data.put()
                else:
                    req_data.put()
                    # BUGFIX: was self.ioctx_pool.put(ioctx); the pool
                    # API used everywhere else is pool_put(), so the
                    # ioctx was never returned on this error path.
                    self.ioctx_pool.pool_put(ioctx)
                    raise Exception("Cannot retrieve Archipelago data.")
            else:
                req.put()
                self.ioctx_pool.pool_put(ioctx)
                raise Exception("Bad block file.")
        self.ioctx_pool.pool_put(ioctx)
        return blocks

    def block_stor(self, blocklist):
        """Store a bunch of blocks and return (hashes, missing).

        Hashes is a list of the hashes of the blocks,
        missing is a list of indices in that list indicating
        which blocks were missing from the store.
        """
        block_hash = self.block_hash
        hashlist = [block_hash(b) for b in blocklist]
        missing = [i for i, h in enumerate(hashlist)
                   if not self._check_rear_block(h)]
        for i in missing:
            with self._get_rear_block(hashlist[i], 1) as rbl:
                rbl.sync_write(blocklist[i])  # XXX: verify?

        return hashlist, missing

    def block_delta(self, blkhash, offset, data):
        """Construct and store a new block from a given block
        and a data 'patch' applied at offset. Return:
        (the hash of the new block, if the block already existed)
        """
        blocksize = self.blocksize
        if offset >= blocksize or not data:
            return None, None

        block = self.block_retr((blkhash,))
        if not block:
            return None, None

        block = block[0]
        newblock = block[:offset] + data
        if len(newblock) > blocksize:
            newblock = newblock[:blocksize]
        elif len(newblock) < blocksize:
            # Keep the tail of the original block past the patch.
            newblock += block[len(newblock):]

        h, a = self.block_stor((newblock,))
        return h[0], 1 if a else 0

    def block_hash_file(self, archipelagoobject):
        """Return the list of hashes (hashes map)
        for the blocks in a buffered file.
        Helper method, does not affect store.
        """
        hashes = []
        append = hashes.append
        block_hash = self.block_hash

        for block in file_sync_read_chunks(archipelagoobject,
                                           self.blocksize, 1, 0):
            append(block_hash(block))

        return hashes

    def block_stor_file(self, archipelagoobject):
        """Read blocks from buffered file object and store them. Return:
        (bytes read, list of hashes, list of hashes that were missing)
        """
        blocksize = self.blocksize
        block_stor = self.block_stor
        hashlist = []
        hextend = hashlist.extend
        storedlist = []
        sextend = storedlist.extend
        lastsize = 0

        for block in file_sync_read_chunks(archipelagoobject, blocksize,
                                           1, 0):
            hl, sl = block_stor((block,))
            hextend(hl)
            sextend(sl)
            lastsize = len(block)

        # Total bytes = full blocks plus the (possibly short) last one.
        size = (len(hashlist) - 1) * blocksize + lastsize if hashlist else 0
        return size, hashlist, storedlist
b/snf-pithos-backend/pithos/backends/lib/hashfiler/archipelagomapper.py | ||
---|---|---|
1 |
# Copyright 2013 GRNET S.A. All rights reserved. |
|
2 |
# |
|
3 |
# Redistribution and use in source and binary forms, with or |
|
4 |
# without modification, are permitted provided that the following |
|
5 |
# conditions are met: |
|
6 |
# |
|
7 |
# 1. Redistributions of source code must retain the above |
|
8 |
# copyright notice, this list of conditions and the following |
|
9 |
# disclaimer. |
|
10 |
# |
|
11 |
# 2. Redistributions in binary form must reproduce the above |
|
12 |
# copyright notice, this list of conditions and the following |
|
13 |
# disclaimer in the documentation and/or other materials |
|
14 |
# provided with the distribution. |
|
15 |
# |
|
16 |
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS |
|
17 |
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
|
18 |
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
|
19 |
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR |
|
20 |
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
|
21 |
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
|
22 |
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF |
|
23 |
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED |
|
24 |
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
|
25 |
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN |
|
26 |
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
|
27 |
# POSSIBILITY OF SUCH DAMAGE. |
|
28 |
# |
|
29 |
# The views and conclusions contained in the software and |
|
30 |
# documentation are those of the authors and should not be |
|
31 |
# interpreted as representing official policies, either expressed |
|
32 |
# or implied, of GRNET S.A. |
|
33 |
|
|
34 |
from binascii import hexlify |
|
35 |
import os, re |
|
36 |
import ctypes |
|
37 |
|
|
38 |
from context_archipelago import ArchipelagoObject |
|
39 |
from archipelago.common import ( |
|
40 |
Request, |
|
41 |
xseg_reply_info, |
|
42 |
xseg_reply_map, |
|
43 |
xseg_reply_map_scatterlist, |
|
44 |
string_at, |
|
45 |
) |
|
46 |
|
|
47 |
from pithos.workers import ( |
|
48 |
glue, |
|
49 |
monkey, |
|
50 |
) |
|
51 |
|
|
52 |
# Patch archipelago's Request class before any requests are created
# (see pithos.workers.monkey for what the patch changes).
monkey.patch_Request()
|
53 |
|
|
54 |
from pithos.api.settings import BACKEND_ARCHIPELAGO_CONF |
|
55 |
|
|
56 |
|
|
57 |
class ArchipelagoMapper(object):
    """Mapper.

    Stores and retrieves the hashes map (list of fixed-length hash
    names) of an object, either as a raw blockerm object or through the
    Archipelago mapper daemon.

    Required constructor parameters: namelen.
    """

    namelen = None

    def __init__(self, **params):
        """Read the blockerm/mapperd ports from the Archipelago config."""
        self.params = params
        self.namelen = params['namelen']
        ioctx_pool = glue.WorkerGlue().ioctx_pool
        cfg = {}
        bcfg = open(BACKEND_ARCHIPELAGO_CONF).read()
        # The config is scanned textually for the port entries; it is
        # not imported as Python.
        cfg['blockerm'] = re.search(r"'blockerm_port'\s*:\s*\d+",
                                    bcfg).group(0).split(':')[1]
        cfg['mapperd'] = re.search(r"'mapper_port'\s*:\s*\d+",
                                   bcfg).group(0).split(':')[1]
        self.ioctx_pool = ioctx_pool
        self.dst_port = int(cfg['blockerm'])
        self.mapperd_port = int(cfg['mapperd'])

    def _get_rear_map(self, maphash, create=0):
        """Return an ArchipelagoObject wrapping the map named by the
        hex form of maphash."""
        name = hexlify(maphash)
        return ArchipelagoObject(name, self.ioctx_pool, self.dst_port,
                                 create)

    def _check_rear_map(self, maphash):
        """Return True if the map named by maphash exists in storage."""
        name = hexlify(maphash)
        ioctx = self.ioctx_pool.pool_get()
        req = Request.get_info_request(ioctx, self.dst_port, name)
        req.submit()
        req.wait()
        ret = req.success()
        req.put()
        self.ioctx_pool.pool_put(ioctx)
        return bool(ret)

    def map_retr(self, maphash, blkoff=0, nr=100000000000000):
        """Return as a list, part of the hashes map of an object
        at the given block offset.
        By default, return the whole hashes map.

        NOTE: blkoff and nr are currently ignored; the whole map is
        always returned.

        Raises RuntimeError if the map does not exist or cannot be read.
        """
        namelen = self.namelen
        hashes = []
        ioctx = self.ioctx_pool.pool_get()
        # Stat the map object first to learn its size.
        req = Request.get_info_request(ioctx, self.dst_port,
                                       hexlify(maphash))
        req.submit()
        req.wait()
        ret = req.success()
        if ret:
            info = req.get_data(_type=xseg_reply_info)
            size = int(info.contents.size)
            req.put()
        else:
            req.put()
            self.ioctx_pool.pool_put(ioctx)
            raise RuntimeError("Hashmap '%s' doesn't exist" %
                               hexlify(maphash))
        req = Request.get_read_request(ioctx, self.dst_port,
                                       hexlify(maphash), size=size)
        req.submit()
        req.wait()
        ret = req.success()
        if ret:
            data = string_at(req.get_data(), size)
            req.put()
            self.ioctx_pool.pool_put(ioctx)
            # Split the raw map into fixed-size hash names.
            # (Appending to a list here replaces the original
            # quadratic tuple concatenation; the returned value is
            # identical.)
            for idx in xrange(0, len(data), namelen):
                hashes.append(data[idx:idx + namelen])
        else:
            req.put()
            self.ioctx_pool.pool_put(ioctx)
            raise RuntimeError("Hashmap '%s' doesn't exist" %
                               hexlify(maphash))
        return hashes

    def map_retr_archipelago(self, maphash, size):
        """Retrieve an Archipelago mapfile through the mapper daemon.

        maphash is expected in "archip:<name>" form; returns the list
        of target names from the map reply.
        """
        ioctx = self.ioctx_pool.pool_get()
        maphash = maphash.split("archip:")[1]
        req = Request.get_mapr_request(ioctx, self.mapperd_port, maphash,
                                       offset=0, size=size)
        req.submit()
        req.wait()
        ret = req.success()
        if ret:
            data = req.get_data(xseg_reply_map)
            # The reply carries a variable-length scatterlist; build a
            # ctypes array type of the right length to view it.
            Segsarray = xseg_reply_map_scatterlist * data.contents.cnt
            segs = Segsarray.from_address(ctypes.addressof(
                data.contents.segs))
            req.put()
        else:
            req.put()
            self.ioctx_pool.pool_put(ioctx)
            raise Exception("Could not retrieve Archipelago mapfile.")
        self.ioctx_pool.pool_put(ioctx)
        return [string_at(segs[idx].target, segs[idx].targetlen)
                for idx in xrange(len(segs))]

    def map_stor(self, maphash, hashes=(), blkoff=0, create=1):
        """Store hashes in the given hashes map."""
        namelen = self.namelen
        # Maps are content-addressed: if one already exists under this
        # hash there is nothing to write.
        if self._check_rear_map(maphash):
            return
        with self._get_rear_map(maphash, 1) as rmap:
            rmap.sync_write_chunks(namelen, blkoff, hashes, None)
b/snf-pithos-backend/pithos/backends/lib/hashfiler/context_archipelago.py | ||
---|---|---|
1 |
# Copyright 2013 GRNET S.A. All rights reserved. |
|
2 |
# |
|
3 |
# Redistribution and use in source and binary forms, with or |
|
4 |
# without modification, are permitted provided that the following |
|
5 |
# conditions are met: |
|
6 |
# |
|
7 |
# 1. Redistributions of source code must retain the above |
|
8 |
# copyright notice, this list of conditions and the following |
|
9 |
# disclaimer. |
|
10 |
# |
|
11 |
# 2. Redistributions in binary form must reproduce the above |
|
12 |
# copyright notice, this list of conditions and the following |
|
13 |
# disclaimer in the documentation and/or other materials |
|
14 |
# provided with the distribution. |
|
15 |
# |
|
16 |
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS |
|
17 |
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
|
18 |
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
|
19 |
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR |
|
20 |
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
|
21 |
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
|
22 |
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF |
|
23 |
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED |
|
24 |
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
|
25 |
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN |
|
26 |
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
|
27 |
# POSSIBILITY OF SUCH DAMAGE. |
|
28 |
# |
|
29 |
# The views and conclusions contained in the software and |
|
30 |
# documentation are those of the authors and should not be |
|
31 |
# interpreted as representing official policies, either expressed |
|
32 |
# or implied, of GRNET S.A. |
|
33 |
|
|
34 |
from os import SEEK_CUR, SEEK_SET |
|
35 |
from archipelago.common import ( |
|
36 |
Request, |
|
37 |
string_at, |
|
38 |
) |
|
39 |
from pithos.workers import monkey |
|
40 |
# Patch archipelago's Request class before any requests are created
# (see pithos.workers.monkey for what the patch changes).
monkey.patch_Request()
|
41 |
|
|
42 |
# Shared, lazily-grown buffer of zero bytes used by zeros().
_zeros = ''


def zeros(nr):
    """Return a string of nr zero bytes, backed by a shared cache.

    The cache only grows: a request smaller than the cache is served
    with a slice instead of truncating the cached string, so callers
    alternating between sizes no longer rebuild it on every call.
    The returned value is identical to the original implementation's.
    """
    global _zeros
    size = len(_zeros)
    if nr > size:
        _zeros += '\0' * (nr - size)
    if nr == len(_zeros):
        return _zeros
    return _zeros[:nr]
|
58 |
|
|
59 |
|
|
60 |
def file_sync_write_chunks(archipelagoobject, chunksize, offset,
                           chunks, size=None):
    """Write given chunks to the given buffered file object.
    Writes never span across chunk boundaries.
    If size is given stop after or pad until size bytes have been written.

    `offset` is measured in chunks, not bytes.
    """
    padding = 0
    # Byte position at which writing starts.
    cursize = chunksize * offset
    archipelagoobject.seek(cursize)
    for chunk in chunks:
        if padding:
            # The previous chunk was short: zero-fill up to the chunk
            # boundary before writing the next one.
            archipelagoobject.sync_write(buffer(zeros(chunksize), 0, padding))
        if size is not None and cursize + chunksize >= size:
            # Final (possibly truncated) chunk.
            # NOTE(review): `chunksize - (cursize - size)` equals
            # (size - cursize) + chunksize, which is always >= len(chunk),
            # so this slice never truncates; the intended bound was
            # probably `size - cursize` — confirm.
            chunk = chunk[:chunksize - (cursize - size)]
            archipelagoobject.sync_write(chunk)
            cursize += len(chunk)
            break
        archipelagoobject.sync_write(chunk)
        # NOTE(review): cursize is not advanced on this path, so the
        # size check above always compares against the starting offset
        # rather than the running position — confirm intent.
        padding = chunksize - len(chunk)

    # Zero-pad out to `size` bytes in total, if requested.
    padding = size - cursize if size is not None else 0
    if padding <= 0:
        return

    q, r = divmod(padding, chunksize)
    for x in xrange(q):
        archipelagoobject.sync_write(zeros(chunksize))
    archipelagoobject.sync_write(buffer(zeros(chunksize), 0, r))
|
88 |
|
|
89 |
|
|
90 |
def file_sync_read_chunks(archipelagoobject, chunksize, nr, offset=0):
    """Read and yield up to `nr` chunks of `chunksize` bytes each from
    a buffered file object, starting at chunk offset `offset`.
    Reads never span across chunksize boundaries; when the object is
    exhausted mid-chunk, the partial chunk (if any) is yielded last.
    """
    archipelagoobject.seek(offset * chunksize)
    while nr:
        pieces = []
        missing = chunksize
        while True:
            piece = archipelagoobject.sync_read(missing)
            if not piece:
                # Source exhausted: emit any partial chunk, then stop.
                if pieces:
                    yield ''.join(pieces)
                return
            pieces.append(piece)
            missing -= len(piece)
            if missing <= 0:
                break
        yield ''.join(pieces)
        nr -= 1
|
110 |
|
|
111 |
|
|
112 |
class ArchipelagoObject(object):
    """File-like handle over a single Archipelago object.

    Provides seek/tell plus synchronous read/write primitives; each
    operation borrows an ioctx from ioctx_pool, issues xseg requests
    against dst_port, and returns the ioctx before completing.
    Usable as a context manager (__exit__ performs no cleanup).
    """

    __slots__ = ("name", "ioctx_pool", "dst_port", "create", "offset")

    def __init__(self, name, ioctx_pool, dst_port=None, create=0):
        # name: target Archipelago object name (a hex digest string
        # per the callers in this backend).
        # create: stored for callers; not read within this class.
        self.name = name
        self.ioctx_pool = ioctx_pool
        self.create = create
        self.dst_port = dst_port
        # Current file position in bytes.
        self.offset = 0

    def __enter__(self):
        return self

    def __exit__(self, exc, arg, trace):
        # Do not suppress exceptions; there is nothing to release here.
        return False

    def seek(self, offset, whence=SEEK_SET):
        """Set the current position and return it.

        Only SEEK_CUR gets special treatment (relative move); any other
        whence value behaves like SEEK_SET.
        """
        if whence == SEEK_CUR:
            offset += self.offset
        self.offset = offset
        return offset

    def tell(self):
        """Return the current position in bytes."""
        return self.offset

    def truncate(self, size):
        """Not supported by the Archipelago backend."""
        raise NotImplementedError("File truncation is not implemented yet \
            in archipelago")

    def sync_write(self, data):
        """Write data at the current offset.

        Advances the offset by len(data) on success; raises IOError on
        request failure.
        """
        ioctx = self.ioctx_pool.pool_get()
        req = Request.get_write_request(ioctx, self.dst_port, self.name,
                                        data=data, offset=self.offset,
                                        datalen=len(data))
        req.submit()
        req.wait()
        ret = req.success()
        req.put()
        self.ioctx_pool.pool_put(ioctx)
        if ret:
            self.offset += len(data)
        else:
            raise IOError("archipelago: Write request error")

    def sync_write_chunks(self, chunksize, offset, chunks, size=None):
        """Write chunks starting at chunk offset `offset`; see
        file_sync_write_chunks for the full contract."""
        return file_sync_write_chunks(self, chunksize, offset, chunks,size)

    def sync_read(self, size):
        """Read up to `size` bytes from the current offset.

        Issues read requests until `size` bytes are collected or a
        request fails / returns nothing; advances the offset by the
        amount actually accumulated and returns the data read so far.
        """
        read = Request.get_read_request
        data = ''
        datalen = 0
        dsize = size
        while 1:
            ioctx = self.ioctx_pool.pool_get()
            req = read(ioctx, self.dst_port,
                       self.name,size=dsize-datalen,offset=self.offset)
            req.submit()
            req.wait()
            ret = req.success()
            if ret:
                # NOTE(review): string_at always copies dsize-datalen
                # bytes even if the request returned fewer — confirm
                # the reply buffer is guaranteed to be that large.
                s = string_at(req.get_data(),dsize-datalen)
            else:
                s = None
            req.put()
            self.ioctx_pool.pool_put(ioctx)
            if not s:
                break
            data += s
            datalen += len(s)
            self.offset += len(s)
            if datalen >= size:
                break
        return data

    def sync_read_chunks(self, chunksize, nr, offset=0):
        """Yield up to nr chunks of chunksize bytes starting at chunk
        offset; see file_sync_read_chunks."""
        return file_sync_read_chunks(self, chunksize, nr, offset)
Also available in: Unified diff