Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / lib / hashfiler / archipelagoblocker.py @ f75f40cb

History | View | Annotate | Download (8.7 kB)

1 32293ec0 Chrysostomos Nanakos
# Copyright 2013 GRNET S.A. All rights reserved.
2 32293ec0 Chrysostomos Nanakos
#
3 32293ec0 Chrysostomos Nanakos
# Redistribution and use in source and binary forms, with or
4 32293ec0 Chrysostomos Nanakos
# without modification, are permitted provided that the following
5 32293ec0 Chrysostomos Nanakos
# conditions are met:
6 32293ec0 Chrysostomos Nanakos
#
7 32293ec0 Chrysostomos Nanakos
#   1. Redistributions of source code must retain the above
8 32293ec0 Chrysostomos Nanakos
#      copyright notice, this list of conditions and the following
9 32293ec0 Chrysostomos Nanakos
#      disclaimer.
10 32293ec0 Chrysostomos Nanakos
#
11 32293ec0 Chrysostomos Nanakos
#   2. Redistributions in binary form must reproduce the above
12 32293ec0 Chrysostomos Nanakos
#      copyright notice, this list of conditions and the following
13 32293ec0 Chrysostomos Nanakos
#      disclaimer in the documentation and/or other materials
14 32293ec0 Chrysostomos Nanakos
#      provided with the distribution.
15 32293ec0 Chrysostomos Nanakos
#
16 32293ec0 Chrysostomos Nanakos
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 32293ec0 Chrysostomos Nanakos
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 32293ec0 Chrysostomos Nanakos
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 32293ec0 Chrysostomos Nanakos
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 32293ec0 Chrysostomos Nanakos
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 32293ec0 Chrysostomos Nanakos
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 32293ec0 Chrysostomos Nanakos
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 32293ec0 Chrysostomos Nanakos
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 32293ec0 Chrysostomos Nanakos
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 32293ec0 Chrysostomos Nanakos
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 32293ec0 Chrysostomos Nanakos
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 32293ec0 Chrysostomos Nanakos
# POSSIBILITY OF SUCH DAMAGE.
28 32293ec0 Chrysostomos Nanakos
#
29 32293ec0 Chrysostomos Nanakos
# The views and conclusions contained in the software and
30 32293ec0 Chrysostomos Nanakos
# documentation are those of the authors and should not be
31 32293ec0 Chrysostomos Nanakos
# interpreted as representing official policies, either expressed
32 32293ec0 Chrysostomos Nanakos
# or implied, of GRNET S.A.
33 32293ec0 Chrysostomos Nanakos
34 32293ec0 Chrysostomos Nanakos
from hashlib import new as newhasher
35 32293ec0 Chrysostomos Nanakos
from binascii import hexlify
36 f75f40cb Chrysostomos Nanakos
import os
37 f75f40cb Chrysostomos Nanakos
import re
38 32293ec0 Chrysostomos Nanakos
39 32293ec0 Chrysostomos Nanakos
from context_archipelago import ArchipelagoObject, file_sync_read_chunks
40 32293ec0 Chrysostomos Nanakos
from archipelago.common import (
41 32293ec0 Chrysostomos Nanakos
    Request,
42 32293ec0 Chrysostomos Nanakos
    xseg_reply_info,
43 32293ec0 Chrysostomos Nanakos
    string_at,
44 32293ec0 Chrysostomos Nanakos
    )
45 32293ec0 Chrysostomos Nanakos
46 f75f40cb Chrysostomos Nanakos
from pithos.workers import (
47 f75f40cb Chrysostomos Nanakos
    glue,
48 f75f40cb Chrysostomos Nanakos
    monkey,
49 f75f40cb Chrysostomos Nanakos
    )
