Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / lib / hashfiler / archipelagoblocker.py @ b5636704

History | View | Annotate | Download (8.7 kB)

1 f3525003 Chrysostomos Nanakos
# Copyright 2013 GRNET S.A. All rights reserved.
2 f3525003 Chrysostomos Nanakos
#
3 f3525003 Chrysostomos Nanakos
# Redistribution and use in source and binary forms, with or
4 f3525003 Chrysostomos Nanakos
# without modification, are permitted provided that the following
5 f3525003 Chrysostomos Nanakos
# conditions are met:
6 f3525003 Chrysostomos Nanakos
#
7 f3525003 Chrysostomos Nanakos
#   1. Redistributions of source code must retain the above
8 f3525003 Chrysostomos Nanakos
#      copyright notice, this list of conditions and the following
9 f3525003 Chrysostomos Nanakos
#      disclaimer.
10 f3525003 Chrysostomos Nanakos
#
11 f3525003 Chrysostomos Nanakos
#   2. Redistributions in binary form must reproduce the above
12 f3525003 Chrysostomos Nanakos
#      copyright notice, this list of conditions and the following
13 f3525003 Chrysostomos Nanakos
#      disclaimer in the documentation and/or other materials
14 f3525003 Chrysostomos Nanakos
#      provided with the distribution.
15 f3525003 Chrysostomos Nanakos
#
16 f3525003 Chrysostomos Nanakos
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 f3525003 Chrysostomos Nanakos
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 f3525003 Chrysostomos Nanakos
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 f3525003 Chrysostomos Nanakos
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 f3525003 Chrysostomos Nanakos
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 f3525003 Chrysostomos Nanakos
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 f3525003 Chrysostomos Nanakos
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 f3525003 Chrysostomos Nanakos
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 f3525003 Chrysostomos Nanakos
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 f3525003 Chrysostomos Nanakos
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 f3525003 Chrysostomos Nanakos
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 f3525003 Chrysostomos Nanakos
# POSSIBILITY OF SUCH DAMAGE.
28 f3525003 Chrysostomos Nanakos
#
29 f3525003 Chrysostomos Nanakos
# The views and conclusions contained in the software and
30 f3525003 Chrysostomos Nanakos
# documentation are those of the authors and should not be
31 f3525003 Chrysostomos Nanakos
# interpreted as representing official policies, either expressed
32 f3525003 Chrysostomos Nanakos
# or implied, of GRNET S.A.
33 f3525003 Chrysostomos Nanakos
34 f3525003 Chrysostomos Nanakos
from hashlib import new as newhasher
35 f3525003 Chrysostomos Nanakos
from binascii import hexlify
36 b5636704 Chrysostomos Nanakos
import os
37 b5636704 Chrysostomos Nanakos
import re
38 f3525003 Chrysostomos Nanakos
39 f3525003 Chrysostomos Nanakos
from context_archipelago import ArchipelagoObject, file_sync_read_chunks
40 f3525003 Chrysostomos Nanakos
from archipelago.common import (
41 f3525003 Chrysostomos Nanakos
    Request,
42 f3525003 Chrysostomos Nanakos
    xseg_reply_info,
43 f3525003 Chrysostomos Nanakos
    string_at,
44 f3525003 Chrysostomos Nanakos
    )
45 f3525003 Chrysostomos Nanakos
46 b5636704 Chrysostomos Nanakos
from pithos.workers import (
47 b5636704 Chrysostomos Nanakos
    glue,
48 b5636704 Chrysostomos Nanakos
    monkey,
49 b5636704 Chrysostomos Nanakos
    )
50 f3525003 Chrysostomos Nanakos
51 f3525003 Chrysostomos Nanakos
monkey.patch_Request()
52 f3525003 Chrysostomos Nanakos
53 f3525003 Chrysostomos Nanakos
from pithos.api.settings import BACKEND_ARCHIPELAGO_CONF
54 f3525003 Chrysostomos Nanakos
55 b5636704 Chrysostomos Nanakos
56 f3525003 Chrysostomos Nanakos
class ArchipelagoBlocker(object):
57 f3525003 Chrysostomos Nanakos
    """Blocker.
58 f3525003 Chrysostomos Nanakos
       Required constructor parameters: blocksize, hashtype.
59 f3525003 Chrysostomos Nanakos
    """
