Revision b5636704
b/snf-pithos-backend/pithos/backends/lib/hashfiler/archipelagoblocker.py | ||
---|---|---|
33 | 33 |
|
34 | 34 |
from hashlib import new as newhasher |
35 | 35 |
from binascii import hexlify |
36 |
import os, re |
|
36 |
import os |
|
37 |
import re |
|
37 | 38 |
|
38 | 39 |
from context_archipelago import ArchipelagoObject, file_sync_read_chunks |
39 | 40 |
from archipelago.common import ( |
... | ... | |
42 | 43 |
string_at, |
43 | 44 |
) |
44 | 45 |
|
45 |
from pithos.workers import glue, monkey |
|
46 |
from pithos.workers import ( |
|
47 |
glue, |
|
48 |
monkey, |
|
49 |
) |
|
46 | 50 |
|
47 | 51 |
monkey.patch_Request() |
48 | 52 |
|
49 | 53 |
from pithos.api.settings import BACKEND_ARCHIPELAGO_CONF |
50 | 54 |
|
55 |
|
|
51 | 56 |
class ArchipelagoBlocker(object): |
52 | 57 |
"""Blocker. |
53 | 58 |
Required constructor parameters: blocksize, hashtype. |
... | ... | |
61 | 66 |
cfg = {} |
62 | 67 |
bcfg = open(BACKEND_ARCHIPELAGO_CONF).read() |
63 | 68 |
cfg['blockerb'] = re.search('\'blockerb_port\'\s*:\s*\d+', |
64 |
bcfg).group(0).split(':')[1] |
|
69 |
bcfg).group(0).split(':')[1]
|
|
65 | 70 |
blocksize = params['blocksize'] |
66 | 71 |
hashtype = params['hashtype'] |
67 | 72 |
try: |
... | ... | |
85 | 90 |
|
86 | 91 |
def _get_rear_block(self, blkhash, create=0): |
87 | 92 |
name = hexlify(blkhash) |
88 |
return ArchipelagoObject(name, self.ioctx_pool,self.dst_port,create)
|
|
93 |
return ArchipelagoObject(name, self.ioctx_pool, self.dst_port, create)
|
|
89 | 94 |
|
90 | 95 |
def _check_rear_block(self, blkhash): |
91 | 96 |
filename = hexlify(blkhash) |
92 | 97 |
ioctx = self.ioctx_pool.pool_get() |
93 |
req = Request.get_info_request(ioctx,self.dst_port,filename)
|
|
98 |
req = Request.get_info_request(ioctx, self.dst_port, filename)
|
|
94 | 99 |
req.submit() |
95 | 100 |
req.wait() |
96 | 101 |
ret = req.success() |
... | ... | |
182 | 187 |
self.ioctx_pool.pool_put(ioctx) |
183 | 188 |
return blocks |
184 | 189 |
|
185 |
|
|
186 | 190 |
def block_stor(self, blocklist): |
187 | 191 |
"""Store a bunch of blocks and return (hashes, missing). |
188 | 192 |
Hashes is a list of the hashes of the blocks, |
... | ... | |
232 | 236 |
append = hashes.append |
233 | 237 |
block_hash = self.block_hash |
234 | 238 |
|
235 |
for block in file_sync_read_chunks(archipelagoobject, self.blocksize, 1, 0): |
|
239 |
for block in file_sync_read_chunks(archipelagoobject, |
|
240 |
self.blocksize, 1, 0): |
|
236 | 241 |
append(block_hash(block)) |
237 | 242 |
|
238 | 243 |
return hashes |
b/snf-pithos-backend/pithos/backends/lib/hashfiler/archipelagomapper.py | ||
---|---|---|
32 | 32 |
# or implied, of GRNET S.A. |
33 | 33 |
|
34 | 34 |
from binascii import hexlify |
35 |
import os, re |
|
35 |
import os |
|
36 |
import re |
|
36 | 37 |
import ctypes |
37 | 38 |
|
38 | 39 |
from context_archipelago import ArchipelagoObject |
... | ... | |
68 | 69 |
cfg = {} |
69 | 70 |
bcfg = open(BACKEND_ARCHIPELAGO_CONF).read() |
70 | 71 |
cfg['blockerm'] = re.search('\'blockerm_port\'\s*:\s*\d+', |
71 |
bcfg).group(0).split(':')[1]
|
|
72 |
bcfg).group(0).split(':')[1] |
|
72 | 73 |
cfg['mapperd'] = re.search('\'mapper_port\'\s*:\s*\d+', |
73 |
bcfg).group(0).split(':')[1]
|
|
74 |
bcfg).group(0).split(':')[1] |
|
74 | 75 |
self.ioctx_pool = ioctx_pool |
75 | 76 |
self.dst_port = int(cfg['blockerm']) |
76 | 77 |
self.mapperd_port = int(cfg['mapperd']) |
... | ... | |
113 | 114 |
else: |
114 | 115 |
req.put() |
115 | 116 |
self.ioctx_pool.pool_put(ioctx) |
116 |
raise RuntimeError("Hashmap '%s' doesn't exists" % hexlify(maphash)) |
|
117 |
raise RuntimeError("Hashmap '%s' doesn't exists" % |
|
118 |
hexlify(maphash)) |
|
117 | 119 |
req = Request.get_read_request(ioctx, self.dst_port, |
118 |
hexlify(maphash), size = size)
|
|
120 |
hexlify(maphash), size=size)
|
|
119 | 121 |
req.submit() |
120 | 122 |
req.wait() |
121 | 123 |
ret = req.success() |
122 | 124 |
if ret: |
123 |
data = string_at(req.get_data(),size) |
|
125 |
data = string_at(req.get_data(), size)
|
|
124 | 126 |
req.put() |
125 | 127 |
self.ioctx_pool.pool_put(ioctx) |
126 |
for idx in xrange(0,len(data),namelen):
|
|
128 |
for idx in xrange(0, len(data), namelen):
|
|
127 | 129 |
hashes = hashes + (data[idx:idx+namelen],) |
128 | 130 |
hashes = list(hashes) |
129 | 131 |
else: |
130 | 132 |
req.put() |
131 | 133 |
self.ioctx_pool.pool_put(ioctx) |
132 |
raise RuntimeError("Hashmap '%s' doesn't exists" % hexlify(maphash)) |
|
134 |
raise RuntimeError("Hashmap '%s' doesn't exists" % |
|
135 |
hexlify(maphash)) |
|
133 | 136 |
return hashes |
134 | 137 |
|
135 | 138 |
def map_retr_archipelago(self, maphash, size): |
... | ... | |
151 | 154 |
self.ioctx_pool.pool_put(ioctx) |
152 | 155 |
raise Exception("Could not retrieve Archipelago mapfile.") |
153 | 156 |
self.ioctx_pool.pool_put(ioctx) |
154 |
return [string_at(segs[idx].target, segs[idx].targetlen) for idx in xrange(len(segs))] |
|
157 |
return [string_at(segs[idx].target, segs[idx].targetlen) |
|
158 |
for idx in xrange(len(segs))] |
|
155 | 159 |
|
156 | 160 |
def map_stor(self, maphash, hashes=(), blkoff=0, create=1): |
157 | 161 |
"""Store hashes in the given hashes map.""" |
b/snf-pithos-backend/pithos/backends/lib/hashfiler/blocker.py | ||
---|---|---|
81 | 81 |
archip_hash = None |
82 | 82 |
archip_existed = True |
83 | 83 |
(archip_hash, archip_existed) = \ |
84 |
self.archip_blocker.block_delta(blkhash, offset, data)
|
|
84 |
self.archip_blocker.block_delta(blkhash, offset, data) |
|
85 | 85 |
|
86 | 86 |
if not archip_hash: |
87 | 87 |
return None, None |
b/snf-pithos-backend/pithos/backends/lib/hashfiler/context_archipelago.py | ||
---|---|---|
33 | 33 |
|
34 | 34 |
from os import SEEK_CUR, SEEK_SET |
35 | 35 |
from archipelago.common import ( |
36 |
Request,
|
|
37 |
string_at,
|
|
38 |
)
|
|
36 |
Request, |
|
37 |
string_at, |
|
38 |
) |
|
39 | 39 |
from pithos.workers import monkey |
40 | 40 |
monkey.patch_Request() |
41 | 41 |
|
... | ... | |
139 | 139 |
in archipelago") |
140 | 140 |
|
141 | 141 |
def sync_write(self, data): |
142 |
ioctx = self.ioctx_pool.pool_get()
|
|
142 |
ioctx = self.ioctx_pool.pool_get() |
|
143 | 143 |
req = Request.get_write_request(ioctx, self.dst_port, self.name, |
144 | 144 |
data=data, offset=self.offset, |
145 | 145 |
datalen=len(data)) |
... | ... | |
154 | 154 |
raise IOError("archipelago: Write request error") |
155 | 155 |
|
156 | 156 |
def sync_write_chunks(self, chunksize, offset, chunks, size=None): |
157 |
return file_sync_write_chunks(self, chunksize, offset, chunks,size) |
|
157 |
return file_sync_write_chunks(self, chunksize, offset, chunks, size)
|
|
158 | 158 |
|
159 | 159 |
def sync_read(self, size): |
160 | 160 |
read = Request.get_read_request |
... | ... | |
162 | 162 |
datalen = 0 |
163 | 163 |
dsize = size |
164 | 164 |
while 1: |
165 |
ioctx = self.ioctx_pool.pool_get()
|
|
165 |
ioctx = self.ioctx_pool.pool_get() |
|
166 | 166 |
req = read(ioctx, self.dst_port, |
167 |
self.name,size=dsize-datalen,offset=self.offset)
|
|
167 |
self.name, size=dsize-datalen, offset=self.offset)
|
|
168 | 168 |
req.submit() |
169 | 169 |
req.wait() |
170 | 170 |
ret = req.success() |
171 | 171 |
if ret: |
172 |
s = string_at(req.get_data(),dsize-datalen) |
|
172 |
s = string_at(req.get_data(), dsize-datalen)
|
|
173 | 173 |
else: |
174 | 174 |
s = None |
175 | 175 |
req.put() |
b/snf-pithos-backend/pithos/backends/lib/hashfiler/store.py | ||
---|---|---|
49 | 49 |
os.umask(umask) |
50 | 50 |
|
51 | 51 |
pb = {'blocksize': params['block_size'], |
52 |
'hashtype': params['hash_algorithm'], |
|
53 |
} |
|
52 |
'hashtype': params['hash_algorithm'],
|
|
53 |
}
|
|
54 | 54 |
self.blocker = Blocker(**pb) |
55 | 55 |
pm = {'namelen': self.blocker.hashlen, |
56 |
} |
|
56 |
}
|
|
57 | 57 |
self.mapper = Mapper(**pm) |
58 | 58 |
|
59 | 59 |
def map_get(self, name): |
b/snf-pithos-backend/pithos/workers/glue.py | ||
---|---|---|
35 | 35 |
|
36 | 36 |
import re |
37 | 37 |
|
38 |
|
|
38 | 39 |
class WorkerGlue(object): |
39 | 40 |
|
40 | 41 |
pmap = {} |
... | ... | |
55 | 56 |
cfg = {} |
56 | 57 |
bcfg = open(ARCHIPELAGO_CONF_FILE).read() |
57 | 58 |
cfg['SEGMENT_PORTS'] = re.search('SEGMENT_PORTS\s*=\s*\d+', |
58 |
bcfg).group(0).split('=')[1] |
|
59 |
bcfg).group(0).split('=')[1]
|
|
59 | 60 |
cfg['SEGMENT_SIZE'] = re.search('SEGMENT_SIZE\s*=\s*\d+', |
60 |
bcfg).group(0).split('=')[1] |
|
61 |
bcfg).group(0).split('=')[1]
|
|
61 | 62 |
ARCHIPELAGO_SEGMENT_PORTS = int(cfg['SEGMENT_PORTS']) |
62 | 63 |
ARCHIPELAGO_SEGMENT_SIZE = int(cfg['SEGMENT_SIZE']) |
63 | 64 |
ARCHIPELAGO_SEGMENT_ALIGNMENT = 12 |
Also available in: Unified diff