50 32293ec0 Chrysostomos Nanakos
51 32293ec0 Chrysostomos Nanakos
monkey.patch_Request()
52 32293ec0 Chrysostomos Nanakos
53 32293ec0 Chrysostomos Nanakos
from pithos.api.settings import BACKEND_ARCHIPELAGO_CONF
54 32293ec0 Chrysostomos Nanakos
55 f75f40cb Chrysostomos Nanakos
56 32293ec0 Chrysostomos Nanakos
class ArchipelagoBlocker(object):
    """Blocker.
       Required constructor parameters: blocksize, hashtype.

       Stores and retrieves blocks of at most `blocksize` bytes, keyed by
       the digest of their content, by issuing Archipelago requests to
       the port configured as 'blockerb_port'.
    """

    blocksize = None
    blockpool = None
    hashtype = None

    def __init__(self, **params):
        """Set up the blocker.

           Keyword parameters:
             blocksize -- size to which retrieved blocks are padded.
             hashtype  -- hashlib algorithm name used to hash blocks.

           Raises KeyError if a required parameter is missing and
           ValueError if hashlib does not provide `hashtype`.
        """
        # Read the configuration up front and close the file right away.
        with open(BACKEND_ARCHIPELAGO_CONF) as conf:
            bcfg = conf.read()
        # The configuration is scanned for an entry that looks like
        # "'blockerb_port': <digits>"; only the digits are kept.
        port_entry = re.search(r"'blockerb_port'\s*:\s*\d+", bcfg).group(0)
        blockerb_port = port_entry.split(':')[1]

        blocksize = params['blocksize']
        hashtype = params['hashtype']
        try:
            hasher = newhasher(hashtype)
        except ValueError:
            msg = "Variable hashtype '%s' is not available from hashlib"
            raise ValueError(msg % (hashtype,))

        # Digest of a hasher that was fed no data, i.e. of the empty block.
        emptyhash = hasher.digest()

        self.blocksize = blocksize
        self.ioctx_pool = glue.WorkerGlue().ioctx_pool
        self.dst_port = int(blockerb_port)
        self.hashtype = hashtype
        self.hashlen = len(emptyhash)
        self.emptyhash = emptyhash

    def _pad(self, block):
        """Return `block` right-padded with NUL characters to blocksize."""
        return block + ('\x00' * (self.blocksize - len(block)))

    def _get_rear_block(self, blkhash, create=0):
        """Return an ArchipelagoObject named after the hex form of blkhash."""
        name = hexlify(blkhash)
        return ArchipelagoObject(name, self.ioctx_pool, self.dst_port, create)

    def _check_rear_block(self, blkhash):
        """Return True if an info request for the block succeeds."""
        filename = hexlify(blkhash)
        ioctx = self.ioctx_pool.pool_get()
        try:
            req = Request.get_info_request(ioctx, self.dst_port, filename)
            try:
                req.submit()
                req.wait()
                return bool(req.success())
            finally:
                # Always release the request, even if submit/wait raised.
                req.put()
        finally:
            # Always hand the context back to the pool.
            self.ioctx_pool.pool_put(ioctx)

    def block_hash(self, data):
        """Hash a block of data, ignoring trailing NUL padding."""
        hasher = newhasher(self.hashtype)
        hasher.update(data.rstrip('\x00'))
        return hasher.digest()

    def block_ping(self, hashes):
        """Check hashes for existence and
           return those missing from block storage.

           Each missing hash is reported once, in order of first
           appearance.
        """
        notfound = []
        known_missing = set()  # O(1) duplicate check; the list keeps order

        for h in hashes:
            if h in known_missing:
                continue
            if not self._check_rear_block(h):
                known_missing.add(h)
                notfound.append(h)

        return notfound

    def block_retr(self, hashes):
        """Retrieve blocks from storage by their hashes.

           Returns the list of blocks, each padded to blocksize.
           Retrieval stops at the first hash whose block cannot be read,
           so the result may be shorter than `hashes`.
        """
        blocksize = self.blocksize
        blocks = []
        append = blocks.append

        for h in hashes:
            if h == self.emptyhash:
                append(self._pad(''))
                continue
            # Reset for every hash: otherwise a block that yields no data
            # would silently reuse the data read for the previous hash.
            block = None
            with self._get_rear_block(h, 0) as rbl:
                if rbl:
                    for block in rbl.sync_read_chunks(blocksize, 1, 0):
                        break  # there should be just one block there
            if not block:
                break
            append(self._pad(block))

        return blocks

    def _read_archipelago_block(self, ioctx, name):
        """Return the raw data of the object `name`, read through `ioctx`.

           Raises Exception if the info or the read request fails.
        """
        # First ask for the object's size ...
        req = Request.get_info_request(ioctx, self.dst_port, name)
        try:
            req.submit()
            req.wait()
            if not req.success():
                raise Exception("Bad block file.")
            info = req.get_data(_type=xseg_reply_info)
            size = info.contents.size
        finally:
            req.put()

        # ... then read exactly that many bytes.
        req_data = Request.get_read_request(ioctx, self.dst_port, name,
                                            size=size)
        try:
            req_data.submit()
            req_data.wait()
            if not req_data.success():
                raise Exception("Cannot retrieve Archipelago data.")
            # Copy the data out before the request is released below.
            return string_at(req_data.get_data(), size)
        finally:
            req_data.put()

    def block_retr_archipelago(self, hashes):
        """Retrieve blocks from storage by their hashes

           Here `hashes` are compared against the hex form of the empty
           hash and used directly as object names. Returns the list of
           blocks, each padded to blocksize. Raises Exception if any
           block cannot be retrieved.
        """
        blocks = []
        append = blocks.append
        archip_emptyhash = hexlify(self.emptyhash)

        ioctx = self.ioctx_pool.pool_get()
        try:
            for h in hashes:
                if h == archip_emptyhash:
                    append(self._pad(''))
                    continue
                append(self._pad(self._read_archipelago_block(ioctx, h)))
        finally:
            # Return the context to the pool on success and on failure.
            self.ioctx_pool.pool_put(ioctx)
        return blocks

    def block_stor(self, blocklist):
        """Store a bunch of blocks and return (hashes, missing).
           Hashes is a list of the hashes of the blocks,
           missing is a list of indices in that list indicating
           which blocks were missing from the store.
        """
        hashlist = [self.block_hash(b) for b in blocklist]
        missing = [i for i, h in enumerate(hashlist)
                   if not self._check_rear_block(h)]
        # Only the blocks that were not found are written.
        for i in missing:
            with self._get_rear_block(hashlist[i], 1) as rbl:
                rbl.sync_write(blocklist[i])  # XXX: verify?

        return hashlist, missing

    def block_delta(self, blkhash, offset, data):
        """Construct and store a new block from a given block
           and a data 'patch' applied at offset. Return:
           (the hash of the new block,
            1 if the new block was missing from the store and had to be
            written, 0 if it was already there)

           Returns (None, None) if offset is not inside the block, if
           data is empty, or if the original block cannot be retrieved.
        """
        blocksize = self.blocksize
        if offset >= blocksize or not data:
            return None, None

        retrieved = self.block_retr((blkhash,))
        if not retrieved:
            return None, None

        block = retrieved[0]
        newblock = block[:offset] + data
        if len(newblock) > blocksize:
            # The patch runs past the end of the block: drop the excess.
            newblock = newblock[:blocksize]
        elif len(newblock) < blocksize:
            # Keep the tail of the original block after the patch.
            newblock += block[len(newblock):]

        h, a = self.block_stor((newblock,))
        return h[0], 1 if a else 0

    def block_hash_file(self, archipelagoobject):
        """Return the list of hashes (hashes map)
           for the blocks in a buffered file.
           Helper method, does not affect store.
        """
        block_hash = self.block_hash
        return [block_hash(block)
                for block in file_sync_read_chunks(archipelagoobject,
                                                   self.blocksize, 1, 0)]

    def block_stor_file(self, archipelagoobject):
        """Read blocks from buffered file object and store them. Return:
           (bytes read, list of hashes, list of hashes that were missing)

           Bytes read is computed as blocksize for every block but the
           last, plus the length of the last block.
        """
        blocksize = self.blocksize
        block_stor = self.block_stor
        hashlist = []
        storedlist = []
        lastsize = 0

        for block in file_sync_read_chunks(archipelagoobject, blocksize, 1, 0):
            hl, sl = block_stor((block,))
            hashlist.extend(hl)
            storedlist.extend(sl)
            lastsize = len(block)

        size = (len(hashlist) - 1) * blocksize + lastsize if hashlist else 0
        return size, hashlist, storedlist