60 f3525003 Chrysostomos Nanakos
61 f3525003 Chrysostomos Nanakos
    blocksize = None
62 f3525003 Chrysostomos Nanakos
    blockpool = None
63 f3525003 Chrysostomos Nanakos
    hashtype = None
64 f3525003 Chrysostomos Nanakos
65 f3525003 Chrysostomos Nanakos
    def __init__(self, **params):
66 f3525003 Chrysostomos Nanakos
        cfg = {}
67 f3525003 Chrysostomos Nanakos
        bcfg = open(BACKEND_ARCHIPELAGO_CONF).read()
68 f3525003 Chrysostomos Nanakos
        cfg['blockerb'] = re.search('\'blockerb_port\'\s*:\s*\d+',
69 b5636704 Chrysostomos Nanakos
                                    bcfg).group(0).split(':')[1]
70 f3525003 Chrysostomos Nanakos
        blocksize = params['blocksize']
71 f3525003 Chrysostomos Nanakos
        hashtype = params['hashtype']
72 f3525003 Chrysostomos Nanakos
        try:
73 f3525003 Chrysostomos Nanakos
            hasher = newhasher(hashtype)
74 f3525003 Chrysostomos Nanakos
        except ValueError:
75 f3525003 Chrysostomos Nanakos
            msg = "Variable hashtype '%s' is not available from hashlib"
76 f3525003 Chrysostomos Nanakos
            raise ValueError(msg % (hashtype,))
77 f3525003 Chrysostomos Nanakos
78 f3525003 Chrysostomos Nanakos
        hasher.update("")
79 f3525003 Chrysostomos Nanakos
        emptyhash = hasher.digest()
80 f3525003 Chrysostomos Nanakos
81 f3525003 Chrysostomos Nanakos
        self.blocksize = blocksize
82 f3525003 Chrysostomos Nanakos
        self.ioctx_pool = glue.WorkerGlue().ioctx_pool
83 f3525003 Chrysostomos Nanakos
        self.dst_port = int(cfg['blockerb'])
84 f3525003 Chrysostomos Nanakos
        self.hashtype = hashtype
85 f3525003 Chrysostomos Nanakos
        self.hashlen = len(emptyhash)
86 f3525003 Chrysostomos Nanakos
        self.emptyhash = emptyhash
87 f3525003 Chrysostomos Nanakos
88 f3525003 Chrysostomos Nanakos
    def _pad(self, block):
89 f3525003 Chrysostomos Nanakos
        return block + ('\x00' * (self.blocksize - len(block)))
90 f3525003 Chrysostomos Nanakos
91 f3525003 Chrysostomos Nanakos
    def _get_rear_block(self, blkhash, create=0):
92 f3525003 Chrysostomos Nanakos
        name = hexlify(blkhash)
93 b5636704 Chrysostomos Nanakos
        return ArchipelagoObject(name, self.ioctx_pool, self.dst_port, create)
94 f3525003 Chrysostomos Nanakos
95 f3525003 Chrysostomos Nanakos
    def _check_rear_block(self, blkhash):
96 f3525003 Chrysostomos Nanakos
        filename = hexlify(blkhash)
97 f3525003 Chrysostomos Nanakos
        ioctx = self.ioctx_pool.pool_get()
98 b5636704 Chrysostomos Nanakos
        req = Request.get_info_request(ioctx, self.dst_port, filename)
99 f3525003 Chrysostomos Nanakos
        req.submit()
100 f3525003 Chrysostomos Nanakos
        req.wait()
101 f3525003 Chrysostomos Nanakos
        ret = req.success()
102 f3525003 Chrysostomos Nanakos
        req.put()
103 f3525003 Chrysostomos Nanakos
        self.ioctx_pool.pool_put(ioctx)
104 f3525003 Chrysostomos Nanakos
        if ret:
105 f3525003 Chrysostomos Nanakos
            return True
106 f3525003 Chrysostomos Nanakos
        else:
107 f3525003 Chrysostomos Nanakos
            return False
