root / snf-pithos-backend / pithos / backends / lib / hashfiler / archipelagoblocker.py @ b5636704
History | View | Annotate | Download (8.7 kB)
1 | f3525003 | Chrysostomos Nanakos | # Copyright 2013 GRNET S.A. All rights reserved.
|
---|---|---|---|
2 | f3525003 | Chrysostomos Nanakos | #
|
3 | f3525003 | Chrysostomos Nanakos | # Redistribution and use in source and binary forms, with or
|
4 | f3525003 | Chrysostomos Nanakos | # without modification, are permitted provided that the following
|
5 | f3525003 | Chrysostomos Nanakos | # conditions are met:
|
6 | f3525003 | Chrysostomos Nanakos | #
|
7 | f3525003 | Chrysostomos Nanakos | # 1. Redistributions of source code must retain the above
|
8 | f3525003 | Chrysostomos Nanakos | # copyright notice, this list of conditions and the following
|
9 | f3525003 | Chrysostomos Nanakos | # disclaimer.
|
10 | f3525003 | Chrysostomos Nanakos | #
|
11 | f3525003 | Chrysostomos Nanakos | # 2. Redistributions in binary form must reproduce the above
|
12 | f3525003 | Chrysostomos Nanakos | # copyright notice, this list of conditions and the following
|
13 | f3525003 | Chrysostomos Nanakos | # disclaimer in the documentation and/or other materials
|
14 | f3525003 | Chrysostomos Nanakos | # provided with the distribution.
|
15 | f3525003 | Chrysostomos Nanakos | #
|
16 | f3525003 | Chrysostomos Nanakos | # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
|
17 | f3525003 | Chrysostomos Nanakos | # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
18 | f3525003 | Chrysostomos Nanakos | # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
|
19 | f3525003 | Chrysostomos Nanakos | # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
|
20 | f3525003 | Chrysostomos Nanakos | # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
21 | f3525003 | Chrysostomos Nanakos | # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
22 | f3525003 | Chrysostomos Nanakos | # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
|
23 | f3525003 | Chrysostomos Nanakos | # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
|
24 | f3525003 | Chrysostomos Nanakos | # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
25 | f3525003 | Chrysostomos Nanakos | # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
|
26 | f3525003 | Chrysostomos Nanakos | # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
27 | f3525003 | Chrysostomos Nanakos | # POSSIBILITY OF SUCH DAMAGE.
|
28 | f3525003 | Chrysostomos Nanakos | #
|
29 | f3525003 | Chrysostomos Nanakos | # The views and conclusions contained in the software and
|
30 | f3525003 | Chrysostomos Nanakos | # documentation are those of the authors and should not be
|
31 | f3525003 | Chrysostomos Nanakos | # interpreted as representing official policies, either expressed
|
32 | f3525003 | Chrysostomos Nanakos | # or implied, of GRNET S.A.
|
33 | f3525003 | Chrysostomos Nanakos | |
34 | f3525003 | Chrysostomos Nanakos | from hashlib import new as newhasher |
35 | f3525003 | Chrysostomos Nanakos | from binascii import hexlify |
36 | b5636704 | Chrysostomos Nanakos | import os |
37 | b5636704 | Chrysostomos Nanakos | import re |
38 | f3525003 | Chrysostomos Nanakos | |
39 | f3525003 | Chrysostomos Nanakos | from context_archipelago import ArchipelagoObject, file_sync_read_chunks |
40 | f3525003 | Chrysostomos Nanakos | from archipelago.common import ( |
41 | f3525003 | Chrysostomos Nanakos | Request, |
42 | f3525003 | Chrysostomos Nanakos | xseg_reply_info, |
43 | f3525003 | Chrysostomos Nanakos | string_at, |
44 | f3525003 | Chrysostomos Nanakos | ) |
45 | f3525003 | Chrysostomos Nanakos | |
46 | b5636704 | Chrysostomos Nanakos | from pithos.workers import ( |
47 | b5636704 | Chrysostomos Nanakos | glue, |
48 | b5636704 | Chrysostomos Nanakos | monkey, |
49 | b5636704 | Chrysostomos Nanakos | ) |
50 | f3525003 | Chrysostomos Nanakos | |
51 | f3525003 | Chrysostomos Nanakos | monkey.patch_Request() |
52 | f3525003 | Chrysostomos Nanakos | |
53 | f3525003 | Chrysostomos Nanakos | from pithos.api.settings import BACKEND_ARCHIPELAGO_CONF |
54 | f3525003 | Chrysostomos Nanakos | |
55 | b5636704 | Chrysostomos Nanakos | |
56 | f3525003 | Chrysostomos Nanakos | class ArchipelagoBlocker(object): |
57 | f3525003 | Chrysostomos Nanakos | """Blocker.
|
58 | f3525003 | Chrysostomos Nanakos | Required constructor parameters: blocksize, hashtype.
|
59 | f3525003 | Chrysostomos Nanakos | """
|
60 | f3525003 | Chrysostomos Nanakos | |
61 | f3525003 | Chrysostomos Nanakos | blocksize = None
|
62 | f3525003 | Chrysostomos Nanakos | blockpool = None
|
63 | f3525003 | Chrysostomos Nanakos | hashtype = None
|
64 | f3525003 | Chrysostomos Nanakos | |
65 | f3525003 | Chrysostomos Nanakos | def __init__(self, **params): |
66 | f3525003 | Chrysostomos Nanakos | cfg = {} |
67 | f3525003 | Chrysostomos Nanakos | bcfg = open(BACKEND_ARCHIPELAGO_CONF).read()
|
68 | f3525003 | Chrysostomos Nanakos | cfg['blockerb'] = re.search('\'blockerb_port\'\s*:\s*\d+', |
69 | b5636704 | Chrysostomos Nanakos | bcfg).group(0).split(':')[1] |
70 | f3525003 | Chrysostomos Nanakos | blocksize = params['blocksize']
|
71 | f3525003 | Chrysostomos Nanakos | hashtype = params['hashtype']
|
72 | f3525003 | Chrysostomos Nanakos | try:
|
73 | f3525003 | Chrysostomos Nanakos | hasher = newhasher(hashtype) |
74 | f3525003 | Chrysostomos Nanakos | except ValueError: |
75 | f3525003 | Chrysostomos Nanakos | msg = "Variable hashtype '%s' is not available from hashlib"
|
76 | f3525003 | Chrysostomos Nanakos | raise ValueError(msg % (hashtype,)) |
77 | f3525003 | Chrysostomos Nanakos | |
78 | f3525003 | Chrysostomos Nanakos | hasher.update("")
|
79 | f3525003 | Chrysostomos Nanakos | emptyhash = hasher.digest() |
80 | f3525003 | Chrysostomos Nanakos | |
81 | f3525003 | Chrysostomos Nanakos | self.blocksize = blocksize
|
82 | f3525003 | Chrysostomos Nanakos | self.ioctx_pool = glue.WorkerGlue().ioctx_pool
|
83 | f3525003 | Chrysostomos Nanakos | self.dst_port = int(cfg['blockerb']) |
84 | f3525003 | Chrysostomos Nanakos | self.hashtype = hashtype
|
85 | f3525003 | Chrysostomos Nanakos | self.hashlen = len(emptyhash) |
86 | f3525003 | Chrysostomos Nanakos | self.emptyhash = emptyhash
|
87 | f3525003 | Chrysostomos Nanakos | |
88 | f3525003 | Chrysostomos Nanakos | def _pad(self, block): |
89 | f3525003 | Chrysostomos Nanakos | return block + ('\x00' * (self.blocksize - len(block))) |
90 | f3525003 | Chrysostomos Nanakos | |
91 | f3525003 | Chrysostomos Nanakos | def _get_rear_block(self, blkhash, create=0): |
92 | f3525003 | Chrysostomos Nanakos | name = hexlify(blkhash) |
93 | b5636704 | Chrysostomos Nanakos | return ArchipelagoObject(name, self.ioctx_pool, self.dst_port, create) |
94 | f3525003 | Chrysostomos Nanakos | |
95 | f3525003 | Chrysostomos Nanakos | def _check_rear_block(self, blkhash): |
96 | f3525003 | Chrysostomos Nanakos | filename = hexlify(blkhash) |
97 | f3525003 | Chrysostomos Nanakos | ioctx = self.ioctx_pool.pool_get()
|
98 | b5636704 | Chrysostomos Nanakos | req = Request.get_info_request(ioctx, self.dst_port, filename)
|
99 | f3525003 | Chrysostomos Nanakos | req.submit() |
100 | f3525003 | Chrysostomos Nanakos | req.wait() |
101 | f3525003 | Chrysostomos Nanakos | ret = req.success() |
102 | f3525003 | Chrysostomos Nanakos | req.put() |
103 | f3525003 | Chrysostomos Nanakos | self.ioctx_pool.pool_put(ioctx)
|
104 | f3525003 | Chrysostomos Nanakos | if ret:
|
105 | f3525003 | Chrysostomos Nanakos | return True |
106 | f3525003 | Chrysostomos Nanakos | else:
|
107 | f3525003 | Chrysostomos Nanakos | return False |
108 | f3525003 | Chrysostomos Nanakos | |
109 | f3525003 | Chrysostomos Nanakos | def block_hash(self, data): |
110 | f3525003 | Chrysostomos Nanakos | """Hash a block of data"""
|
111 | f3525003 | Chrysostomos Nanakos | hasher = newhasher(self.hashtype)
|
112 | f3525003 | Chrysostomos Nanakos | hasher.update(data.rstrip('\x00'))
|
113 | f3525003 | Chrysostomos Nanakos | return hasher.digest()
|
114 | f3525003 | Chrysostomos Nanakos | |
115 | f3525003 | Chrysostomos Nanakos | def block_ping(self, hashes): |
116 | f3525003 | Chrysostomos Nanakos | """Check hashes for existence and
|
117 | f3525003 | Chrysostomos Nanakos | return those missing from block storage.
|
118 | f3525003 | Chrysostomos Nanakos | """
|
119 | f3525003 | Chrysostomos Nanakos | notfound = [] |
120 | f3525003 | Chrysostomos Nanakos | append = notfound.append |
121 | f3525003 | Chrysostomos Nanakos | |
122 | f3525003 | Chrysostomos Nanakos | for h in hashes: |
123 | f3525003 | Chrysostomos Nanakos | if h not in notfound and not self._check_rear_block(h): |
124 | f3525003 | Chrysostomos Nanakos | append(h) |
125 | f3525003 | Chrysostomos Nanakos | |
126 | f3525003 | Chrysostomos Nanakos | return notfound
|
127 | f3525003 | Chrysostomos Nanakos | |
128 | f3525003 | Chrysostomos Nanakos | def block_retr(self, hashes): |
129 | f3525003 | Chrysostomos Nanakos | """Retrieve blocks from storage by their hashes."""
|
130 | f3525003 | Chrysostomos Nanakos | blocksize = self.blocksize
|
131 | f3525003 | Chrysostomos Nanakos | blocks = [] |
132 | f3525003 | Chrysostomos Nanakos | append = blocks.append |
133 | f3525003 | Chrysostomos Nanakos | block = None
|
134 | f3525003 | Chrysostomos Nanakos | |
135 | f3525003 | Chrysostomos Nanakos | for h in hashes: |
136 | f3525003 | Chrysostomos Nanakos | if h == self.emptyhash: |
137 | f3525003 | Chrysostomos Nanakos | append(self._pad('')) |
138 | f3525003 | Chrysostomos Nanakos | continue
|
139 | f3525003 | Chrysostomos Nanakos | with self._get_rear_block(h, 0) as rbl: |
140 | f3525003 | Chrysostomos Nanakos | if not rbl: |
141 | f3525003 | Chrysostomos Nanakos | break
|
142 | f3525003 | Chrysostomos Nanakos | for block in rbl.sync_read_chunks(blocksize, 1, 0): |
143 | f3525003 | Chrysostomos Nanakos | break # there should be just one block there |
144 | f3525003 | Chrysostomos Nanakos | if not block: |
145 | f3525003 | Chrysostomos Nanakos | break
|
146 | f3525003 | Chrysostomos Nanakos | append(self._pad(block))
|
147 | f3525003 | Chrysostomos Nanakos | |
148 | f3525003 | Chrysostomos Nanakos | return blocks
|
149 | f3525003 | Chrysostomos Nanakos | |
150 | f3525003 | Chrysostomos Nanakos | def block_retr_archipelago(self, hashes): |
151 | f3525003 | Chrysostomos Nanakos | """Retrieve blocks from storage by their hashes"""
|
152 | f3525003 | Chrysostomos Nanakos | blocks = [] |
153 | f3525003 | Chrysostomos Nanakos | append = blocks.append |
154 | f3525003 | Chrysostomos Nanakos | block = None
|
155 | f3525003 | Chrysostomos Nanakos | |
156 | f3525003 | Chrysostomos Nanakos | ioctx = self.ioctx_pool.pool_get()
|
157 | f3525003 | Chrysostomos Nanakos | archip_emptyhash = hexlify(self.emptyhash)
|
158 | f3525003 | Chrysostomos Nanakos | |
159 | f3525003 | Chrysostomos Nanakos | for h in hashes: |
160 | f3525003 | Chrysostomos Nanakos | if h == archip_emptyhash:
|
161 | f3525003 | Chrysostomos Nanakos | append(self._pad('')) |
162 | f3525003 | Chrysostomos Nanakos | continue
|
163 | f3525003 | Chrysostomos Nanakos | req = Request.get_info_request(ioctx, self.dst_port, h)
|
164 | f3525003 | Chrysostomos Nanakos | req.submit() |
165 | f3525003 | Chrysostomos Nanakos | req.wait() |
166 | f3525003 | Chrysostomos Nanakos | ret = req.success() |
167 | f3525003 | Chrysostomos Nanakos | if ret:
|
168 | f3525003 | Chrysostomos Nanakos | info = req.get_data(_type=xseg_reply_info) |
169 | f3525003 | Chrysostomos Nanakos | size = info.contents.size |
170 | f3525003 | Chrysostomos Nanakos | req.put() |
171 | f3525003 | Chrysostomos Nanakos | req_data = Request.get_read_request(ioctx, self.dst_port, h,
|
172 | f3525003 | Chrysostomos Nanakos | size=size) |
173 | f3525003 | Chrysostomos Nanakos | req_data.submit() |
174 | f3525003 | Chrysostomos Nanakos | req_data.wait() |
175 | f3525003 | Chrysostomos Nanakos | ret_data = req_data.success() |
176 | f3525003 | Chrysostomos Nanakos | if ret_data:
|
177 | f3525003 | Chrysostomos Nanakos | append(self._pad(string_at(req_data.get_data(), size)))
|
178 | f3525003 | Chrysostomos Nanakos | req_data.put() |
179 | f3525003 | Chrysostomos Nanakos | else:
|
180 | f3525003 | Chrysostomos Nanakos | req_data.put() |
181 | f3525003 | Chrysostomos Nanakos | self.ioctx_pool.put(ioctx)
|
182 | f3525003 | Chrysostomos Nanakos | raise Exception("Cannot retrieve Archipelago data.") |
183 | f3525003 | Chrysostomos Nanakos | else:
|
184 | f3525003 | Chrysostomos Nanakos | req.put() |
185 | f3525003 | Chrysostomos Nanakos | self.ioctx_pool.pool_put(ioctx)
|
186 | f3525003 | Chrysostomos Nanakos | raise Exception("Bad block file.") |
187 | f3525003 | Chrysostomos Nanakos | self.ioctx_pool.pool_put(ioctx)
|
188 | f3525003 | Chrysostomos Nanakos | return blocks
|
189 | f3525003 | Chrysostomos Nanakos | |
190 | f3525003 | Chrysostomos Nanakos | def block_stor(self, blocklist): |
191 | f3525003 | Chrysostomos Nanakos | """Store a bunch of blocks and return (hashes, missing).
|
192 | f3525003 | Chrysostomos Nanakos | Hashes is a list of the hashes of the blocks,
|
193 | f3525003 | Chrysostomos Nanakos | missing is a list of indices in that list indicating
|
194 | f3525003 | Chrysostomos Nanakos | which blocks were missing from the store.
|
195 | f3525003 | Chrysostomos Nanakos | """
|
196 | f3525003 | Chrysostomos Nanakos | block_hash = self.block_hash
|
197 | f3525003 | Chrysostomos Nanakos | hashlist = [block_hash(b) for b in blocklist] |
198 | f3525003 | Chrysostomos Nanakos | missing = [i for i, h in enumerate(hashlist) if not |
199 | f3525003 | Chrysostomos Nanakos | self._check_rear_block(h)]
|
200 | f3525003 | Chrysostomos Nanakos | for i in missing: |
201 | f3525003 | Chrysostomos Nanakos | with self._get_rear_block(hashlist[i], 1) as rbl: |
202 | f3525003 | Chrysostomos Nanakos | rbl.sync_write(blocklist[i]) # XXX: verify?
|
203 | f3525003 | Chrysostomos Nanakos | |
204 | f3525003 | Chrysostomos Nanakos | return hashlist, missing
|
205 | f3525003 | Chrysostomos Nanakos | |
206 | f3525003 | Chrysostomos Nanakos | def block_delta(self, blkhash, offset, data): |
207 | f3525003 | Chrysostomos Nanakos | """Construct and store a new block from a given block
|
208 | f3525003 | Chrysostomos Nanakos | and a data 'patch' applied at offset. Return:
|
209 | f3525003 | Chrysostomos Nanakos | (the hash of the new block, if the block already existed)
|
210 | f3525003 | Chrysostomos Nanakos | """
|
211 | f3525003 | Chrysostomos Nanakos | |
212 | f3525003 | Chrysostomos Nanakos | blocksize = self.blocksize
|
213 | f3525003 | Chrysostomos Nanakos | if offset >= blocksize or not data: |
214 | f3525003 | Chrysostomos Nanakos | return None, None |
215 | f3525003 | Chrysostomos Nanakos | |
216 | f3525003 | Chrysostomos Nanakos | block = self.block_retr((blkhash,))
|
217 | f3525003 | Chrysostomos Nanakos | if not block: |
218 | f3525003 | Chrysostomos Nanakos | return None, None |
219 | f3525003 | Chrysostomos Nanakos | |
220 | f3525003 | Chrysostomos Nanakos | block = block[0]
|
221 | f3525003 | Chrysostomos Nanakos | newblock = block[:offset] + data |
222 | f3525003 | Chrysostomos Nanakos | if len(newblock) > blocksize: |
223 | f3525003 | Chrysostomos Nanakos | newblock = newblock[:blocksize] |
224 | f3525003 | Chrysostomos Nanakos | elif len(newblock) < blocksize: |
225 | f3525003 | Chrysostomos Nanakos | newblock += block[len(newblock):]
|
226 | f3525003 | Chrysostomos Nanakos | |
227 | f3525003 | Chrysostomos Nanakos | h, a = self.block_stor((newblock,))
|
228 | f3525003 | Chrysostomos Nanakos | return h[0], 1 if a else 0 |
229 | f3525003 | Chrysostomos Nanakos | |
230 | f3525003 | Chrysostomos Nanakos | def block_hash_file(self, archipelagoobject): |
231 | f3525003 | Chrysostomos Nanakos | """Return the list of hashes (hashes map)
|
232 | f3525003 | Chrysostomos Nanakos | for the blocks in a buffered file.
|
233 | f3525003 | Chrysostomos Nanakos | Helper method, does not affect store.
|
234 | f3525003 | Chrysostomos Nanakos | """
|
235 | f3525003 | Chrysostomos Nanakos | hashes = [] |
236 | f3525003 | Chrysostomos Nanakos | append = hashes.append |
237 | f3525003 | Chrysostomos Nanakos | block_hash = self.block_hash
|
238 | f3525003 | Chrysostomos Nanakos | |
239 | b5636704 | Chrysostomos Nanakos | for block in file_sync_read_chunks(archipelagoobject, |
240 | b5636704 | Chrysostomos Nanakos | self.blocksize, 1, 0): |
241 | f3525003 | Chrysostomos Nanakos | append(block_hash(block)) |
242 | f3525003 | Chrysostomos Nanakos | |
243 | f3525003 | Chrysostomos Nanakos | return hashes
|
244 | f3525003 | Chrysostomos Nanakos | |
245 | f3525003 | Chrysostomos Nanakos | def block_stor_file(self, archipelagoobject): |
246 | f3525003 | Chrysostomos Nanakos | """Read blocks from buffered file object and store them. Return:
|
247 | f3525003 | Chrysostomos Nanakos | (bytes read, list of hashes, list of hashes that were missing)
|
248 | f3525003 | Chrysostomos Nanakos | """
|
249 | f3525003 | Chrysostomos Nanakos | blocksize = self.blocksize
|
250 | f3525003 | Chrysostomos Nanakos | block_stor = self.block_stor
|
251 | f3525003 | Chrysostomos Nanakos | hashlist = [] |
252 | f3525003 | Chrysostomos Nanakos | hextend = hashlist.extend |
253 | f3525003 | Chrysostomos Nanakos | storedlist = [] |
254 | f3525003 | Chrysostomos Nanakos | sextend = storedlist.extend |
255 | f3525003 | Chrysostomos Nanakos | lastsize = 0
|
256 | f3525003 | Chrysostomos Nanakos | |
257 | f3525003 | Chrysostomos Nanakos | for block in file_sync_read_chunks(archipelagoobject, blocksize, 1, 0): |
258 | f3525003 | Chrysostomos Nanakos | hl, sl = block_stor((block,)) |
259 | f3525003 | Chrysostomos Nanakos | hextend(hl) |
260 | f3525003 | Chrysostomos Nanakos | sextend(sl) |
261 | f3525003 | Chrysostomos Nanakos | lastsize = len(block)
|
262 | f3525003 | Chrysostomos Nanakos | |
263 | f3525003 | Chrysostomos Nanakos | size = (len(hashlist) - 1) * blocksize + lastsize if hashlist else 0 |
264 | f3525003 | Chrysostomos Nanakos | return size, hashlist, storedlist |