Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / lib / hashfiler / archipelagoblocker.py @ dc7159be

History | View | Annotate | Download (8.7 kB)

1
# Copyright 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from hashlib import new as newhasher
35
from binascii import hexlify
36
import os
37
import re
38
import ConfigParser
39

    
40
from context_archipelago import ArchipelagoObject, file_sync_read_chunks
41
from archipelago.common import (
42
    Request,
43
    xseg_reply_info,
44
    string_at,
45
    )
46

    
47
from pithos.workers import (
48
    glue,
49
    monkey,
50
    )
51

    
52
# Module-level side effect: patch the archipelago Request class before
# any blocker instance is created (project-specific monkeypatching
# provided by pithos.workers.monkey).
monkey.patch_Request()
53

    
54

    
55
class ArchipelagoBlocker(object):
    """Blocker backed by Archipelago block storage.

       Blocks are addressed by the hash of their content with trailing
       zero padding stripped, so a short block and its zero-padded form
       share the same hash.

       Required constructor parameters: blocksize, hashtype.
    """

    blocksize = None    # size in bytes of a (padded) block
    blockpool = None    # unused here; kept for interface compatibility
    hashtype = None     # hashlib algorithm name, e.g. 'sha256'

    def __init__(self, **params):
        """Initialize from params ('blocksize', 'hashtype') and the
           Archipelago configuration file supplied by the worker glue.

           Raises ValueError if 'hashtype' is not a hashlib algorithm.
        """
        cfg = {}
        bcfg = ConfigParser.ConfigParser()
        # Close the config file deterministically (the original left
        # the file object for the garbage collector to reap).
        cfg_file = open(glue.WorkerGlue.ArchipelagoConfFile)
        try:
            bcfg.readfp(cfg_file)
        finally:
            cfg_file.close()
        cfg['blockerb'] = bcfg.getint('mapperd', 'blockerb_port')
        blocksize = params['blocksize']
        hashtype = params['hashtype']
        try:
            hasher = newhasher(hashtype)
        except ValueError:
            msg = "Variable hashtype '%s' is not available from hashlib"
            raise ValueError(msg % (hashtype,))

        # Because block_hash() strips trailing zeros before hashing,
        # the hash of the empty string is also the hash of any
        # all-zero block.
        hasher.update("")
        emptyhash = hasher.digest()

        self.blocksize = blocksize
        self.ioctx_pool = glue.WorkerGlue().ioctx_pool
        self.dst_port = int(cfg['blockerb'])
        self.hashtype = hashtype
        self.hashlen = len(emptyhash)
        self.emptyhash = emptyhash

    def _pad(self, block):
        """Zero-pad block up to self.blocksize bytes."""
        return block + ('\x00' * (self.blocksize - len(block)))

    def _get_rear_block(self, blkhash, create=0):
        """Return an ArchipelagoObject context manager for the block
           whose binary hash is blkhash; create it if create is true.
        """
        name = hexlify(blkhash)
        return ArchipelagoObject(name, self.ioctx_pool, self.dst_port, create)

    def _check_rear_block(self, blkhash):
        """Return True if a block with binary hash blkhash exists in
           the rear store, False otherwise.
        """
        filename = hexlify(blkhash)
        ioctx = self.ioctx_pool.pool_get()
        req = Request.get_info_request(ioctx, self.dst_port, filename)
        req.submit()
        req.wait()
        ret = req.success()
        req.put()
        self.ioctx_pool.pool_put(ioctx)
        return bool(ret)

    def block_hash(self, data):
        """Hash a block of data (trailing zero padding is ignored)."""
        hasher = newhasher(self.hashtype)
        hasher.update(data.rstrip('\x00'))
        return hasher.digest()

    def block_ping(self, hashes):
        """Check hashes for existence and
           return those missing from block storage.
        """
        notfound = []
        append = notfound.append

        for h in hashes:
            # Deduplicate: report each missing hash only once.
            if h not in notfound and not self._check_rear_block(h):
                append(h)

        return notfound

    def block_retr(self, hashes):
        """Retrieve blocks from storage by their binary hashes.

           Returns a list of zero-padded blocks. Retrieval stops at the
           first missing/unreadable block, so the result may be shorter
           than hashes.
        """
        blocksize = self.blocksize
        blocks = []
        append = blocks.append
        block = None

        for h in hashes:
            if h == self.emptyhash:
                # Known-empty block: synthesize locally, no I/O needed.
                append(self._pad(''))
                continue
            with self._get_rear_block(h, 0) as rbl:
                if not rbl:
                    break
                for block in rbl.sync_read_chunks(blocksize, 1, 0):
                    break  # there should be just one block there
            if not block:
                break
            append(self._pad(block))

        return blocks

    def block_retr_archipelago(self, hashes):
        """Retrieve blocks from storage by their hex hashes.

           Returns a list of zero-padded blocks. Raises Exception on a
           missing block or a failed read; the pooled ioctx is always
           returned to the pool before raising.
        """
        blocks = []
        append = blocks.append

        ioctx = self.ioctx_pool.pool_get()
        archip_emptyhash = hexlify(self.emptyhash)

        for h in hashes:
            if h == archip_emptyhash:
                append(self._pad(''))
                continue
            req = Request.get_info_request(ioctx, self.dst_port, h)
            req.submit()
            req.wait()
            ret = req.success()
            if ret:
                info = req.get_data(_type=xseg_reply_info)
                size = info.contents.size
                req.put()
                req_data = Request.get_read_request(ioctx, self.dst_port, h,
                                                    size=size)
                req_data.submit()
                req_data.wait()
                ret_data = req_data.success()
                if ret_data:
                    append(self._pad(string_at(req_data.get_data(), size)))
                    req_data.put()
                else:
                    req_data.put()
                    # BUG FIX: this was self.ioctx_pool.put(ioctx); the
                    # pool API is pool_get/pool_put (as used everywhere
                    # else in this class), so the old call raised
                    # AttributeError and leaked the ioctx.
                    self.ioctx_pool.pool_put(ioctx)
                    raise Exception("Cannot retrieve Archipelago data.")
            else:
                req.put()
                self.ioctx_pool.pool_put(ioctx)
                raise Exception("Bad block file.")
        self.ioctx_pool.pool_put(ioctx)
        return blocks

    def block_stor(self, blocklist):
        """Store a bunch of blocks and return (hashes, missing).
           Hashes is a list of the hashes of the blocks,
           missing is a list of indices in that list indicating
           which blocks were missing from the store.
        """
        block_hash = self.block_hash
        hashlist = [block_hash(b) for b in blocklist]
        missing = [i for i, h in enumerate(hashlist) if not
                   self._check_rear_block(h)]
        for i in missing:
            with self._get_rear_block(hashlist[i], 1) as rbl:
                rbl.sync_write(blocklist[i])  # XXX: verify?

        return hashlist, missing

    def block_delta(self, blkhash, offset, data):
        """Construct and store a new block from a given block
           and a data 'patch' applied at offset. Return:
           (the hash of the new block, if the block already existed)

           Returns (None, None) when the patch is empty, out of range,
           or the source block cannot be retrieved.
        """
        blocksize = self.blocksize
        if offset >= blocksize or not data:
            return None, None

        block = self.block_retr((blkhash,))
        if not block:
            return None, None

        block = block[0]
        # Splice the patch in, then clamp/backfill to exactly one block.
        newblock = block[:offset] + data
        if len(newblock) > blocksize:
            newblock = newblock[:blocksize]
        elif len(newblock) < blocksize:
            newblock += block[len(newblock):]

        h, a = self.block_stor((newblock,))
        return h[0], 1 if a else 0

    def block_hash_file(self, archipelagoobject):
        """Return the list of hashes (hashes map)
           for the blocks in a buffered file.
           Helper method, does not affect store.
        """
        hashes = []
        append = hashes.append
        block_hash = self.block_hash

        for block in file_sync_read_chunks(archipelagoobject,
                                           self.blocksize, 1, 0):
            append(block_hash(block))

        return hashes

    def block_stor_file(self, archipelagoobject):
        """Read blocks from buffered file object and store them. Return:
           (bytes read, list of hashes, list of hashes that were missing)
        """
        blocksize = self.blocksize
        block_stor = self.block_stor
        hashlist = []
        hextend = hashlist.extend
        storedlist = []
        sextend = storedlist.extend
        lastsize = 0

        for block in file_sync_read_chunks(archipelagoobject, blocksize, 1, 0):
            hl, sl = block_stor((block,))
            hextend(hl)
            sextend(sl)
            lastsize = len(block)

        # All chunks but the last are full blocks; only the last one
        # may be short, so total size is derived from lastsize.
        size = (len(hashlist) - 1) * blocksize + lastsize if hashlist else 0
        return size, hashlist, storedlist