108 f3525003 Chrysostomos Nanakos
109 f3525003 Chrysostomos Nanakos
    def block_hash(self, data):
110 f3525003 Chrysostomos Nanakos
        """Hash a block of data"""
111 f3525003 Chrysostomos Nanakos
        hasher = newhasher(self.hashtype)
112 f3525003 Chrysostomos Nanakos
        hasher.update(data.rstrip('\x00'))
113 f3525003 Chrysostomos Nanakos
        return hasher.digest()
114 f3525003 Chrysostomos Nanakos
115 f3525003 Chrysostomos Nanakos
    def block_ping(self, hashes):
116 f3525003 Chrysostomos Nanakos
        """Check hashes for existence and
117 f3525003 Chrysostomos Nanakos
           return those missing from block storage.
118 f3525003 Chrysostomos Nanakos
        """
119 f3525003 Chrysostomos Nanakos
        notfound = []
120 f3525003 Chrysostomos Nanakos
        append = notfound.append
121 f3525003 Chrysostomos Nanakos
122 f3525003 Chrysostomos Nanakos
        for h in hashes:
123 f3525003 Chrysostomos Nanakos
            if h not in notfound and not self._check_rear_block(h):
124 f3525003 Chrysostomos Nanakos
                append(h)
125 f3525003 Chrysostomos Nanakos
126 f3525003 Chrysostomos Nanakos
        return notfound
127 f3525003 Chrysostomos Nanakos
128 f3525003 Chrysostomos Nanakos
    def block_retr(self, hashes):
129 f3525003 Chrysostomos Nanakos
        """Retrieve blocks from storage by their hashes."""
130 f3525003 Chrysostomos Nanakos
        blocksize = self.blocksize
131 f3525003 Chrysostomos Nanakos
        blocks = []
132 f3525003 Chrysostomos Nanakos
        append = blocks.append
133 f3525003 Chrysostomos Nanakos
        block = None
134 f3525003 Chrysostomos Nanakos
135 f3525003 Chrysostomos Nanakos
        for h in hashes:
136 f3525003 Chrysostomos Nanakos
            if h == self.emptyhash:
137 f3525003 Chrysostomos Nanakos
                append(self._pad(''))
138 f3525003 Chrysostomos Nanakos
                continue
139 f3525003 Chrysostomos Nanakos
            with self._get_rear_block(h, 0) as rbl:
140 f3525003 Chrysostomos Nanakos
                if not rbl:
141 f3525003 Chrysostomos Nanakos
                    break
142 f3525003 Chrysostomos Nanakos
                for block in rbl.sync_read_chunks(blocksize, 1, 0):
143 f3525003 Chrysostomos Nanakos
                    break  # there should be just one block there
144 f3525003 Chrysostomos Nanakos
            if not block:
145 f3525003 Chrysostomos Nanakos
                break
146 f3525003 Chrysostomos Nanakos
            append(self._pad(block))
147 f3525003 Chrysostomos Nanakos
148 f3525003 Chrysostomos Nanakos
        return blocks
149 f3525003 Chrysostomos Nanakos
150 f3525003 Chrysostomos Nanakos
    def block_retr_archipelago(self, hashes):
151 f3525003 Chrysostomos Nanakos
        """Retrieve blocks from storage by their hashes"""
152 f3525003 Chrysostomos Nanakos
        blocks = []
153 f3525003 Chrysostomos Nanakos
        append = blocks.append
154 f3525003 Chrysostomos Nanakos
        block = None
155 f3525003 Chrysostomos Nanakos
156 f3525003 Chrysostomos Nanakos
        ioctx = self.ioctx_pool.pool_get()
157 f3525003 Chrysostomos Nanakos
        archip_emptyhash = hexlify(self.emptyhash)
158 f3525003 Chrysostomos Nanakos
159 f3525003 Chrysostomos Nanakos
        for h in hashes:
160 f3525003 Chrysostomos Nanakos
            if h == archip_emptyhash:
161 f3525003 Chrysostomos Nanakos
                append(self._pad(''))
162 f3525003 Chrysostomos Nanakos
                continue
