Revision a1557c9c
b/snf-pithos-app/conf/20-snf-pithos-app-settings.conf | ||
---|---|---|
10 | 10 |
|
11 | 11 |
# Block storage. |
12 | 12 |
#PITHOS_BACKEND_BLOCK_MODULE = 'pithos.backends.lib.hashfiler' |
13 |
#PITHOS_BACKEND_BLOCK_PATH = '/tmp/pithos-data/' |
|
14 | 13 |
#PITHOS_BACKEND_BLOCK_UMASK = 0o022 |
15 | 14 |
|
16 | 15 |
# Default setting for new accounts. |
... | ... | |
25 | 24 |
# Service Token acquired by identity provider. |
26 | 25 |
#PITHOS_SERVICE_TOKEN = '' |
27 | 26 |
|
28 |
# Enable and configure secondary rados storage for pithos |
|
29 |
#PITHOS_RADOS_STORAGE = False |
|
30 |
#PITHOS_RADOS_POOL_BLOCKS = 'blocks' |
|
31 |
#PITHOS_RADOS_POOL_MAPS = 'maps' |
|
32 |
|
|
33 | 27 |
# This enables a ui compatibility layer for the introduction of UUIDs in |
34 | 28 |
# identity management. WARNING: Setting to True will break your installation. |
35 | 29 |
# PITHOS_TRANSLATE_UUIDS = False |
... | ... | |
63 | 57 |
# Set domain to restrict requests of pithos object contents serve endpoint or |
64 | 58 |
# None for no domain restriction |
65 | 59 |
#PITHOS_UNSAFE_DOMAIN = None |
60 |
# |
|
61 |
#Archipelago Configuration File |
|
62 |
#PITHOS_BACKEND_ARCHIPELAGO_CONF = '/etc/archipelago/archipelago.conf' |
b/snf-pithos-app/pithos/api/settings.py | ||
---|---|---|
138 | 138 |
settings, 'PITHOS_BACKEND_BLOCK_PATH', '/tmp/pithos-data/') |
139 | 139 |
BACKEND_BLOCK_UMASK = getattr(settings, 'PITHOS_BACKEND_BLOCK_UMASK', 0o022) |
140 | 140 |
|
141 |
# Archipelago Configuration File |
|
142 |
BACKEND_ARCHIPELAGO_CONF = getattr( |
|
143 |
settings, 'PITHOS_BACKEND_ARCHIPELAGO_CONF', |
|
144 |
'/etc/archipelago/archipelago.conf') |
|
145 |
|
|
141 | 146 |
# Queue for billing. |
142 | 147 |
BACKEND_QUEUE_MODULE = getattr(settings, 'PITHOS_BACKEND_QUEUE_MODULE', None) |
143 | 148 |
# Example: 'pithos.backends.lib.rabbitmq' |
b/snf-pithos-backend/pithos/backends/__init__.py | ||
---|---|---|
33 | 33 |
|
34 | 34 |
import warnings |
35 | 35 |
|
36 |
from pithos.workers import glue |
|
37 |
from archipelago.common import Segment, Xseg_ctx |
|
38 |
from objpool import ObjectPool |
|
39 |
from pithos.api.settings import BACKEND_ARCHIPELAGO_CONF as cfile |
|
40 |
|
|
41 |
glue.WorkerGlue.setupXsegPool(ObjectPool, Segment, Xseg_ctx, cfile, |
|
42 |
pool_size=8) |
|
43 |
|
|
36 | 44 |
|
37 | 45 |
def connect_backend(**kwargs): |
38 | 46 |
from pithos.backends.modular import ModularBackend |
b/snf-pithos-backend/pithos/backends/lib/hashfiler/blocker.py | ||
---|---|---|
31 | 31 |
# interpreted as representing official policies, either expressed |
32 | 32 |
# or implied, of GRNET S.A. |
33 | 33 |
|
34 |
from fileblocker import FileBlocker |
|
35 |
|
|
36 |
|
|
37 |
def intersect(a, b): |
|
38 |
""" return the intersection of two lists """ |
|
39 |
return list(set(a) & set(b)) |
|
40 |
|
|
41 |
|
|
42 |
def union(a, b): |
|
43 |
""" return the union of two lists """ |
|
44 |
return list(set(a) | set(b)) |
|
34 |
from archipelagoblocker import ArchipelagoBlocker |
|
45 | 35 |
|
46 | 36 |
|
47 | 37 |
class Blocker(object): |
... | ... | |
51 | 41 |
""" |
52 | 42 |
|
53 | 43 |
def __init__(self, **params): |
54 |
self.rblocker = None |
|
55 |
try: |
|
56 |
if params['blockpool']: |
|
57 |
from radosblocker import RadosBlocker |
|
58 |
self.rblocker = RadosBlocker(**params) |
|
59 |
except KeyError: |
|
60 |
pass |
|
61 |
|
|
62 |
self.fblocker = FileBlocker(**params) |
|
63 |
self.hashlen = self.fblocker.hashlen |
|
44 |
self.archip_blocker = ArchipelagoBlocker(**params) |
|
45 |
self.hashlen = self.archip_blocker.hashlen |
|
64 | 46 |
self.blocksize = params['blocksize'] |
65 | 47 |
|
66 | 48 |
def block_hash(self, data): |
67 | 49 |
"""Hash a block of data""" |
68 |
return self.fblocker.block_hash(data)
|
|
50 |
return self.archip_blocker.block_hash(data)
|
|
69 | 51 |
|
70 | 52 |
def block_ping(self, hashes): |
71 | 53 |
"""Check hashes for existence and |
72 | 54 |
return those missing from block storage. |
73 | 55 |
""" |
74 |
r = [] |
|
75 |
if self.rblocker: |
|
76 |
r = self.rblocker.block_ping(hashes) |
|
77 |
f = self.fblocker.block_ping(hashes) |
|
78 |
return union(r, f) |
|
56 |
return self.archip_blocker.block_ping(hashes) |
|
79 | 57 |
|
80 | 58 |
def block_retr(self, hashes): |
81 | 59 |
"""Retrieve blocks from storage by their hashes.""" |
82 |
return self.fblocker.block_retr(hashes) |
|
60 |
return self.archip_blocker.block_retr(hashes) |
|
61 |
|
|
62 |
def block_retr_archipelago(self, hashes): |
|
63 |
"""Retrieve blocks from storage by theri hashes.""" |
|
64 |
return self.archip_blocker.block_retr_archipelago(hashes) |
|
83 | 65 |
|
84 | 66 |
def block_stor(self, blocklist): |
85 | 67 |
"""Store a bunch of blocks and return (hashes, missing). |
... | ... | |
87 | 69 |
missing is a list of indices in that list indicating |
88 | 70 |
which blocks were missing from the store. |
89 | 71 |
""" |
90 |
r_missing = [] |
|
91 |
(hashes, f_missing) = self.fblocker.block_stor(blocklist) |
|
92 |
if self.rblocker: |
|
93 |
(_, r_missing) = self.rblocker.block_stor(blocklist) |
|
94 |
return (hashes, union(r_missing, f_missing)) |
|
72 |
(hashes, missing) = self.archip_blocker.block_stor(blocklist) |
|
73 |
return (hashes, missing) |
|
95 | 74 |
|
96 | 75 |
def block_delta(self, blkhash, offset, data): |
97 | 76 |
"""Construct and store a new block from a given block |
... | ... | |
99 | 78 |
(the hash of the new block, if the block already existed) |
100 | 79 |
""" |
101 | 80 |
blocksize = self.blocksize |
102 |
r_hash = None |
|
103 |
r_existed = True |
|
104 |
(f_hash, f_existed) = self.fblocker.block_delta(blkhash, offset, data) |
|
105 |
if self.rblocker: |
|
106 |
(r_hash, r_existed) = self.rblocker.block_delta(blkhash, offset, |
|
107 |
data) |
|
108 |
if not r_hash and not f_hash: |
|
81 |
archip_hash = None |
|
82 |
archip_existed = True |
|
83 |
(archip_hash, archip_existed) = \ |
|
84 |
self.archip_blocker.block_delta(blkhash, offset, data) |
|
85 |
|
|
86 |
if not archip_hash: |
|
109 | 87 |
return None, None |
110 |
if self.rblocker and not r_hash: |
|
111 |
block = self.fblocker.block_retr((blkhash,)) |
|
88 |
|
|
89 |
if self.archip_blocker and not archip_hash: |
|
90 |
block = self.archip_blocker.block_retr((blkhash,)) |
|
112 | 91 |
if not block: |
113 | 92 |
return None, None |
114 | 93 |
block = block[0] |
... | ... | |
117 | 96 |
newblock = newblock[:blocksize] |
118 | 97 |
elif len(newblock) < blocksize: |
119 | 98 |
newblock += block[len(newblock):] |
120 |
r_hash, r_existed = self.rblocker.block_stor((newblock,))
|
|
99 |
archip_hash, archip_existed = self.archip_blocker.block_stor((newblock,))
|
|
121 | 100 |
|
122 |
return f_hash, 1 if r_existed and f_existed else 0 |
|
101 |
return archip_hash, 1 if archip_existed else 0 |
/dev/null | ||
---|---|---|
1 |
# Copyright 2011-2012 GRNET S.A. All rights reserved. |
|
2 |
# |
|
3 |
# Redistribution and use in source and binary forms, with or |
|
4 |
# without modification, are permitted provided that the following |
|
5 |
# conditions are met: |
|
6 |
# |
|
7 |
# 1. Redistributions of source code must retain the above |
|
8 |
# copyright notice, this list of conditions and the following |
|
9 |
# disclaimer. |
|
10 |
# |
|
11 |
# 2. Redistributions in binary form must reproduce the above |
|
12 |
# copyright notice, this list of conditions and the following |
|
13 |
# disclaimer in the documentation and/or other materials |
|
14 |
# provided with the distribution. |
|
15 |
# |
|
16 |
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS |
|
17 |
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
|
18 |
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
|
19 |
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR |
|
20 |
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
|
21 |
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
|
22 |
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF |
|
23 |
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED |
|
24 |
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
|
25 |
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN |
|
26 |
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
|
27 |
# POSSIBILITY OF SUCH DAMAGE. |
|
28 |
# |
|
29 |
# The views and conclusions contained in the software and |
|
30 |
# documentation are those of the authors and should not be |
|
31 |
# interpreted as representing official policies, either expressed |
|
32 |
# or implied, of GRNET S.A. |
|
33 |
|
|
34 |
from os import SEEK_CUR, SEEK_SET |
|
35 |
from errno import ENOENT, EROFS |
|
36 |
|
|
37 |
|
|
38 |
_zeros = '' |
|
39 |
|
|
40 |
|
|
41 |
def zeros(nr): |
|
42 |
global _zeros |
|
43 |
size = len(_zeros) |
|
44 |
if nr == size: |
|
45 |
return _zeros |
|
46 |
|
|
47 |
if nr > size: |
|
48 |
_zeros += '\0' * (nr - size) |
|
49 |
return _zeros |
|
50 |
|
|
51 |
if nr < size: |
|
52 |
_zeros = _zeros[:nr] |
|
53 |
return _zeros |
|
54 |
|
|
55 |
|
|
56 |
def file_sync_write_chunks(openfile, chunksize, offset, chunks, size=None): |
|
57 |
"""Write given chunks to the given buffered file object. |
|
58 |
Writes never span across chunk boundaries. |
|
59 |
If size is given stop after or pad until size bytes have been written. |
|
60 |
""" |
|
61 |
fwrite = openfile.write |
|
62 |
seek = openfile.seek |
|
63 |
padding = 0 |
|
64 |
|
|
65 |
try: |
|
66 |
seek(offset * chunksize) |
|
67 |
except IOError: |
|
68 |
seek = None |
|
69 |
for x in xrange(offset): |
|
70 |
fwrite(zeros(chunksize)) |
|
71 |
|
|
72 |
cursize = offset * chunksize |
|
73 |
|
|
74 |
for chunk in chunks: |
|
75 |
if padding: |
|
76 |
if seek: |
|
77 |
seek(padding - 1, SEEK_CUR) |
|
78 |
fwrite("\x00") |
|
79 |
else: |
|
80 |
fwrite(buffer(zeros(chunksize), 0, padding)) |
|
81 |
if size is not None and cursize + chunksize >= size: |
|
82 |
chunk = chunk[:chunksize - (cursize - size)] |
|
83 |
fwrite(chunk) |
|
84 |
cursize += len(chunk) |
|
85 |
break |
|
86 |
fwrite(chunk) |
|
87 |
padding = chunksize - len(chunk) |
|
88 |
|
|
89 |
padding = size - cursize if size is not None else 0 |
|
90 |
if padding <= 0: |
|
91 |
return |
|
92 |
|
|
93 |
q, r = divmod(padding, chunksize) |
|
94 |
for x in xrange(q): |
|
95 |
fwrite(zeros(chunksize)) |
|
96 |
fwrite(buffer(zeros(chunksize), 0, r)) |
|
97 |
|
|
98 |
|
|
99 |
def file_sync_read_chunks(openfile, chunksize, nr, offset=0): |
|
100 |
"""Read and yield groups of chunks from a buffered file object at offset. |
|
101 |
Reads never span across chunksize boundaries. |
|
102 |
""" |
|
103 |
fread = openfile.read |
|
104 |
remains = offset * chunksize |
|
105 |
seek = openfile.seek |
|
106 |
try: |
|
107 |
seek(remains) |
|
108 |
except IOError: |
|
109 |
seek = None |
|
110 |
while 1: |
|
111 |
s = fread(remains) |
|
112 |
remains -= len(s) |
|
113 |
if remains <= 0: |
|
114 |
break |
|
115 |
|
|
116 |
while nr: |
|
117 |
remains = chunksize |
|
118 |
chunk = '' |
|
119 |
while 1: |
|
120 |
s = fread(remains) |
|
121 |
if not s: |
|
122 |
if chunk: |
|
123 |
yield chunk |
|
124 |
return |
|
125 |
chunk += s |
|
126 |
remains -= len(s) |
|
127 |
if remains <= 0: |
|
128 |
break |
|
129 |
yield chunk |
|
130 |
nr -= 1 |
|
131 |
|
|
132 |
|
|
133 |
class ContextFile(object): |
|
134 |
__slots__ = ("name", "fdesc", "create") |
|
135 |
|
|
136 |
def __init__(self, name, create=0): |
|
137 |
self.name = name |
|
138 |
self.fdesc = None |
|
139 |
self.create = create |
|
140 |
#self.dirty = 0 |
|
141 |
|
|
142 |
def __enter__(self): |
|
143 |
name = self.name |
|
144 |
try: |
|
145 |
fdesc = open(name, 'rb+') |
|
146 |
except IOError, e: |
|
147 |
if self.create and e.errno == ENOENT: |
|
148 |
fdesc = open(name, 'w+') |
|
149 |
elif not self.create and e.errno == EROFS: |
|
150 |
fdesc = open(name, 'rb') |
|
151 |
else: |
|
152 |
raise |
|
153 |
|
|
154 |
self.fdesc = fdesc |
|
155 |
return self |
|
156 |
|
|
157 |
def __exit__(self, exc, arg, trace): |
|
158 |
fdesc = self.fdesc |
|
159 |
if fdesc is not None: |
|
160 |
#if self.dirty: |
|
161 |
# fsync(fdesc.fileno()) |
|
162 |
fdesc.close() |
|
163 |
return False # propagate exceptions |
|
164 |
|
|
165 |
def seek(self, offset, whence=SEEK_SET): |
|
166 |
return self.fdesc.seek(offset, whence) |
|
167 |
|
|
168 |
def tell(self): |
|
169 |
return self.fdesc.tell() |
|
170 |
|
|
171 |
def truncate(self, size): |
|
172 |
self.fdesc.truncate(size) |
|
173 |
|
|
174 |
def sync_write(self, data): |
|
175 |
#self.dirty = 1 |
|
176 |
self.fdesc.write(data) |
|
177 |
|
|
178 |
def sync_write_chunks(self, chunksize, offset, chunks, size=None): |
|
179 |
#self.dirty = 1 |
|
180 |
return file_sync_write_chunks(self.fdesc, chunksize, offset, chunks, |
|
181 |
size) |
|
182 |
|
|
183 |
def sync_read(self, size): |
|
184 |
read = self.fdesc.read |
|
185 |
data = '' |
|
186 |
while 1: |
|
187 |
s = read(size) |
|
188 |
if not s: |
|
189 |
break |
|
190 |
data += s |
|
191 |
return data |
|
192 |
|
|
193 |
def sync_read_chunks(self, chunksize, nr, offset=0): |
|
194 |
return file_sync_read_chunks(self.fdesc, chunksize, nr, offset) |
/dev/null | ||
---|---|---|
1 |
# Copyright 2011-2012 GRNET S.A. All rights reserved. |
|
2 |
# |
|
3 |
# Redistribution and use in source and binary forms, with or |
|
4 |
# without modification, are permitted provided that the following |
|
5 |
# conditions are met: |
|
6 |
# |
|
7 |
# 1. Redistributions of source code must retain the above |
|
8 |
# copyright notice, this list of conditions and the following |
|
9 |
# disclaimer. |
|
10 |
# |
|
11 |
# 2. Redistributions in binary form must reproduce the above |
|
12 |
# copyright notice, this list of conditions and the following |
|
13 |
# disclaimer in the documentation and/or other materials |
|
14 |
# provided with the distribution. |
|
15 |
# |
|
16 |
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS |
|
17 |
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
|
18 |
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
|
19 |
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR |
|
20 |
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
|
21 |
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
|
22 |
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF |
|
23 |
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED |
|
24 |
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
|
25 |
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN |
|
26 |
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
|
27 |
# POSSIBILITY OF SUCH DAMAGE. |
|
28 |
# |
|
29 |
# The views and conclusions contained in the software and |
|
30 |
# documentation are those of the authors and should not be |
|
31 |
# interpreted as representing official policies, either expressed |
|
32 |
# or implied, of GRNET S.A. |
|
33 |
|
|
34 |
from os import SEEK_CUR, SEEK_SET |
|
35 |
from rados import ObjectNotFound |
|
36 |
|
|
37 |
_zeros = '' |
|
38 |
|
|
39 |
|
|
40 |
def zeros(nr): |
|
41 |
global _zeros |
|
42 |
size = len(_zeros) |
|
43 |
if nr == size: |
|
44 |
return _zeros |
|
45 |
|
|
46 |
if nr > size: |
|
47 |
_zeros += '\0' * (nr - size) |
|
48 |
return _zeros |
|
49 |
|
|
50 |
if nr < size: |
|
51 |
_zeros = _zeros[:nr] |
|
52 |
return _zeros |
|
53 |
|
|
54 |
|
|
55 |
def file_sync_write_chunks(radosobject, chunksize, offset, chunks, size=None): |
|
56 |
"""Write given chunks to the given buffered file object. |
|
57 |
Writes never span across chunk boundaries. |
|
58 |
If size is given stop after or pad until size bytes have been written. |
|
59 |
""" |
|
60 |
padding = 0 |
|
61 |
cursize = chunksize * offset |
|
62 |
radosobject.seek(cursize) |
|
63 |
for chunk in chunks: |
|
64 |
if padding: |
|
65 |
radosobject.sync_write(buffer(zeros(chunksize), 0, padding)) |
|
66 |
if size is not None and cursize + chunksize >= size: |
|
67 |
chunk = chunk[:chunksize - (cursize - size)] |
|
68 |
radosobject.sync_write(chunk) |
|
69 |
cursize += len(chunk) |
|
70 |
break |
|
71 |
radosobject.sync_write(chunk) |
|
72 |
padding = chunksize - len(chunk) |
|
73 |
|
|
74 |
padding = size - cursize if size is not None else 0 |
|
75 |
if padding <= 0: |
|
76 |
return |
|
77 |
|
|
78 |
q, r = divmod(padding, chunksize) |
|
79 |
for x in xrange(q): |
|
80 |
radosobject.sync_write(zeros(chunksize)) |
|
81 |
radosobject.sync_write(buffer(zeros(chunksize), 0, r)) |
|
82 |
|
|
83 |
|
|
84 |
def file_sync_read_chunks(radosobject, chunksize, nr, offset=0): |
|
85 |
"""Read and yield groups of chunks from a buffered file object at offset. |
|
86 |
Reads never span across chunksize boundaries. |
|
87 |
""" |
|
88 |
radosobject.seek(offset * chunksize) |
|
89 |
while nr: |
|
90 |
remains = chunksize |
|
91 |
chunk = '' |
|
92 |
while 1: |
|
93 |
s = radosobject.sync_read(remains) |
|
94 |
if not s: |
|
95 |
if chunk: |
|
96 |
yield chunk |
|
97 |
return |
|
98 |
chunk += s |
|
99 |
remains -= len(s) |
|
100 |
if remains <= 0: |
|
101 |
break |
|
102 |
yield chunk |
|
103 |
nr -= 1 |
|
104 |
|
|
105 |
|
|
106 |
class RadosObject(object): |
|
107 |
__slots__ = ("name", "ioctx", "create", "offset") |
|
108 |
|
|
109 |
def __init__(self, name, ioctx, create=0): |
|
110 |
self.name = name |
|
111 |
self.ioctx = ioctx |
|
112 |
self.create = create |
|
113 |
self.offset = 0 |
|
114 |
#self.dirty = 0 |
|
115 |
|
|
116 |
def __enter__(self): |
|
117 |
return self |
|
118 |
|
|
119 |
def __exit__(self, exc, arg, trace): |
|
120 |
return False |
|
121 |
|
|
122 |
def seek(self, offset, whence=SEEK_SET): |
|
123 |
if whence == SEEK_CUR: |
|
124 |
offset += self.offset |
|
125 |
self.offset = offset |
|
126 |
return offset |
|
127 |
|
|
128 |
def tell(self): |
|
129 |
return self.offset |
|
130 |
|
|
131 |
def truncate(self, size): |
|
132 |
self.ioctx.trunc(self.name, size) |
|
133 |
|
|
134 |
def sync_write(self, data): |
|
135 |
#self.dirty = 1 |
|
136 |
self.ioctx.write(self.name, data, self.offset) |
|
137 |
self.offset += len(data) |
|
138 |
|
|
139 |
def sync_write_chunks(self, chunksize, offset, chunks, size=None): |
|
140 |
#self.dirty = 1 |
|
141 |
return file_sync_write_chunks(self, chunksize, offset, chunks, size) |
|
142 |
|
|
143 |
def sync_read(self, size): |
|
144 |
read = self.ioctx.read |
|
145 |
data = '' |
|
146 |
datalen = 0 |
|
147 |
while 1: |
|
148 |
try: |
|
149 |
s = read(self.name, size - datalen, self.offset) |
|
150 |
except ObjectNotFound: |
|
151 |
s = None |
|
152 |
if not s: |
|
153 |
break |
|
154 |
data += s |
|
155 |
datalen += len(s) |
|
156 |
self.offset += len(s) |
|
157 |
if datalen >= size: |
|
158 |
break |
|
159 |
return data |
|
160 |
|
|
161 |
def sync_read_chunks(self, chunksize, nr, offset=0): |
|
162 |
return file_sync_read_chunks(self, chunksize, nr, offset) |
/dev/null | ||
---|---|---|
1 |
# Copyright 2011-2012 GRNET S.A. All rights reserved. |
|
2 |
# |
|
3 |
# Redistribution and use in source and binary forms, with or |
|
4 |
# without modification, are permitted provided that the following |
|
5 |
# conditions are met: |
|
6 |
# |
|
7 |
# 1. Redistributions of source code must retain the above |
|
8 |
# copyright notice, this list of conditions and the following |
|
9 |
# disclaimer. |
|
10 |
# |
|
11 |
# 2. Redistributions in binary form must reproduce the above |
|
12 |
# copyright notice, this list of conditions and the following |
|
13 |
# disclaimer in the documentation and/or other materials |
|
14 |
# provided with the distribution. |
|
15 |
# |
|
16 |
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS |
|
17 |
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
|
18 |
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
|
19 |
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR |
|
20 |
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
|
21 |
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
|
22 |
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF |
|
23 |
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED |
|
24 |
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
|
25 |
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN |
|
26 |
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
|
27 |
# POSSIBILITY OF SUCH DAMAGE. |
|
28 |
# |
|
29 |
# The views and conclusions contained in the software and |
|
30 |
# documentation are those of the authors and should not be |
|
31 |
# interpreted as representing official policies, either expressed |
|
32 |
# or implied, of GRNET S.A. |
|
33 |
|
|
34 |
from os import makedirs |
|
35 |
from os.path import isdir, realpath, exists, join |
|
36 |
from hashlib import new as newhasher |
|
37 |
from binascii import hexlify |
|
38 |
|
|
39 |
from context_file import ContextFile, file_sync_read_chunks |
|
40 |
|
|
41 |
|
|
42 |
class FileBlocker(object): |
|
43 |
"""Blocker. |
|
44 |
Required constructor parameters: blocksize, blockpath, hashtype. |
|
45 |
""" |
|
46 |
|
|
47 |
blocksize = None |
|
48 |
blockpath = None |
|
49 |
hashtype = None |
|
50 |
|
|
51 |
def __init__(self, **params): |
|
52 |
blocksize = params['blocksize'] |
|
53 |
blockpath = params['blockpath'] |
|
54 |
blockpath = realpath(blockpath) |
|
55 |
if not isdir(blockpath): |
|
56 |
if not exists(blockpath): |
|
57 |
makedirs(blockpath) |
|
58 |
else: |
|
59 |
raise ValueError("Variable blockpath '%s' is not a directory" % |
|
60 |
(blockpath,)) |
|
61 |
|
|
62 |
hashtype = params['hashtype'] |
|
63 |
try: |
|
64 |
hasher = newhasher(hashtype) |
|
65 |
except ValueError: |
|
66 |
msg = "Variable hashtype '%s' is not available from hashlib" |
|
67 |
raise ValueError(msg % (hashtype,)) |
|
68 |
|
|
69 |
hasher.update("") |
|
70 |
emptyhash = hasher.digest() |
|
71 |
|
|
72 |
self.blocksize = blocksize |
|
73 |
self.blockpath = blockpath |
|
74 |
self.hashtype = hashtype |
|
75 |
self.hashlen = len(emptyhash) |
|
76 |
self.emptyhash = emptyhash |
|
77 |
|
|
78 |
def _pad(self, block): |
|
79 |
return block + ('\x00' * (self.blocksize - len(block))) |
|
80 |
|
|
81 |
def _get_rear_block(self, blkhash, create=0): |
|
82 |
filename = hexlify(blkhash) |
|
83 |
dir = join(self.blockpath, filename[0:2], filename[2:4], filename[4:6]) |
|
84 |
if not exists(dir): |
|
85 |
makedirs(dir) |
|
86 |
name = join(dir, filename) |
|
87 |
return ContextFile(name, create) |
|
88 |
|
|
89 |
def _check_rear_block(self, blkhash): |
|
90 |
filename = hexlify(blkhash) |
|
91 |
dir = join(self.blockpath, filename[0:2], filename[2:4], filename[4:6]) |
|
92 |
name = join(dir, filename) |
|
93 |
return exists(name) |
|
94 |
|
|
95 |
def block_hash(self, data): |
|
96 |
"""Hash a block of data""" |
|
97 |
hasher = newhasher(self.hashtype) |
|
98 |
hasher.update(data.rstrip('\x00')) |
|
99 |
return hasher.digest() |
|
100 |
|
|
101 |
def block_ping(self, hashes): |
|
102 |
"""Check hashes for existence and |
|
103 |
return those missing from block storage. |
|
104 |
""" |
|
105 |
notfound = [] |
|
106 |
append = notfound.append |
|
107 |
|
|
108 |
for h in hashes: |
|
109 |
if h not in notfound and not self._check_rear_block(h): |
|
110 |
append(h) |
|
111 |
|
|
112 |
return notfound |
|
113 |
|
|
114 |
def block_retr(self, hashes): |
|
115 |
"""Retrieve blocks from storage by their hashes.""" |
|
116 |
blocksize = self.blocksize |
|
117 |
blocks = [] |
|
118 |
append = blocks.append |
|
119 |
block = None |
|
120 |
|
|
121 |
for h in hashes: |
|
122 |
if h == self.emptyhash: |
|
123 |
append(self._pad('')) |
|
124 |
continue |
|
125 |
with self._get_rear_block(h, 0) as rbl: |
|
126 |
if not rbl: |
|
127 |
break |
|
128 |
for block in rbl.sync_read_chunks(blocksize, 1, 0): |
|
129 |
break # there should be just one block there |
|
130 |
if not block: |
|
131 |
break |
|
132 |
append(self._pad(block)) |
|
133 |
|
|
134 |
return blocks |
|
135 |
|
|
136 |
def block_stor(self, blocklist): |
|
137 |
"""Store a bunch of blocks and return (hashes, missing). |
|
138 |
Hashes is a list of the hashes of the blocks, |
|
139 |
missing is a list of indices in that list indicating |
|
140 |
which blocks were missing from the store. |
|
141 |
""" |
|
142 |
block_hash = self.block_hash |
|
143 |
hashlist = [block_hash(b) for b in blocklist] |
|
144 |
missing = [i for i, h in enumerate(hashlist) if not |
|
145 |
self._check_rear_block(h)] |
|
146 |
for i in missing: |
|
147 |
with self._get_rear_block(hashlist[i], 1) as rbl: |
|
148 |
rbl.sync_write(blocklist[i]) # XXX: verify? |
|
149 |
|
|
150 |
return hashlist, missing |
|
151 |
|
|
152 |
def block_delta(self, blkhash, offset, data): |
|
153 |
"""Construct and store a new block from a given block |
|
154 |
and a data 'patch' applied at offset. Return: |
|
155 |
(the hash of the new block, if the block already existed) |
|
156 |
""" |
|
157 |
|
|
158 |
blocksize = self.blocksize |
|
159 |
if offset >= blocksize or not data: |
|
160 |
return None, None |
|
161 |
|
|
162 |
block = self.block_retr((blkhash,)) |
|
163 |
if not block: |
|
164 |
return None, None |
|
165 |
|
|
166 |
block = block[0] |
|
167 |
newblock = block[:offset] + data |
|
168 |
if len(newblock) > blocksize: |
|
169 |
newblock = newblock[:blocksize] |
|
170 |
elif len(newblock) < blocksize: |
|
171 |
newblock += block[len(newblock):] |
|
172 |
|
|
173 |
h, a = self.block_stor((newblock,)) |
|
174 |
return h[0], 1 if a else 0 |
|
175 |
|
|
176 |
def block_hash_file(self, openfile): |
|
177 |
"""Return the list of hashes (hashes map) |
|
178 |
for the blocks in a buffered file. |
|
179 |
Helper method, does not affect store. |
|
180 |
""" |
|
181 |
hashes = [] |
|
182 |
append = hashes.append |
|
183 |
block_hash = self.block_hash |
|
184 |
|
|
185 |
for block in file_sync_read_chunks(openfile, self.blocksize, 1, 0): |
|
186 |
append(block_hash(block)) |
|
187 |
|
|
188 |
return hashes |
|
189 |
|
|
190 |
def block_stor_file(self, openfile): |
|
191 |
"""Read blocks from buffered file object and store them. Return: |
|
192 |
(bytes read, list of hashes, list of hashes that were missing) |
|
193 |
""" |
|
194 |
blocksize = self.blocksize |
|
195 |
block_stor = self.block_stor |
|
196 |
hashlist = [] |
|
197 |
hextend = hashlist.extend |
|
198 |
storedlist = [] |
|
199 |
sextend = storedlist.extend |
|
200 |
lastsize = 0 |
|
201 |
|
|
202 |
for block in file_sync_read_chunks(openfile, blocksize, 1, 0): |
|
203 |
hl, sl = block_stor((block,)) |
|
204 |
hextend(hl) |
|
205 |
sextend(sl) |
|
206 |
lastsize = len(block) |
|
207 |
|
|
208 |
size = (len(hashlist) - 1) * blocksize + lastsize if hashlist else 0 |
|
209 |
return size, hashlist, storedlist |
/dev/null | ||
---|---|---|
1 |
# Copyright 2011-2012 GRNET S.A. All rights reserved. |
|
2 |
# |
|
3 |
# Redistribution and use in source and binary forms, with or |
|
4 |
# without modification, are permitted provided that the following |
|
5 |
# conditions are met: |
|
6 |
# |
|
7 |
# 1. Redistributions of source code must retain the above |
|
8 |
# copyright notice, this list of conditions and the following |
|
9 |
# disclaimer. |
|
10 |
# |
|
11 |
# 2. Redistributions in binary form must reproduce the above |
|
12 |
# copyright notice, this list of conditions and the following |
|
13 |
# disclaimer in the documentation and/or other materials |
|
14 |
# provided with the distribution. |
|
15 |
# |
|
16 |
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS |
|
17 |
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
|
18 |
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
|
19 |
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR |
|
20 |
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
|
21 |
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
|
22 |
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF |
|
23 |
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED |
|
24 |
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
|
25 |
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN |
|
26 |
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
|
27 |
# POSSIBILITY OF SUCH DAMAGE. |
|
28 |
# |
|
29 |
# The views and conclusions contained in the software and |
|
30 |
# documentation are those of the authors and should not be |
|
31 |
# interpreted as representing official policies, either expressed |
|
32 |
# or implied, of GRNET S.A. |
|
33 |
|
|
34 |
from os import makedirs |
|
35 |
from os.path import isdir, realpath, exists, join |
|
36 |
from binascii import hexlify |
|
37 |
|
|
38 |
from context_file import ContextFile |
|
39 |
|
|
40 |
|
|
41 |
class FileMapper(object): |
|
42 |
"""Mapper. |
|
43 |
Required constructor parameters: mappath, namelen. |
|
44 |
""" |
|
45 |
|
|
46 |
mappath = None |
|
47 |
namelen = None |
|
48 |
|
|
49 |
def __init__(self, **params): |
|
50 |
self.params = params |
|
51 |
self.namelen = params['namelen'] |
|
52 |
mappath = realpath(params['mappath']) |
|
53 |
if not isdir(mappath): |
|
54 |
if not exists(mappath): |
|
55 |
makedirs(mappath) |
|
56 |
else: |
|
57 |
raise ValueError("Variable mappath '%s' is not a directory" % |
|
58 |
(mappath,)) |
|
59 |
self.mappath = mappath |
|
60 |
|
|
61 |
def _get_rear_map(self, maphash, create=0): |
|
62 |
filename = hexlify(maphash) |
|
63 |
dir = join(self.mappath, filename[0:2], filename[2:4], filename[4:6]) |
|
64 |
if not exists(dir): |
|
65 |
makedirs(dir) |
|
66 |
name = join(dir, filename) |
|
67 |
return ContextFile(name, create) |
|
68 |
|
|
69 |
def _check_rear_map(self, maphash): |
|
70 |
filename = hexlify(maphash) |
|
71 |
dir = join(self.mappath, filename[0:2], filename[2:4], filename[4:6]) |
|
72 |
name = join(dir, filename) |
|
73 |
return exists(name) |
|
74 |
|
|
75 |
def map_retr(self, maphash, blkoff=0, nr=100000000000000): |
|
76 |
"""Return as a list, part of the hashes map of an object |
|
77 |
at the given block offset. |
|
78 |
By default, return the whole hashes map. |
|
79 |
""" |
|
80 |
namelen = self.namelen |
|
81 |
hashes = () |
|
82 |
|
|
83 |
with self._get_rear_map(maphash, 0) as rmap: |
|
84 |
if rmap: |
|
85 |
hashes = list(rmap.sync_read_chunks(namelen, nr, blkoff)) |
|
86 |
return hashes |
|
87 |
|
|
88 |
def map_stor(self, maphash, hashes=(), blkoff=0, create=1): |
|
89 |
"""Store hashes in the given hashes map.""" |
|
90 |
namelen = self.namelen |
|
91 |
if self._check_rear_map(maphash): |
|
92 |
return |
|
93 |
with self._get_rear_map(maphash, 1) as rmap: |
|
94 |
rmap.sync_write_chunks(namelen, blkoff, hashes, None) |
b/snf-pithos-backend/pithos/backends/lib/hashfiler/mapper.py | ||
---|---|---|
31 | 31 |
# interpreted as representing official policies, either expressed |
32 | 32 |
# or implied, of GRNET S.A. |
33 | 33 |
|
34 |
from filemapper import FileMapper
|
|
34 |
from archipelagomapper import ArchipelagoMapper
|
|
35 | 35 |
|
36 | 36 |
|
37 | 37 |
class Mapper(object): |
... | ... | |
41 | 41 |
""" |
42 | 42 |
|
43 | 43 |
def __init__(self, **params): |
44 |
self.rmap = None |
|
45 |
try: |
|
46 |
if params['mappool']: |
|
47 |
from radosmapper import RadosMapper |
|
48 |
self.rmap = RadosMapper(**params) |
|
49 |
except KeyError: |
|
50 |
pass |
|
51 |
|
|
52 |
self.fmap = FileMapper(**params) |
|
44 |
self.archip_map = ArchipelagoMapper(**params) |
|
53 | 45 |
|
54 | 46 |
def map_retr(self, maphash, blkoff=0, nr=100000000000000): |
55 | 47 |
"""Return as a list, part of the hashes map of an object |
56 | 48 |
at the given block offset. |
57 | 49 |
By default, return the whole hashes map. |
58 | 50 |
""" |
59 |
return self.fmap.map_retr(maphash, blkoff, nr) |
|
51 |
return self.archip_map.map_retr(maphash, blkoff, nr) |
|
52 |
|
|
53 |
def map_retr_archipelago(self, maphash, size): |
|
54 |
"""Return as a list the hashes map of an Archipelago |
|
55 |
Volume. |
|
56 |
""" |
|
57 |
return self.archip_map.map_retr_archipelago(maphash, size) |
|
60 | 58 |
|
61 | 59 |
def map_stor(self, maphash, hashes=(), blkoff=0, create=1): |
62 | 60 |
"""Store hashes in the given hashes map.""" |
63 |
if self.rmap: |
|
64 |
self.rmap.map_stor(maphash, hashes, blkoff, create) |
|
65 |
self.fmap.map_stor(maphash, hashes, blkoff, create) |
|
61 |
self.archip_map.map_stor(maphash, hashes, blkoff, create) |
/dev/null | ||
---|---|---|
1 |
# Copyright 2011-2012 GRNET S.A. All rights reserved. |
|
2 |
# |
|
3 |
# Redistribution and use in source and binary forms, with or |
|
4 |
# without modification, are permitted provided that the following |
|
5 |
# conditions are met: |
|
6 |
# |
|
7 |
# 1. Redistributions of source code must retain the above |
|
8 |
# copyright notice, this list of conditions and the following |
|
9 |
# disclaimer. |
|
10 |
# |
|
11 |
# 2. Redistributions in binary form must reproduce the above |
|
12 |
# copyright notice, this list of conditions and the following |
|
13 |
# disclaimer in the documentation and/or other materials |
|
14 |
# provided with the distribution. |
|
15 |
# |
|
16 |
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS |
|
17 |
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
|
18 |
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
|
19 |
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR |
|
20 |
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
|
21 |
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
|
22 |
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF |
|
23 |
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED |
|
24 |
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
|
25 |
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN |
|
26 |
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
|
27 |
# POSSIBILITY OF SUCH DAMAGE. |
|
28 |
# |
|
29 |
# The views and conclusions contained in the software and |
|
30 |
# documentation are those of the authors and should not be |
|
31 |
# interpreted as representing official policies, either expressed |
|
32 |
# or implied, of GRNET S.A. |
|
33 |
|
|
34 |
from hashlib import new as newhasher |
|
35 |
from binascii import hexlify |
|
36 |
from rados import * |
|
37 |
|
|
38 |
from context_object import RadosObject, file_sync_read_chunks |
|
39 |
|
|
40 |
CEPH_CONF_FILE = "/etc/ceph/ceph.conf" |
|
41 |
|
|
42 |
|
|
43 |
class RadosBlocker(object): |
|
44 |
"""Blocker. |
|
45 |
Required constructor parameters: blocksize, blockpath, hashtype. |
|
46 |
""" |
|
47 |
|
|
48 |
blocksize = None |
|
49 |
blockpool = None |
|
50 |
hashtype = None |
|
51 |
|
|
52 |
def __init__(self, **params): |
|
53 |
blocksize = params['blocksize'] |
|
54 |
blockpool = params['blockpool'] |
|
55 |
|
|
56 |
rados = Rados(conffile=CEPH_CONF_FILE) |
|
57 |
rados.connect() |
|
58 |
ioctx = rados.open_ioctx(blockpool) |
|
59 |
|
|
60 |
hashtype = params['hashtype'] |
|
61 |
try: |
|
62 |
hasher = newhasher(hashtype) |
|
63 |
except ValueError: |
|
64 |
msg = "Variable hashtype '%s' is not available from hashlib" |
|
65 |
raise ValueError(msg % (hashtype,)) |
|
66 |
|
|
67 |
hasher.update("") |
|
68 |
emptyhash = hasher.digest() |
|
69 |
|
|
70 |
self.blocksize = blocksize |
|
71 |
self.blockpool = blockpool |
|
72 |
self.rados = rados |
|
73 |
self.ioctx = ioctx |
|
74 |
self.hashtype = hashtype |
|
75 |
self.hashlen = len(emptyhash) |
|
76 |
self.emptyhash = emptyhash |
|
77 |
|
|
78 |
def _pad(self, block): |
|
79 |
return block + ('\x00' * (self.blocksize - len(block))) |
|
80 |
|
|
81 |
def _get_rear_block(self, blkhash, create=0): |
|
82 |
name = hexlify(blkhash) |
|
83 |
return RadosObject(name, self.ioctx, create) |
|
84 |
|
|
85 |
def _check_rear_block(self, blkhash): |
|
86 |
filename = hexlify(blkhash) |
|
87 |
try: |
|
88 |
self.ioctx.stat(filename) |
|
89 |
return True |
|
90 |
except ObjectNotFound: |
|
91 |
return False |
|
92 |
|
|
93 |
def block_hash(self, data): |
|
94 |
"""Hash a block of data""" |
|
95 |
hasher = newhasher(self.hashtype) |
|
96 |
hasher.update(data.rstrip('\x00')) |
|
97 |
return hasher.digest() |
|
98 |
|
|
99 |
def block_ping(self, hashes): |
|
100 |
"""Check hashes for existence and |
|
101 |
return those missing from block storage. |
|
102 |
""" |
|
103 |
notfound = [] |
|
104 |
append = notfound.append |
|
105 |
|
|
106 |
for h in hashes: |
|
107 |
if h not in notfound and not self._check_rear_block(h): |
|
108 |
append(h) |
|
109 |
|
|
110 |
return notfound |
|
111 |
|
|
112 |
def block_retr(self, hashes): |
|
113 |
"""Retrieve blocks from storage by their hashes.""" |
|
114 |
blocksize = self.blocksize |
|
115 |
blocks = [] |
|
116 |
append = blocks.append |
|
117 |
block = None |
|
118 |
|
|
119 |
for h in hashes: |
|
120 |
if h == self.emptyhash: |
|
121 |
append(self._pad('')) |
|
122 |
continue |
|
123 |
with self._get_rear_block(h, 0) as rbl: |
|
124 |
if not rbl: |
|
125 |
break |
|
126 |
for block in rbl.sync_read_chunks(blocksize, 1, 0): |
|
127 |
break # there should be just one block there |
|
128 |
if not block: |
|
129 |
break |
|
130 |
append(self._pad(block)) |
|
131 |
|
|
132 |
return blocks |
|
133 |
|
|
134 |
def block_stor(self, blocklist): |
|
135 |
"""Store a bunch of blocks and return (hashes, missing). |
|
136 |
Hashes is a list of the hashes of the blocks, |
|
137 |
missing is a list of indices in that list indicating |
|
138 |
which blocks were missing from the store. |
|
139 |
""" |
|
140 |
block_hash = self.block_hash |
|
141 |
hashlist = [block_hash(b) for b in blocklist] |
|
142 |
missing = [i for i, h in enumerate(hashlist) if not |
|
143 |
self._check_rear_block(h)] |
|
144 |
for i in missing: |
|
145 |
with self._get_rear_block(hashlist[i], 1) as rbl: |
|
146 |
rbl.sync_write(blocklist[i]) # XXX: verify? |
|
147 |
|
|
148 |
return hashlist, missing |
|
149 |
|
|
150 |
def block_delta(self, blkhash, offset, data): |
|
151 |
"""Construct and store a new block from a given block |
|
152 |
and a data 'patch' applied at offset. Return: |
|
153 |
(the hash of the new block, if the block already existed) |
|
154 |
""" |
|
155 |
|
|
156 |
blocksize = self.blocksize |
|
157 |
if offset >= blocksize or not data: |
|
158 |
return None, None |
|
159 |
|
|
160 |
block = self.block_retr((blkhash,)) |
|
161 |
if not block: |
|
162 |
return None, None |
|
163 |
|
|
164 |
block = block[0] |
|
165 |
newblock = block[:offset] + data |
|
166 |
if len(newblock) > blocksize: |
|
167 |
newblock = newblock[:blocksize] |
|
168 |
elif len(newblock) < blocksize: |
|
169 |
newblock += block[len(newblock):] |
|
170 |
|
|
171 |
h, a = self.block_stor((newblock,)) |
|
172 |
return h[0], 1 if a else 0 |
|
173 |
|
|
174 |
def block_hash_file(self, radosobject): |
|
175 |
"""Return the list of hashes (hashes map) |
|
176 |
for the blocks in a buffered file. |
|
177 |
Helper method, does not affect store. |
|
178 |
""" |
|
179 |
hashes = [] |
|
180 |
append = hashes.append |
|
181 |
block_hash = self.block_hash |
|
182 |
|
|
183 |
for block in file_sync_read_chunks(radosobject, self.blocksize, 1, 0): |
|
184 |
append(block_hash(block)) |
|
185 |
|
|
186 |
return hashes |
|
187 |
|
|
188 |
def block_stor_file(self, radosobject): |
|
189 |
"""Read blocks from buffered file object and store them. Return: |
|
190 |
(bytes read, list of hashes, list of hashes that were missing) |
|
191 |
""" |
|
192 |
blocksize = self.blocksize |
|
193 |
block_stor = self.block_stor |
|
194 |
hashlist = [] |
|
195 |
hextend = hashlist.extend |
|
196 |
storedlist = [] |
|
197 |
sextend = storedlist.extend |
|
198 |
lastsize = 0 |
|
199 |
|
|
200 |
for block in file_sync_read_chunks(radosobject, blocksize, 1, 0): |
|
201 |
hl, sl = block_stor((block,)) |
|
202 |
hextend(hl) |
|
203 |
sextend(sl) |
|
204 |
lastsize = len(block) |
|
205 |
|
|
206 |
size = (len(hashlist) - 1) * blocksize + lastsize if hashlist else 0 |
|
207 |
return size, hashlist, storedlist |
/dev/null | ||
---|---|---|
1 |
# Copyright 2011-2012 GRNET S.A. All rights reserved. |
|
2 |
# |
|
3 |
# Redistribution and use in source and binary forms, with or |
|
4 |
# without modification, are permitted provided that the following |
|
5 |
# conditions are met: |
|
6 |
# |
|
7 |
# 1. Redistributions of source code must retain the above |
|
8 |
# copyright notice, this list of conditions and the following |
|
9 |
# disclaimer. |
|
10 |
# |
|
11 |
# 2. Redistributions in binary form must reproduce the above |
|
12 |
# copyright notice, this list of conditions and the following |
|
13 |
# disclaimer in the documentation and/or other materials |
|
14 |
# provided with the distribution. |
|
15 |
# |
|
16 |
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS |
|
17 |
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
|
18 |
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
|
19 |
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR |
|
20 |
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
|
21 |
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
|
22 |
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF |
|
23 |
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED |
|
24 |
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
|
25 |
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN |
|
26 |
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
|
27 |
# POSSIBILITY OF SUCH DAMAGE. |
|
28 |
# |
|
29 |
# The views and conclusions contained in the software and |
|
30 |
# documentation are those of the authors and should not be |
|
31 |
# interpreted as representing official policies, either expressed |
|
32 |
# or implied, of GRNET S.A. |
|
33 |
|
|
34 |
from binascii import hexlify |
|
35 |
|
|
36 |
from context_object import RadosObject |
|
37 |
from rados import * |
|
38 |
|
|
39 |
CEPH_CONF_FILE = "/etc/ceph/ceph.conf" |
|
40 |
|
|
41 |
|
|
42 |
class RadosMapper(object): |
|
43 |
"""Mapper. |
|
44 |
Required constructor parameters: mappath, namelen. |
|
45 |
""" |
|
46 |
|
|
47 |
mappool = None |
|
48 |
namelen = None |
|
49 |
|
|
50 |
def __init__(self, **params): |
|
51 |
self.params = params |
|
52 |
self.namelen = params['namelen'] |
|
53 |
mappool = params['mappool'] |
|
54 |
|
|
55 |
rados = Rados(conffile=CEPH_CONF_FILE) |
|
56 |
rados.connect() |
|
57 |
ioctx = rados.open_ioctx(mappool) |
|
58 |
|
|
59 |
self.mappool = mappool |
|
60 |
self.rados = rados |
|
61 |
self.ioctx = ioctx |
|
62 |
|
|
63 |
def _get_rear_map(self, maphash, create=0): |
|
64 |
name = hexlify(maphash) |
|
65 |
return RadosObject(name, self.ioctx, create) |
|
66 |
|
|
67 |
def _check_rear_map(self, maphash): |
|
68 |
name = hexlify(maphash) |
|
69 |
try: |
|
70 |
self.ioctx.stat(name) |
|
71 |
return True |
|
72 |
except ObjectNotFound: |
|
73 |
return False |
|
74 |
|
|
75 |
def map_retr(self, maphash, blkoff=0, nr=100000000000000): |
|
76 |
"""Return as a list, part of the hashes map of an object |
|
77 |
at the given block offset. |
|
78 |
By default, return the whole hashes map. |
|
79 |
""" |
|
80 |
namelen = self.namelen |
|
81 |
hashes = () |
|
82 |
|
|
83 |
with self._get_rear_map(maphash, 0) as rmap: |
|
84 |
if rmap: |
|
85 |
hashes = list(rmap.sync_read_chunks(namelen, nr, blkoff)) |
|
86 |
return hashes |
|
87 |
|
|
88 |
def map_stor(self, maphash, hashes=(), blkoff=0, create=1): |
|
89 |
"""Store hashes in the given hashes map.""" |
|
90 |
namelen = self.namelen |
|
91 |
if self._check_rear_map(maphash): |
|
92 |
return |
|
93 |
with self._get_rear_map(maphash, 1) as rmap: |
|
94 |
rmap.sync_write_chunks(namelen, blkoff, hashes, None) |
b/snf-pithos-backend/pithos/backends/lib/hashfiler/store.py | ||
---|---|---|
48 | 48 |
if umask is not None: |
49 | 49 |
os.umask(umask) |
50 | 50 |
|
51 |
path = params['path'] |
|
52 |
if path and not os.path.exists(path): |
|
53 |
os.makedirs(path) |
|
54 |
if not os.path.isdir(path): |
|
55 |
raise RuntimeError("Cannot open path '%s'" % (path,)) |
|
56 |
|
|
57 |
p = {'blocksize': params['block_size'], |
|
58 |
'blockpath': os.path.join(path + '/blocks'), |
|
51 |
pb = {'blocksize': params['block_size'], |
|
59 | 52 |
'hashtype': params['hash_algorithm'], |
60 |
'blockpool': params['blockpool']} |
|
61 |
self.blocker = Blocker(**p) |
|
62 |
p = {'mappath': os.path.join(path + '/maps'), |
|
63 |
'namelen': self.blocker.hashlen, |
|
64 |
'mappool': params['mappool']} |
|
65 |
self.mapper = Mapper(**p) |
|
53 |
} |
|
54 |
self.blocker = Blocker(**pb) |
|
55 |
pm = {'namelen': self.blocker.hashlen, |
|
56 |
} |
|
57 |
self.mapper = Mapper(**pm) |
|
66 | 58 |
|
67 | 59 |
def map_get(self, name): |
68 | 60 |
return self.mapper.map_retr(name) |
69 | 61 |
|
62 |
def map_get_archipelago(self, name, size): |
|
63 |
return self.mapper.map_retr_archipelago(name, size) |
|
64 |
|
|
70 | 65 |
def map_put(self, name, map): |
71 | 66 |
self.mapper.map_stor(name, map) |
72 | 67 |
|
... | ... | |
79 | 74 |
return None |
80 | 75 |
return blocks[0] |
81 | 76 |
|
77 |
def block_get_archipelago(self, hash): |
|
78 |
blocks = self.blocker.block_retr_archipelago((hash,)) |
|
79 |
if not blocks: |
|
80 |
return None |
|
81 |
return blocks[0] |
|
82 |
|
|
82 | 83 |
def block_put(self, data): |
83 | 84 |
hashes, absent = self.blocker.block_stor((data,)) |
84 | 85 |
return hashes[0] |
Also available in: Unified diff