Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / lib / hashfiler / archipelagoblocker.py @ f3525003

History | View | Annotate | Download (8.6 kB)

1
# Copyright 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from hashlib import new as newhasher
35
from binascii import hexlify
36
import os, re
37

    
38
from context_archipelago import ArchipelagoObject, file_sync_read_chunks
39
from archipelago.common import (
40
    Request,
41
    xseg_reply_info,
42
    string_at,
43
    )
44

    
45
from pithos.workers import glue, monkey
46

    
47
monkey.patch_Request()
48

    
49
from pithos.api.settings import BACKEND_ARCHIPELAGO_CONF
50

    
51
class ArchipelagoBlocker(object):
    """Blocker.

       Content-addressed block store backed by Archipelago: each block is
       stored as an Archipelago object named by the hex digest of the
       block's hash (trailing zero bytes stripped before hashing).

       Required constructor parameters: blocksize, hashtype.
    """

    blocksize = None   # size (bytes) of a full block
    blockpool = None
    hashtype = None    # hashlib algorithm name, e.g. 'sha256'

    def __init__(self, **params):
        """Initialize from params['blocksize'] and params['hashtype'].

           Reads the blocker port from BACKEND_ARCHIPELAGO_CONF and
           precomputes the hash of the empty block so it can be
           special-cased without touching storage.

           Raises ValueError if hashtype is unknown to hashlib.
        """
        cfg = {}
        # 'with' ensures the config file handle is closed (the original
        # open(...).read() leaked it).
        with open(BACKEND_ARCHIPELAGO_CONF) as conf_file:
            bcfg = conf_file.read()
        # Raw string: the pattern uses regex escapes (\s, \d).
        cfg['blockerb'] = re.search(r'\'blockerb_port\'\s*:\s*\d+',
                                    bcfg).group(0).split(':')[1]
        blocksize = params['blocksize']
        hashtype = params['hashtype']
        try:
            hasher = newhasher(hashtype)
        except ValueError:
            msg = "Variable hashtype '%s' is not available from hashlib"
            raise ValueError(msg % (hashtype,))

        # Digest of the empty input == hash of an all-zero block, since
        # block_hash() strips trailing zero bytes before hashing.
        hasher.update("")
        emptyhash = hasher.digest()

        self.blocksize = blocksize
        self.ioctx_pool = glue.WorkerGlue().ioctx_pool
        self.dst_port = int(cfg['blockerb'])
        self.hashtype = hashtype
        self.hashlen = len(emptyhash)
        self.emptyhash = emptyhash

    def _pad(self, block):
        """Zero-pad block up to the configured block size."""
        return block + ('\x00' * (self.blocksize - len(block)))

    def _get_rear_block(self, blkhash, create=0):
        """Return an ArchipelagoObject for the block with raw digest
           blkhash; create it in storage if create is true.
        """
        name = hexlify(blkhash)
        return ArchipelagoObject(name, self.ioctx_pool, self.dst_port, create)

    def _check_rear_block(self, blkhash):
        """Return True if a block with raw digest blkhash exists in
           storage, False otherwise.
        """
        filename = hexlify(blkhash)
        ioctx = self.ioctx_pool.pool_get()
        req = Request.get_info_request(ioctx, self.dst_port, filename)
        req.submit()
        req.wait()
        ret = req.success()
        req.put()
        self.ioctx_pool.pool_put(ioctx)
        return bool(ret)

    def block_hash(self, data):
        """Hash a block of data.

           Trailing zero bytes are stripped first, so a short block and
           its zero-padded form hash identically.
        """
        hasher = newhasher(self.hashtype)
        hasher.update(data.rstrip('\x00'))
        return hasher.digest()

    def block_ping(self, hashes):
        """Check hashes for existence and
           return those missing from block storage.
        """
        notfound = []
        append = notfound.append

        for h in hashes:
            # 'h not in notfound' de-duplicates so each missing hash is
            # reported once.
            if h not in notfound and not self._check_rear_block(h):
                append(h)

        return notfound

    def block_retr(self, hashes):
        """Retrieve blocks from storage by their (raw digest) hashes.

           Stops at the first missing/empty block; each returned block
           is zero-padded to blocksize.
        """
        blocksize = self.blocksize
        blocks = []
        append = blocks.append
        block = None

        for h in hashes:
            if h == self.emptyhash:
                append(self._pad(''))
                continue
            with self._get_rear_block(h, 0) as rbl:
                if not rbl:
                    break
                for block in rbl.sync_read_chunks(blocksize, 1, 0):
                    break  # there should be just one block there
            if not block:
                break
            append(self._pad(block))

        return blocks

    def block_retr_archipelago(self, hashes):
        """Retrieve blocks from storage by their (hex string) hashes.

           Raises Exception on a missing block ("Bad block file.") or a
           failed data read ("Cannot retrieve Archipelago data.").
        """
        blocks = []
        append = blocks.append

        ioctx = self.ioctx_pool.pool_get()
        archip_emptyhash = hexlify(self.emptyhash)

        # try/finally guarantees the ioctx is returned on every exit
        # path.  This also fixes the original error path, which called
        # the nonexistent self.ioctx_pool.put() (instead of pool_put)
        # and would have raised AttributeError while leaking the ioctx.
        try:
            for h in hashes:
                if h == archip_emptyhash:
                    append(self._pad(''))
                    continue
                req = Request.get_info_request(ioctx, self.dst_port, h)
                req.submit()
                req.wait()
                ret = req.success()
                if not ret:
                    req.put()
                    raise Exception("Bad block file.")
                info = req.get_data(_type=xseg_reply_info)
                size = info.contents.size
                req.put()
                req_data = Request.get_read_request(ioctx, self.dst_port, h,
                                                    size=size)
                req_data.submit()
                req_data.wait()
                ret_data = req_data.success()
                if ret_data:
                    append(self._pad(string_at(req_data.get_data(), size)))
                    req_data.put()
                else:
                    req_data.put()
                    raise Exception("Cannot retrieve Archipelago data.")
        finally:
            self.ioctx_pool.pool_put(ioctx)
        return blocks

    def block_stor(self, blocklist):
        """Store a bunch of blocks and return (hashes, missing).
           Hashes is a list of the hashes of the blocks,
           missing is a list of indices in that list indicating
           which blocks were missing from the store.
        """
        block_hash = self.block_hash
        hashlist = [block_hash(b) for b in blocklist]
        missing = [i for i, h in enumerate(hashlist) if not
                   self._check_rear_block(h)]
        for i in missing:
            with self._get_rear_block(hashlist[i], 1) as rbl:
                rbl.sync_write(blocklist[i])  # XXX: verify?

        return hashlist, missing

    def block_delta(self, blkhash, offset, data):
        """Construct and store a new block from a given block
           and a data 'patch' applied at offset. Return:
           (the hash of the new block, if the block already existed)

           Returns (None, None) when the patch is out of range, empty,
           or the base block cannot be retrieved.
        """
        blocksize = self.blocksize
        if offset >= blocksize or not data:
            return None, None

        block = self.block_retr((blkhash,))
        if not block:
            return None, None

        block = block[0]
        newblock = block[:offset] + data
        if len(newblock) > blocksize:
            newblock = newblock[:blocksize]
        elif len(newblock) < blocksize:
            # Keep the original tail beyond the patched region.
            newblock += block[len(newblock):]

        h, a = self.block_stor((newblock,))
        # a is non-empty iff the new block was missing (i.e. just stored).
        return h[0], 1 if a else 0

    def block_hash_file(self, archipelagoobject):
        """Return the list of hashes (hashes map)
           for the blocks in a buffered file.
           Helper method, does not affect store.
        """
        hashes = []
        append = hashes.append
        block_hash = self.block_hash

        for block in file_sync_read_chunks(archipelagoobject,
                                           self.blocksize, 1, 0):
            append(block_hash(block))

        return hashes

    def block_stor_file(self, archipelagoobject):
        """Read blocks from buffered file object and store them. Return:
           (bytes read, list of hashes, list of hashes that were missing)
        """
        blocksize = self.blocksize
        block_stor = self.block_stor
        hashlist = []
        hextend = hashlist.extend
        storedlist = []
        sextend = storedlist.extend
        lastsize = 0

        for block in file_sync_read_chunks(archipelagoobject, blocksize, 1, 0):
            hl, sl = block_stor((block,))
            hextend(hl)
            sextend(sl)
            lastsize = len(block)

        # All blocks are full-size except possibly the last one.
        size = (len(hashlist) - 1) * blocksize + lastsize if hashlist else 0
        return size, hashlist, storedlist