163 f3525003 Chrysostomos Nanakos
            req = Request.get_info_request(ioctx, self.dst_port, h)
164 f3525003 Chrysostomos Nanakos
            req.submit()
165 f3525003 Chrysostomos Nanakos
            req.wait()
166 f3525003 Chrysostomos Nanakos
            ret = req.success()
167 f3525003 Chrysostomos Nanakos
            if ret:
168 f3525003 Chrysostomos Nanakos
                info = req.get_data(_type=xseg_reply_info)
169 f3525003 Chrysostomos Nanakos
                size = info.contents.size
170 f3525003 Chrysostomos Nanakos
                req.put()
171 f3525003 Chrysostomos Nanakos
                req_data = Request.get_read_request(ioctx, self.dst_port, h,
172 f3525003 Chrysostomos Nanakos
                                                    size=size)
173 f3525003 Chrysostomos Nanakos
                req_data.submit()
174 f3525003 Chrysostomos Nanakos
                req_data.wait()
175 f3525003 Chrysostomos Nanakos
                ret_data = req_data.success()
176 f3525003 Chrysostomos Nanakos
                if ret_data:
177 f3525003 Chrysostomos Nanakos
                    append(self._pad(string_at(req_data.get_data(), size)))
178 f3525003 Chrysostomos Nanakos
                    req_data.put()
179 f3525003 Chrysostomos Nanakos
                else:
180 f3525003 Chrysostomos Nanakos
                    req_data.put()
181 f3525003 Chrysostomos Nanakos
                    self.ioctx_pool.put(ioctx)
182 f3525003 Chrysostomos Nanakos
                    raise Exception("Cannot retrieve Archipelago data.")
183 f3525003 Chrysostomos Nanakos
            else:
184 f3525003 Chrysostomos Nanakos
                req.put()
185 f3525003 Chrysostomos Nanakos
                self.ioctx_pool.pool_put(ioctx)
186 f3525003 Chrysostomos Nanakos
                raise Exception("Bad block file.")
187 f3525003 Chrysostomos Nanakos
        self.ioctx_pool.pool_put(ioctx)
188 f3525003 Chrysostomos Nanakos
        return blocks
189 f3525003 Chrysostomos Nanakos
190 f3525003 Chrysostomos Nanakos
    def block_stor(self, blocklist):
191 f3525003 Chrysostomos Nanakos
        """Store a bunch of blocks and return (hashes, missing).
192 f3525003 Chrysostomos Nanakos
           Hashes is a list of the hashes of the blocks,
193 f3525003 Chrysostomos Nanakos
           missing is a list of indices in that list indicating
194 f3525003 Chrysostomos Nanakos
           which blocks were missing from the store.
195 f3525003 Chrysostomos Nanakos
        """
196 f3525003 Chrysostomos Nanakos
        block_hash = self.block_hash
197 f3525003 Chrysostomos Nanakos
        hashlist = [block_hash(b) for b in blocklist]
198 f3525003 Chrysostomos Nanakos
        missing = [i for i, h in enumerate(hashlist) if not
199 f3525003 Chrysostomos Nanakos
                   self._check_rear_block(h)]
200 f3525003 Chrysostomos Nanakos
        for i in missing:
201 f3525003 Chrysostomos Nanakos
            with self._get_rear_block(hashlist[i], 1) as rbl:
202 f3525003 Chrysostomos Nanakos
                rbl.sync_write(blocklist[i])  # XXX: verify?
203 f3525003 Chrysostomos Nanakos
204 f3525003 Chrysostomos Nanakos
        return hashlist, missing
205 f3525003 Chrysostomos Nanakos
206 f3525003 Chrysostomos Nanakos
    def block_delta(self, blkhash, offset, data):
207 f3525003 Chrysostomos Nanakos
        """Construct and store a new block from a given block
208 f3525003 Chrysostomos Nanakos
           and a data 'patch' applied at offset. Return:
209 f3525003 Chrysostomos Nanakos
           (the hash of the new block, if the block already existed)
210 f3525003 Chrysostomos Nanakos
        """
211 f3525003 Chrysostomos Nanakos
212 f3525003 Chrysostomos Nanakos
        blocksize = self.blocksize
