# Copyright 2013 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
from hashlib import new as newhasher
from binascii import hexlify
import re

from context_archipelago import ArchipelagoObject, file_sync_read_chunks
from archipelago.common import (
    Request,
    xseg_reply_info,
    string_at,
    )

from pithos.workers import (
    glue,
    monkey,
    )

monkey.patch_Request()

from pithos.api.settings import BACKEND_ARCHIPELAGO_CONF

class ArchipelagoBlocker(object):
    """Blocker.
       Required constructor parameters: blocksize, hashtype.
    """

    blocksize = None
    blockpool = None
    hashtype = None

    def __init__(self, **params):
        cfg = {}
        with open(BACKEND_ARCHIPELAGO_CONF) as f:
            bcfg = f.read()
        cfg['blockerb'] = re.search(r"'blockerb_port'\s*:\s*\d+",
                                    bcfg).group(0).split(':')[1]
        blocksize = params['blocksize']
        hashtype = params['hashtype']
        try:
            hasher = newhasher(hashtype)
        except ValueError:
            msg = "Hash type '%s' is not available from hashlib"
            raise ValueError(msg % (hashtype,))

        # The hash of the empty string marks all-zero (sparse) blocks.
        hasher.update("")
        emptyhash = hasher.digest()

        self.blocksize = blocksize
        self.ioctx_pool = glue.WorkerGlue().ioctx_pool
        self.dst_port = int(cfg['blockerb'])
        self.hashtype = hashtype
        self.hashlen = len(emptyhash)
        self.emptyhash = emptyhash
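
    # A minimal usage sketch (hypothetical values; in Pithos the real
    # blocksize and hashtype come from the backend configuration):
    #
    #     blocker = ArchipelagoBlocker(blocksize=4194304, hashtype='sha256')
    #
    # Any algorithm hashlib knows by name is accepted; unknown names raise
    # the ValueError above.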

    def _pad(self, block):
        # Zero-fill the block up to the configured blocksize.
        return block + ('\x00' * (self.blocksize - len(block)))

    def _get_rear_block(self, blkhash, create=0):
        name = hexlify(blkhash)
        return ArchipelagoObject(name, self.ioctx_pool, self.dst_port, create)

    def _check_rear_block(self, blkhash):
        filename = hexlify(blkhash)
        ioctx = self.ioctx_pool.pool_get()
        req = Request.get_info_request(ioctx, self.dst_port, filename)
        req.submit()
        req.wait()
        ret = req.success()
        req.put()
        self.ioctx_pool.pool_put(ioctx)
        return bool(ret)

    def block_hash(self, data):
        """Hash a block of data"""
        hasher = newhasher(self.hashtype)
        hasher.update(data.rstrip('\x00'))
        return hasher.digest()
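
    # Note the rstrip above: blocks are zero-padded by _pad(), and
    # stripping trailing NULs before hashing makes a padded and an
    # unpadded copy of the same content hash identically. A sketch
    # (assuming the 'sha256' hashtype from the constructor example):
    #
    #     blocker.block_hash('abc') == blocker.block_hash('abc\x00\x00')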

    def block_ping(self, hashes):
        """Check hashes for existence and
           return those missing from block storage.
        """
        notfound = []
        append = notfound.append

        for h in hashes:
            if h not in notfound and not self._check_rear_block(h):
                append(h)

        return notfound

    def block_retr(self, hashes):
        """Retrieve blocks from storage by their hashes."""
        blocksize = self.blocksize
        blocks = []
        append = blocks.append
        block = None

        for h in hashes:
            if h == self.emptyhash:
                append(self._pad(''))
                continue
            with self._get_rear_block(h, 0) as rbl:
                if not rbl:
                    break
                for block in rbl.sync_read_chunks(blocksize, 1, 0):
                    break  # there should be just one block there
            if not block:
                break
            append(self._pad(block))

        return blocks
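
    # Hedged usage note: the hashes here are raw digests as returned by
    # block_hash(), and every block comes back padded to exactly
    # blocksize bytes. A missing block breaks the loop and silently
    # truncates the result, so callers can detect it by length:
    #
    #     blocks = blocker.block_retr(hashlist)
    #     if len(blocks) != len(hashlist):
    #         pass  # at least one block was not found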

    def block_retr_archipelago(self, hashes):
        """Retrieve blocks from storage by their hashes.
           Unlike block_retr(), the hashes here are hex-encoded names.
        """
        blocks = []
        append = blocks.append

        ioctx = self.ioctx_pool.pool_get()
        archip_emptyhash = hexlify(self.emptyhash)

        for h in hashes:
            if h == archip_emptyhash:
                append(self._pad(''))
                continue
            req = Request.get_info_request(ioctx, self.dst_port, h)
            req.submit()
            req.wait()
            ret = req.success()
            if ret:
                info = req.get_data(_type=xseg_reply_info)
                size = info.contents.size
                req.put()
                req_data = Request.get_read_request(ioctx, self.dst_port, h,
                                                    size=size)
                req_data.submit()
                req_data.wait()
                ret_data = req_data.success()
                if ret_data:
                    append(self._pad(string_at(req_data.get_data(), size)))
                    req_data.put()
                else:
                    req_data.put()
                    self.ioctx_pool.pool_put(ioctx)
                    raise Exception("Cannot retrieve Archipelago data.")
            else:
                req.put()
                self.ioctx_pool.pool_put(ioctx)
                raise Exception("Bad block file.")
        self.ioctx_pool.pool_put(ioctx)
        return blocks

    def block_stor(self, blocklist):
        """Store a bunch of blocks and return (hashes, missing).
           Hashes is a list of the hashes of the blocks,
           missing is a list of indices in that list indicating
           which blocks were missing from the store.
        """
        block_hash = self.block_hash
        hashlist = [block_hash(b) for b in blocklist]
        missing = [i for i, h in enumerate(hashlist) if not
                   self._check_rear_block(h)]
        for i in missing:
            with self._get_rear_block(hashlist[i], 1) as rbl:
                rbl.sync_write(blocklist[i])  # XXX: verify?

        return hashlist, missing
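
    # Usage sketch (block_a and block_b are hypothetical byte strings):
    #
    #     hashes, missing = blocker.block_stor((block_a, block_b))
    #     # missing == [1] would mean block_b was absent from the store
    #     # before this call and has just been written.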

    def block_delta(self, blkhash, offset, data):
        """Construct and store a new block from a given block
           and a data 'patch' applied at offset. Return:
           (the hash of the new block,
            1 if the new block was missing and stored, 0 if it existed)
        """

        blocksize = self.blocksize
        if offset >= blocksize or not data:
            return None, None

        block = self.block_retr((blkhash,))
        if not block:
            return None, None

        block = block[0]
        newblock = block[:offset] + data
        if len(newblock) > blocksize:
            newblock = newblock[:blocksize]
        elif len(newblock) < blocksize:
            newblock += block[len(newblock):]

        h, a = self.block_stor((newblock,))
        return h[0], 1 if a else 0
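
    # Worked example with a hypothetical 4-byte blocksize for brevity:
    # patching block 'AAAA' with data 'bb' at offset 1 yields 'AbbA',
    # since the patch is overlaid and the original tail is preserved:
    #
    #     h, stored = blocker.block_delta(old_hash, 1, 'bb')
    #     # stored == 1 if the patched block was newly written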

    def block_hash_file(self, archipelagoobject):
        """Return the list of hashes (hashes map)
           for the blocks in a buffered file.
           Helper method, does not affect store.
        """
        hashes = []
        append = hashes.append
        block_hash = self.block_hash

        for block in file_sync_read_chunks(archipelagoobject,
                                           self.blocksize, 1, 0):
            append(block_hash(block))

        return hashes

    def block_stor_file(self, archipelagoobject):
        """Read blocks from buffered file object and store them. Return:
           (bytes read, list of hashes,
            list with one entry per block that was missing from the store)
        """
        blocksize = self.blocksize
        block_stor = self.block_stor
        hashlist = []
        hextend = hashlist.extend
        storedlist = []
        sextend = storedlist.extend
        lastsize = 0

        for block in file_sync_read_chunks(archipelagoobject, blocksize, 1, 0):
            hl, sl = block_stor((block,))
            hextend(hl)
            sextend(sl)
            lastsize = len(block)

        size = ((len(hashlist) - 1) * blocksize + lastsize) if hashlist else 0
        return size, hashlist, storedlist
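
    # A hedged end-to-end sketch: file_obj stands for any buffered file
    # object accepted by file_sync_read_chunks. Assuming every chunk but
    # the last is exactly blocksize bytes, the size computed above is
    # (number of blocks - 1) * blocksize plus the length of the last one.
    #
    #     size, hashes, stored = blocker.block_stor_file(file_obj)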