Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / lib / hashfiler / archipelagoblocker.py @ 3759eddb

History | View | Annotate | Download (8.7 kB)

1
# Copyright 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from hashlib import new as newhasher
35
from binascii import hexlify
36
import os
37
import re
38

    
39
from context_archipelago import ArchipelagoObject, file_sync_read_chunks
40
from archipelago.common import (
41
    Request,
42
    xseg_reply_info,
43
    string_at,
44
    )
45

    
46
from pithos.workers import (
47
    glue,
48
    monkey,
49
    )
50

    
51
# Module-level side effect: patch the archipelago Request class before any
# worker uses it (patch semantics are defined in pithos.workers.monkey —
# presumably adapts Request for the worker processes; verify there).
monkey.patch_Request()
52

    
53

    
54
class ArchipelagoBlocker(object):
    """Blocker.
       Required constructor parameters: blocksize, hashtype.
    """

    blocksize = None
    blockpool = None
    hashtype = None

    def __init__(self, **params):
        """Initialize the blocker.

        Required keyword params:
            blocksize -- size in bytes of one storage block
            hashtype  -- a hash algorithm name accepted by hashlib.new()

        Raises ValueError if hashtype is not available from hashlib.
        """
        cfg = {}
        # Read the blockerb port out of the Archipelago config file.
        # FIX: close the file deterministically (original leaked the handle).
        with open(glue.WorkerGlue.ArchipelagoConfFile) as bcfg_file:
            bcfg = bcfg_file.read()
        # FIX: raw string — \s and \d are regex metacharacters, and bare
        # '\s' in a plain string literal is a deprecated invalid escape.
        cfg['blockerb'] = re.search(r'\'blockerb_port\'\s*:\s*\d+',
                                    bcfg).group(0).split(':')[1]
        blocksize = params['blocksize']
        hashtype = params['hashtype']
        try:
            hasher = newhasher(hashtype)
        except ValueError:
            msg = "Variable hashtype '%s' is not available from hashlib"
            raise ValueError(msg % (hashtype,))

        # Digest of the empty string; blocks are zero-stripped before
        # hashing (see block_hash), so this also names the all-zero block.
        hasher.update("")
        emptyhash = hasher.digest()

        self.blocksize = blocksize
        self.ioctx_pool = glue.WorkerGlue().ioctx_pool
        self.dst_port = int(cfg['blockerb'])
        self.hashtype = hashtype
        self.hashlen = len(emptyhash)
        self.emptyhash = emptyhash
    def _pad(self, block):
87
        return block + ('\x00' * (self.blocksize - len(block)))
88

    
89
    def _get_rear_block(self, blkhash, create=0):
        """Return an ArchipelagoObject handle for the block named by the
           hex form of blkhash, optionally creating it.
        """
        objname = hexlify(blkhash)
        return ArchipelagoObject(objname, self.ioctx_pool,
                                 self.dst_port, create)
    def _check_rear_block(self, blkhash):
        """Return True if a block named by blkhash exists in the backend.

        Issues an info request against the rear blocker port and reports
        whether it succeeded.
        """
        filename = hexlify(blkhash)
        ioctx = self.ioctx_pool.pool_get()
        # FIX: return the ioctx to the pool even if the request machinery
        # raises (original leaked it on any exception after pool_get).
        try:
            req = Request.get_info_request(ioctx, self.dst_port, filename)
            req.submit()
            req.wait()
            ret = req.success()
            req.put()
        finally:
            self.ioctx_pool.pool_put(ioctx)
        return bool(ret)
    def block_hash(self, data):
        """Hash a block of data, ignoring any trailing zero padding."""
        digester = newhasher(self.hashtype)
        stripped = data.rstrip('\x00')
        digester.update(stripped)
        return digester.digest()
    def block_ping(self, hashes):
114
        """Check hashes for existence and
115
           return those missing from block storage.
116
        """
117
        notfound = []
118
        append = notfound.append
119

    
120
        for h in hashes:
121
            if h not in notfound and not self._check_rear_block(h):
122
                append(h)
123

    
124
        return notfound
125

    
126
    def block_retr(self, hashes):
127
        """Retrieve blocks from storage by their hashes."""
128
        blocksize = self.blocksize
129
        blocks = []
130
        append = blocks.append
131
        block = None
132

    
133
        for h in hashes:
134
            if h == self.emptyhash:
135
                append(self._pad(''))
136
                continue
137
            with self._get_rear_block(h, 0) as rbl:
138
                if not rbl:
139
                    break
140
                for block in rbl.sync_read_chunks(blocksize, 1, 0):
141
                    break  # there should be just one block there
142
            if not block:
143
                break
144
            append(self._pad(block))
145

    
146
        return blocks
147

    
148
    def block_retr_archipelago(self, hashes):
149
        """Retrieve blocks from storage by their hashes"""
150
        blocks = []
151
        append = blocks.append
152
        block = None
153

    
154
        ioctx = self.ioctx_pool.pool_get()
155
        archip_emptyhash = hexlify(self.emptyhash)
156

    
157
        for h in hashes:
158
            if h == archip_emptyhash:
159
                append(self._pad(''))
160
                continue
161
            req = Request.get_info_request(ioctx, self.dst_port, h)
162
            req.submit()
163
            req.wait()
164
            ret = req.success()
165
            if ret:
166
                info = req.get_data(_type=xseg_reply_info)
167
                size = info.contents.size
168
                req.put()
169
                req_data = Request.get_read_request(ioctx, self.dst_port, h,
170
                                                    size=size)
171
                req_data.submit()
172
                req_data.wait()
173
                ret_data = req_data.success()
174
                if ret_data:
175
                    append(self._pad(string_at(req_data.get_data(), size)))
176
                    req_data.put()
177
                else:
178
                    req_data.put()
179
                    self.ioctx_pool.put(ioctx)
180
                    raise Exception("Cannot retrieve Archipelago data.")
181
            else:
182
                req.put()
183
                self.ioctx_pool.pool_put(ioctx)
184
                raise Exception("Bad block file.")
185
        self.ioctx_pool.pool_put(ioctx)
186
        return blocks
187

    
188
    def block_stor(self, blocklist):
189
        """Store a bunch of blocks and return (hashes, missing).
190
           Hashes is a list of the hashes of the blocks,
191
           missing is a list of indices in that list indicating
192
           which blocks were missing from the store.
193
        """
194
        block_hash = self.block_hash
195
        hashlist = [block_hash(b) for b in blocklist]
196
        missing = [i for i, h in enumerate(hashlist) if not
197
                   self._check_rear_block(h)]
198
        for i in missing:
199
            with self._get_rear_block(hashlist[i], 1) as rbl:
200
                rbl.sync_write(blocklist[i])  # XXX: verify?
201

    
202
        return hashlist, missing
203

    
204
    def block_delta(self, blkhash, offset, data):
205
        """Construct and store a new block from a given block
206
           and a data 'patch' applied at offset. Return:
207
           (the hash of the new block, if the block already existed)
208
        """
209

    
210
        blocksize = self.blocksize
211
        if offset >= blocksize or not data:
212
            return None, None
213

    
214
        block = self.block_retr((blkhash,))
215
        if not block:
216
            return None, None
217

    
218
        block = block[0]
219
        newblock = block[:offset] + data
220
        if len(newblock) > blocksize:
221
            newblock = newblock[:blocksize]
222
        elif len(newblock) < blocksize:
223
            newblock += block[len(newblock):]
224

    
225
        h, a = self.block_stor((newblock,))
226
        return h[0], 1 if a else 0
227

    
228
    def block_hash_file(self, archipelagoobject):
        """Return the list of hashes (hashes map)
           for the blocks in a buffered file.
           Helper method, does not affect store.
        """
        return [self.block_hash(chunk)
                for chunk in file_sync_read_chunks(archipelagoobject,
                                                   self.blocksize, 1, 0)]
    def block_stor_file(self, archipelagoobject):
        """Read blocks from buffered file object and store them. Return:
           (bytes read, list of hashes, list of hashes that were missing)
        """
        blocksize = self.blocksize
        hashlist = []
        storedlist = []
        lastsize = 0

        for chunk in file_sync_read_chunks(archipelagoobject, blocksize, 1, 0):
            chunk_hashes, chunk_stored = self.block_stor((chunk,))
            hashlist.extend(chunk_hashes)
            storedlist.extend(chunk_stored)
            lastsize = len(chunk)

        # All blocks but the last are full-sized; the last may be short.
        if hashlist:
            size = (len(hashlist) - 1) * blocksize + lastsize
        else:
            size = 0
        return size, hashlist, storedlist