213 f3525003 Chrysostomos Nanakos
        if offset >= blocksize or not data:
214 f3525003 Chrysostomos Nanakos
            return None, None
215 f3525003 Chrysostomos Nanakos
216 f3525003 Chrysostomos Nanakos
        block = self.block_retr((blkhash,))
217 f3525003 Chrysostomos Nanakos
        if not block:
218 f3525003 Chrysostomos Nanakos
            return None, None
219 f3525003 Chrysostomos Nanakos
220 f3525003 Chrysostomos Nanakos
        block = block[0]
221 f3525003 Chrysostomos Nanakos
        newblock = block[:offset] + data
222 f3525003 Chrysostomos Nanakos
        if len(newblock) > blocksize:
223 f3525003 Chrysostomos Nanakos
            newblock = newblock[:blocksize]
224 f3525003 Chrysostomos Nanakos
        elif len(newblock) < blocksize:
225 f3525003 Chrysostomos Nanakos
            newblock += block[len(newblock):]
226 f3525003 Chrysostomos Nanakos
227 f3525003 Chrysostomos Nanakos
        h, a = self.block_stor((newblock,))
228 f3525003 Chrysostomos Nanakos
        return h[0], 1 if a else 0
229 f3525003 Chrysostomos Nanakos
230 f3525003 Chrysostomos Nanakos
    def block_hash_file(self, archipelagoobject):
231 f3525003 Chrysostomos Nanakos
        """Return the list of hashes (hashes map)
232 f3525003 Chrysostomos Nanakos
           for the blocks in a buffered file.
233 f3525003 Chrysostomos Nanakos
           Helper method, does not affect store.
234 f3525003 Chrysostomos Nanakos
        """
235 f3525003 Chrysostomos Nanakos
        hashes = []
236 f3525003 Chrysostomos Nanakos
        append = hashes.append
237 f3525003 Chrysostomos Nanakos
        block_hash = self.block_hash
238 f3525003 Chrysostomos Nanakos
239 b5636704 Chrysostomos Nanakos
        for block in file_sync_read_chunks(archipelagoobject,
240 b5636704 Chrysostomos Nanakos
                                           self.blocksize, 1, 0):
241 f3525003 Chrysostomos Nanakos
            append(block_hash(block))
242 f3525003 Chrysostomos Nanakos
243 f3525003 Chrysostomos Nanakos
        return hashes
244 f3525003 Chrysostomos Nanakos
245 f3525003 Chrysostomos Nanakos
    def block_stor_file(self, archipelagoobject):
246 f3525003 Chrysostomos Nanakos
        """Read blocks from buffered file object and store them. Return:
247 f3525003 Chrysostomos Nanakos
           (bytes read, list of hashes, list of hashes that were missing)
248 f3525003 Chrysostomos Nanakos
        """
249 f3525003 Chrysostomos Nanakos
        blocksize = self.blocksize
250 f3525003 Chrysostomos Nanakos
        block_stor = self.block_stor
251 f3525003 Chrysostomos Nanakos
        hashlist = []
252 f3525003 Chrysostomos Nanakos
        hextend = hashlist.extend
253 f3525003 Chrysostomos Nanakos
        storedlist = []
254 f3525003 Chrysostomos Nanakos
        sextend = storedlist.extend
255 f3525003 Chrysostomos Nanakos
        lastsize = 0
256 f3525003 Chrysostomos Nanakos
257 f3525003 Chrysostomos Nanakos
        for block in file_sync_read_chunks(archipelagoobject, blocksize, 1, 0):
258 f3525003 Chrysostomos Nanakos
            hl, sl = block_stor((block,))
259 f3525003 Chrysostomos Nanakos
            hextend(hl)
260 f3525003 Chrysostomos Nanakos
            sextend(sl)
261 f3525003 Chrysostomos Nanakos
            lastsize = len(block)
262 f3525003 Chrysostomos Nanakos
263 f3525003 Chrysostomos Nanakos
        size = (len(hashlist) - 1) * blocksize + lastsize if hashlist else 0
264 f3525003 Chrysostomos Nanakos
        return size, hashlist, storedlist