Revision f3525003

b/snf-pithos-backend/pithos/backends/lib/hashfiler/archipelagoblocker.py
# Copyright 2013 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from hashlib import new as newhasher
from binascii import hexlify
import re

from context_archipelago import ArchipelagoObject, file_sync_read_chunks
from archipelago.common import (
    Request,
    xseg_reply_info,
    string_at,
    )

from pithos.workers import glue, monkey

monkey.patch_Request()

from pithos.api.settings import BACKEND_ARCHIPELAGO_CONF


class ArchipelagoBlocker(object):
    """Blocker.
       Required constructor parameters: blocksize, hashtype.
    """

    blocksize = None
    blockpool = None
    hashtype = None

    def __init__(self, **params):
        cfg = {}
        bcfg = open(BACKEND_ARCHIPELAGO_CONF).read()
        cfg['blockerb'] = re.search(r"'blockerb_port'\s*:\s*\d+",
                                    bcfg).group(0).split(':')[1]
        blocksize = params['blocksize']
        hashtype = params['hashtype']
        try:
            hasher = newhasher(hashtype)
        except ValueError:
            msg = "Hash type '%s' is not available from hashlib"
            raise ValueError(msg % (hashtype,))

        hasher.update("")
        emptyhash = hasher.digest()

        self.blocksize = blocksize
        self.ioctx_pool = glue.WorkerGlue().ioctx_pool
        self.dst_port = int(cfg['blockerb'])
        self.hashtype = hashtype
        self.hashlen = len(emptyhash)
        self.emptyhash = emptyhash
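
    # The regex above assumes BACKEND_ARCHIPELAGO_CONF contains a
    # Python-style mapping entry for the blocker port; an illustrative
    # (hypothetical) snippet the search would match:
    #
    #   'blockerb_port': 1059,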

    def _pad(self, block):
        return block + ('\x00' * (self.blocksize - len(block)))

    def _get_rear_block(self, blkhash, create=0):
        name = hexlify(blkhash)
        return ArchipelagoObject(name, self.ioctx_pool, self.dst_port, create)

    def _check_rear_block(self, blkhash):
        filename = hexlify(blkhash)
        ioctx = self.ioctx_pool.pool_get()
        req = Request.get_info_request(ioctx, self.dst_port, filename)
        req.submit()
        req.wait()
        ret = req.success()
        req.put()
        self.ioctx_pool.pool_put(ioctx)
        return bool(ret)
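
    # Every Archipelago round trip below follows the same request
    # lifecycle: build a request on a pooled ioctx, submit() it, wait() for
    # completion, check success(), then put() the request and return the
    # ioctx to the pool.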

    def block_hash(self, data):
        """Hash a block of data, ignoring trailing zero padding."""
        hasher = newhasher(self.hashtype)
        hasher.update(data.rstrip('\x00'))
        return hasher.digest()

    def block_ping(self, hashes):
        """Check hashes for existence and
           return those missing from block storage.
        """
        notfound = []
        append = notfound.append

        for h in hashes:
            if h not in notfound and not self._check_rear_block(h):
                append(h)

        return notfound

    def block_retr(self, hashes):
        """Retrieve blocks from storage by their hashes."""
        blocksize = self.blocksize
        blocks = []
        append = blocks.append
        block = None

        for h in hashes:
            if h == self.emptyhash:
                append(self._pad(''))
                continue
            with self._get_rear_block(h, 0) as rbl:
                if not rbl:
                    break
                for block in rbl.sync_read_chunks(blocksize, 1, 0):
                    break  # there should be just one block there
            if not block:
                break
            append(self._pad(block))

        return blocks
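
    # Note that a block that cannot be read stops the scan above, so the
    # returned list can be shorter than the list of hashes given; callers
    # can detect this by comparing lengths.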

    def block_retr_archipelago(self, hashes):
        """Retrieve blocks from storage by their hashes."""
        blocks = []
        append = blocks.append

        ioctx = self.ioctx_pool.pool_get()
        archip_emptyhash = hexlify(self.emptyhash)

        for h in hashes:
            if h == archip_emptyhash:
                append(self._pad(''))
                continue
            req = Request.get_info_request(ioctx, self.dst_port, h)
            req.submit()
            req.wait()
            ret = req.success()
            if ret:
                info = req.get_data(_type=xseg_reply_info)
                size = info.contents.size
                req.put()
                req_data = Request.get_read_request(ioctx, self.dst_port, h,
                                                    size=size)
                req_data.submit()
                req_data.wait()
                ret_data = req_data.success()
                if ret_data:
                    append(self._pad(string_at(req_data.get_data(), size)))
                    req_data.put()
                else:
                    req_data.put()
                    # return the ioctx to the pool before raising
                    self.ioctx_pool.pool_put(ioctx)
                    raise Exception("Cannot retrieve Archipelago data.")
            else:
                req.put()
                self.ioctx_pool.pool_put(ioctx)
                raise Exception("Bad block file.")
        self.ioctx_pool.pool_put(ioctx)
        return blocks

    def block_stor(self, blocklist):
        """Store a bunch of blocks and return (hashes, missing).
           Hashes is a list of the hashes of the blocks,
           missing is a list of indices in that list indicating
           which blocks were missing from the store.
        """
        block_hash = self.block_hash
        hashlist = [block_hash(b) for b in blocklist]
        missing = [i for i, h in enumerate(hashlist)
                   if not self._check_rear_block(h)]
        for i in missing:
            with self._get_rear_block(hashlist[i], 1) as rbl:
                rbl.sync_write(blocklist[i])  # XXX: verify?

        return hashlist, missing
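
    # Usage sketch (illustrative only; assumes an initialized worker glue
    # and a running Archipelago blocker; the parameter values are
    # hypothetical):
    #
    #   blocker = ArchipelagoBlocker(blocksize=4 * 1024 * 1024,
    #                                hashtype='sha256')
    #   hashes, missing = blocker.block_stor((data,))
    #   blocks = blocker.block_retr(hashes)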

    def block_delta(self, blkhash, offset, data):
        """Construct and store a new block from a given block
           and a data 'patch' applied at offset. Return:
           (hash of the new block, 1 if it was newly stored,
           0 if it already existed)
        """

        blocksize = self.blocksize
        if offset >= blocksize or not data:
            return None, None

        block = self.block_retr((blkhash,))
        if not block:
            return None, None

        block = block[0]
        newblock = block[:offset] + data
        if len(newblock) > blocksize:
            newblock = newblock[:blocksize]
        elif len(newblock) < blocksize:
            newblock += block[len(newblock):]

        h, a = self.block_stor((newblock,))
        return h[0], 1 if a else 0
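
    # Worked example of the splice above (hypothetical 8-byte blocksize):
    # patching block 'AAAAAAAA' with data 'xx' at offset 2 gives
    # 'AA' + 'xx' = 'AAxx', which is shorter than the blocksize, so the
    # old tail is appended, yielding 'AAxxAAAA'.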

    def block_hash_file(self, archipelagoobject):
        """Return the list of hashes (hashes map)
           for the blocks in a buffered file.
           Helper method, does not affect store.
        """
        hashes = []
        append = hashes.append
        block_hash = self.block_hash

        for block in file_sync_read_chunks(archipelagoobject,
                                           self.blocksize, 1, 0):
            append(block_hash(block))

        return hashes

    def block_stor_file(self, archipelagoobject):
        """Read blocks from buffered file object and store them. Return:
           (bytes read, list of hashes, list of hashes that were missing)
        """
        blocksize = self.blocksize
        block_stor = self.block_stor
        hashlist = []
        hextend = hashlist.extend
        storedlist = []
        sextend = storedlist.extend
        lastsize = 0

        for block in file_sync_read_chunks(archipelagoobject, blocksize,
                                           1, 0):
            hl, sl = block_stor((block,))
            hextend(hl)
            sextend(sl)
            lastsize = len(block)

        size = (len(hashlist) - 1) * blocksize + lastsize if hashlist else 0
        return size, hashlist, storedlist
b/snf-pithos-backend/pithos/backends/lib/hashfiler/archipelagomapper.py
# Copyright 2013 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from binascii import hexlify
import re
import ctypes

from context_archipelago import ArchipelagoObject
from archipelago.common import (
    Request,
    xseg_reply_info,
    xseg_reply_map,
    xseg_reply_map_scatterlist,
    string_at,
    )

from pithos.workers import (
    glue,
    monkey,
    )

monkey.patch_Request()

from pithos.api.settings import BACKEND_ARCHIPELAGO_CONF


class ArchipelagoMapper(object):
    """Mapper.
       Required constructor parameters: namelen.
    """

    namelen = None

    def __init__(self, **params):
        self.params = params
        self.namelen = params['namelen']
        ioctx_pool = glue.WorkerGlue().ioctx_pool
        cfg = {}
        bcfg = open(BACKEND_ARCHIPELAGO_CONF).read()
        cfg['blockerm'] = re.search(r"'blockerm_port'\s*:\s*\d+",
                                    bcfg).group(0).split(':')[1]
        cfg['mapperd'] = re.search(r"'mapper_port'\s*:\s*\d+",
                                   bcfg).group(0).split(':')[1]
        self.ioctx_pool = ioctx_pool
        self.dst_port = int(cfg['blockerm'])
        self.mapperd_port = int(cfg['mapperd'])

    def _get_rear_map(self, maphash, create=0):
        name = hexlify(maphash)
        return ArchipelagoObject(name, self.ioctx_pool, self.dst_port, create)

    def _check_rear_map(self, maphash):
        name = hexlify(maphash)
        ioctx = self.ioctx_pool.pool_get()
        req = Request.get_info_request(ioctx, self.dst_port, name)
        req.submit()
        req.wait()
        ret = req.success()
        req.put()
        self.ioctx_pool.pool_put(ioctx)
        return bool(ret)

    def map_retr(self, maphash, blkoff=0, nr=100000000000000):
        """Return as a list, part of the hashes map of an object
           at the given block offset.
           By default, return the whole hashes map.
        """
        namelen = self.namelen
        hashes = []
        ioctx = self.ioctx_pool.pool_get()
        req = Request.get_info_request(ioctx, self.dst_port,
                                       hexlify(maphash))
        req.submit()
        req.wait()
        ret = req.success()
        if ret:
            info = req.get_data(_type=xseg_reply_info)
            size = int(info.contents.size)
            req.put()
        else:
            req.put()
            self.ioctx_pool.pool_put(ioctx)
            raise RuntimeError("Hashmap '%s' doesn't exist" %
                               hexlify(maphash))
        req = Request.get_read_request(ioctx, self.dst_port,
                                       hexlify(maphash), size=size)
        req.submit()
        req.wait()
        ret = req.success()
        if ret:
            data = string_at(req.get_data(), size)
            req.put()
            self.ioctx_pool.pool_put(ioctx)
            for idx in xrange(0, len(data), namelen):
                hashes.append(data[idx:idx + namelen])
        else:
            req.put()
            self.ioctx_pool.pool_put(ioctx)
            raise RuntimeError("Cannot read hashmap '%s'" %
                               hexlify(maphash))
        return hashes
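
    # The rear map object read above is a flat byte string of concatenated
    # fixed-width entries, so with namelen=32 bytes [0:32] hold the hash of
    # block 0, bytes [32:64] the hash of block 1, and so on.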

    def map_retr_archipelago(self, maphash, size):
        """Retrieve an Archipelago mapfile."""
        ioctx = self.ioctx_pool.pool_get()
        maphash = maphash.split("archip:")[1]
        req = Request.get_mapr_request(ioctx, self.mapperd_port, maphash,
                                       offset=0, size=size)
        req.submit()
        req.wait()
        ret = req.success()
        if ret:
            data = req.get_data(xseg_reply_map)
            Segsarray = xseg_reply_map_scatterlist * data.contents.cnt
            segs = Segsarray.from_address(ctypes.addressof(data.contents.segs))
            req.put()
        else:
            req.put()
            self.ioctx_pool.pool_put(ioctx)
            raise Exception("Could not retrieve Archipelago mapfile.")
        self.ioctx_pool.pool_put(ioctx)
        return [string_at(segs[idx].target, segs[idx].targetlen)
                for idx in xrange(len(segs))]

    def map_stor(self, maphash, hashes=(), blkoff=0, create=1):
        """Store hashes in the given hashes map."""
        namelen = self.namelen
        if self._check_rear_map(maphash):
            return
        with self._get_rear_map(maphash, 1) as rmap:
            rmap.sync_write_chunks(namelen, blkoff, hashes, None)
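
# Usage sketch (illustrative only; the namelen value and the maphash/hashes
# variables are hypothetical, and both calls need a running Archipelago
# blockerm peer):
#
#   mapper = ArchipelagoMapper(namelen=32)
#   mapper.map_stor(maphash, hashes)    # write the whole hashes map
#   hashes = mapper.map_retr(maphash)   # read it back as a list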
b/snf-pithos-backend/pithos/backends/lib/hashfiler/context_archipelago.py
# Copyright 2013 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from os import SEEK_CUR, SEEK_SET
from archipelago.common import (
    Request,
    string_at,
    )
from pithos.workers import monkey
monkey.patch_Request()

_zeros = ''


def zeros(nr):
    """Return a string of nr zero bytes, reusing a cached buffer."""
    global _zeros
    size = len(_zeros)
    if nr == size:
        return _zeros

    if nr > size:
        _zeros += '\0' * (nr - size)
        return _zeros

    if nr < size:
        _zeros = _zeros[:nr]
        return _zeros
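
# For example, zeros(4) returns '\x00\x00\x00\x00' and caches it; a later,
# smaller request truncates the cached buffer instead of rebuilding it.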


def file_sync_write_chunks(archipelagoobject, chunksize, offset,
                           chunks, size=None):
    """Write given chunks to the given buffered file object.
       Writes never span across chunk boundaries.
       If size is given, stop after or pad until size bytes have been
       written.
    """
    padding = 0
    cursize = chunksize * offset
    archipelagoobject.seek(cursize)
    for chunk in chunks:
        if padding:
            # zero-fill the tail of the previous short chunk up to the
            # chunk boundary
            archipelagoobject.sync_write(buffer(zeros(chunksize), 0, padding))
            cursize += padding
        if size is not None and cursize + chunksize >= size:
            # truncate the final chunk to the bytes remaining before size
            chunk = chunk[:size - cursize]
            archipelagoobject.sync_write(chunk)
            cursize += len(chunk)
            break
        archipelagoobject.sync_write(chunk)
        cursize += len(chunk)  # advance the logical position
        padding = chunksize - len(chunk)

    padding = size - cursize if size is not None else 0
    if padding <= 0:
        return

    q, r = divmod(padding, chunksize)
    for x in xrange(q):
        archipelagoobject.sync_write(zeros(chunksize))
    archipelagoobject.sync_write(buffer(zeros(chunksize), 0, r))
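
# Worked example of the final padding: with chunksize=4, size=10 and a
# single full 4-byte chunk written, 6 bytes remain; divmod(6, 4) == (1, 2),
# i.e. one full zero chunk plus a 2-byte zero tail.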


def file_sync_read_chunks(archipelagoobject, chunksize, nr, offset=0):
    """Read and yield groups of chunks from a buffered file object at
       offset. Reads never span across chunksize boundaries.
    """
    archipelagoobject.seek(offset * chunksize)
    while nr:
        remains = chunksize
        chunk = ''
        while 1:
            s = archipelagoobject.sync_read(remains)
            if not s:
                if chunk:
                    yield chunk
                return
            chunk += s
            remains -= len(s)
            if remains <= 0:
                break
        yield chunk
        nr -= 1
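
# For example, file_sync_read_chunks(obj, 4096, 2, offset=1) seeks to byte
# 4096 and yields at most two 4096-byte chunks; the final chunk may be
# shorter if the object is exhausted first.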


class ArchipelagoObject(object):
    __slots__ = ("name", "ioctx_pool", "dst_port", "create", "offset")

    def __init__(self, name, ioctx_pool, dst_port=None, create=0):
        self.name = name
        self.ioctx_pool = ioctx_pool
        self.create = create
        self.dst_port = dst_port
        self.offset = 0

    def __enter__(self):
        return self

    def __exit__(self, exc, arg, trace):
        return False

    def seek(self, offset, whence=SEEK_SET):
        if whence == SEEK_CUR:
            offset += self.offset
        self.offset = offset
        return offset

    def tell(self):
        return self.offset

    def truncate(self, size):
        raise NotImplementedError("File truncation is not implemented yet "
                                  "in archipelago")

    def sync_write(self, data):
        ioctx = self.ioctx_pool.pool_get()
        req = Request.get_write_request(ioctx, self.dst_port, self.name,
                                        data=data, offset=self.offset,
                                        datalen=len(data))
        req.submit()
        req.wait()
        ret = req.success()
        req.put()
        self.ioctx_pool.pool_put(ioctx)
        if ret:
            self.offset += len(data)
        else:
            raise IOError("archipelago: Write request error")

    def sync_write_chunks(self, chunksize, offset, chunks, size=None):
        return file_sync_write_chunks(self, chunksize, offset, chunks, size)

    def sync_read(self, size):
        read = Request.get_read_request
        data = ''
        datalen = 0
        dsize = size
        while 1:
            ioctx = self.ioctx_pool.pool_get()
            req = read(ioctx, self.dst_port, self.name,
                       size=dsize - datalen, offset=self.offset)
            req.submit()
            req.wait()
            ret = req.success()
            if ret:
                s = string_at(req.get_data(), dsize - datalen)
            else:
                s = None
            req.put()
            self.ioctx_pool.pool_put(ioctx)
            if not s:
                break
            data += s
            datalen += len(s)
            self.offset += len(s)
            if datalen >= size:
                break
        return data

    def sync_read_chunks(self, chunksize, nr, offset=0):
        return file_sync_read_chunks(self, chunksize, nr, offset)
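
# Usage sketch (illustrative only; name, ioctx_pool and dst_port are
# hypothetical and must come from an initialized worker glue):
#
#   with ArchipelagoObject(name, ioctx_pool, dst_port) as obj:
#       obj.sync_write('some data')
#       obj.seek(0)
#       data = obj.sync_read(9)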
