Revision c30635bf

b/snf-pithos-backend/pithos/backends/lib/hashfiler/blocker.py
 # interpreted as representing official policies, either expressed
 # or implied, of GRNET S.A.
 
-from os import makedirs
-from os.path import isdir, realpath, exists, join
 from hashlib import new as newhasher
 from binascii import hexlify
 
-from context_file import ContextFile, file_sync_read_chunks
+from radosblocker import RadosBlocker
+from fileblocker import FileBlocker
+
+def intersect(a, b):
+    """ return the intersection of two lists """
+    return list(set(a) & set(b))
+
+def union(a, b):
+    """ return the union of two lists """
+    return list(set(a) | set(b))
 
 
 class Blocker(object):
     """Blocker.
-       Required constructor parameters: blocksize, blockpath, hashtype.
+       Required constructor parameters: blocksize, blockpath, hashtype,
+       blockpool.
     """
 
-    blocksize = None
-    blockpath = None
-    hashtype = None
-
     def __init__(self, **params):
-        blocksize = params['blocksize']
-        blockpath = params['blockpath']
-        blockpath = realpath(blockpath)
-        if not isdir(blockpath):
-            if not exists(blockpath):
-                makedirs(blockpath)
-            else:
-                raise ValueError("Variable blockpath '%s' is not a directory" % (blockpath,))
-
-        hashtype = params['hashtype']
-        try:
-            hasher = newhasher(hashtype)
-        except ValueError:
-            msg = "Variable hashtype '%s' is not available from hashlib"
-            raise ValueError(msg % (hashtype,))
-
-        hasher.update("")
-        emptyhash = hasher.digest()
-
-        self.blocksize = blocksize
-        self.blockpath = blockpath
-        self.hashtype = hashtype
-        self.hashlen = len(emptyhash)
-        self.emptyhash = emptyhash
-
-    def _pad(self, block):
-        return block + ('\x00' * (self.blocksize - len(block)))
-
-    def _get_rear_block(self, blkhash, create=0):
-        filename = hexlify(blkhash)
-        dir = join(self.blockpath, filename[0:2], filename[2:4], filename[4:6])
-        if not exists(dir):
-            makedirs(dir)
-        name = join(dir, filename)
-        return ContextFile(name, create)
-
-    def _check_rear_block(self, blkhash):
-        filename = hexlify(blkhash)
-        dir = join(self.blockpath, filename[0:2], filename[2:4], filename[4:6])
-        name = join(dir, filename)
-        return exists(name)
+        params['blockpool'] = 'blocks'
+        self.rblocker = RadosBlocker(**params)
+        self.fblocker = FileBlocker(**params)
+        self.hashlen = self.rblocker.hashlen
+
+#    def _get_rear_block(self, blkhash, create=0):
+#        return self.rblocker._get_rear_block(blkhash, create)
+
+#    def _check_rear_block(self, blkhash):
+#        return self.rblocker._check_rear_block(blkhash)
+#        return self.rblocker._check_rear_block(blkhash) and
+#                self.fblocker._check_rear_block(blkhash)
 
     def block_hash(self, data):
         """Hash a block of data"""
-        hasher = newhasher(self.hashtype)
-        hasher.update(data.rstrip('\x00'))
-        return hasher.digest()
+        return self.rblocker.block_hash(data)
 
     def block_ping(self, hashes):
         """Check hashes for existence and
            return those missing from block storage.
         """
-        notfound = []
-        append = notfound.append
-
-        for h in hashes:
-            if h not in notfound and not self._check_rear_block(h):
-                append(h)
-
-        return notfound
+#        return self.rblocker.block_ping(hashes)
+        r = self.rblocker.block_ping(hashes)
+        f = self.fblocker.block_ping(hashes)
+        return union(r, f)
 
     def block_retr(self, hashes):
         """Retrieve blocks from storage by their hashes."""
-        blocksize = self.blocksize
-        blocks = []
-        append = blocks.append
-        block = None
-
-        for h in hashes:
-            if h == self.emptyhash:
-                append(self._pad(''))
-                continue
-            with self._get_rear_block(h, 0) as rbl:
-                if not rbl:
-                    break
-                for block in rbl.sync_read_chunks(blocksize, 1, 0):
-                    break # there should be just one block there
-            if not block:
-                break
-            append(self._pad(block))
-
-        return blocks
+        return self.fblocker.block_retr(hashes)
 
     def block_stor(self, blocklist):
         """Store a bunch of blocks and return (hashes, missing).
@@ ... @@
            missing is a list of indices in that list indicating
           which blocks were missing from the store.
         """
-        block_hash = self.block_hash
-        hashlist = [block_hash(b) for b in blocklist]
-        mf = None
-        missing = [i for i, h in enumerate(hashlist) if not self._check_rear_block(h)]
-        for i in missing:
-            with self._get_rear_block(hashlist[i], 1) as rbl:
-                rbl.sync_write(blocklist[i]) #XXX: verify?
+#        return self.rblocker.block_stor(blocklist)
+        (hashes, r_missing) = self.rblocker.block_stor(blocklist)
+        (_, f_missing) = self.fblocker.block_stor(blocklist)
+        return (hashes, union(r_missing, f_missing))
 
-        return hashlist, missing
 
     def block_delta(self, blkhash, offset, data):
         """Construct and store a new block from a given block
            and a data 'patch' applied at offset. Return:
            (the hash of the new block, if the block already existed)
         """
+#        return self.rblocker.block_delta(blkhash, offset, data)
 
         blocksize = self.blocksize
-        if offset >= blocksize or not data:
+        (f_hash, f_existed) = self.fblocker.block_delta(blkhash, offset, data)
+        (r_hash, r_existed) = self.rblocker.block_delta(blkhash, offset, data)
+        if not r_hash and not f_hash:
             return None, None
-
-        block = self.block_retr((blkhash,))
-        if not block:
-            return None, None
-
-        block = block[0]
-        newblock = block[:offset] + data
-        if len(newblock) > blocksize:
-            newblock = newblock[:blocksize]
-        elif len(newblock) < blocksize:
-            newblock += block[len(newblock):]
-
-        h, a = self.block_stor((newblock,))
-        return h[0], 1 if a else 0
-
-    def block_hash_file(self, openfile):
-        """Return the list of hashes (hashes map)
-           for the blocks in a buffered file.
-           Helper method, does not affect store.
-        """
-        hashes = []
-        append = hashes.append
-        block_hash = self.block_hash
-
-        for block in file_sync_read_chunks(openfile, self.blocksize, 1, 0):
-            append(block_hash(block))
-
-        return hashes
-
-    def block_stor_file(self, openfile):
-        """Read blocks from buffered file object and store them. Return:
-           (bytes read, list of hashes, list of hashes that were missing)
-        """
-        blocksize = self.blocksize
-        block_stor = self.block_stor
-        hashlist = []
-        hextend = hashlist.extend
-        storedlist = []
-        sextend = storedlist.extend
-        lastsize = 0
-
-        for block in file_sync_read_chunks(openfile, blocksize, 1, 0):
-            hl, sl = block_stor((block,))
-            hextend(hl)
-            sextend(sl)
-            lastsize = len(block)
-
-        size = (len(hashlist) -1) * blocksize + lastsize if hashlist else 0
-        return size, hashlist, storedlist
-
+        if not r_hash:
+            block = self.fblocker.block_retr((blkhash,))
+            if not block:
+                return None, None
+            block = block[0]
+            newblock = block[:offset] + data
+            if len(newblock) > blocksize:
+                newblock = newblock[:blocksize]
+            elif len(newblock) < blocksize:
+                newblock += block[len(newblock):]
+            r_hash, r_existed = self.rblocker.block_stor((newblock,))
+
+        return f_hash, 1 if r_existed and f_existed else 0
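
In short, the revised Blocker is now a thin facade: every write goes to both the RADOS back end and the file back end, reads are served from the file back end, and a block counts as missing if either back end lacks it (hence the union() helper). A minimal usage sketch follows; the parameter values and paths are illustrative, and it assumes a reachable Ceph cluster plus a writable local directory:

from binascii import hexlify
from blocker import Blocker

blocker = Blocker(blocksize=4 * 1024 * 1024,       # 4 MiB blocks (illustrative)
                  blockpath='/srv/pithos/blocks',  # file back end root (illustrative)
                  hashtype='sha256')               # __init__ forces blockpool='blocks'

data = 'x' * 1024                                  # short blocks are zero-padded on read
hashes, missing = blocker.block_stor((data,))      # stored in both back ends
print hexlify(hashes[0]), missing                  # missing: union over both back ends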
b/snf-pithos-backend/pithos/backends/lib/hashfiler/context_object.py (new file)
+# Copyright 2011-2012 GRNET S.A. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+#
+#   1. Redistributions of source code must retain the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer.
+#
+#   2. Redistributions in binary form must reproduce the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer in the documentation and/or other materials
+#      provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+
+from os import SEEK_CUR, SEEK_SET
+from rados import Ioctx, ObjectNotFound
+
+_zeros = ''
+
+
+def zeros(nr):
+    global _zeros
+    size = len(_zeros)
+    if nr == size:
+        return _zeros
+
+    if nr > size:
+        _zeros += '\0' * (nr - size)
+        return _zeros
+
+    if nr < size:
+        _zeros = _zeros[:nr]
+        return _zeros
+
+
+def file_sync_write_chunks(radosobject, chunksize, offset, chunks, size=None):
+    """Write given chunks to the given buffered file object.
+       Writes never span across chunk boundaries.
+       If size is given stop after or pad until size bytes have been written.
+    """
+    padding = 0
+    cursize = chunksize * offset
+    radosobject.seek(cursize)
+    for chunk in chunks:
+        if padding:
+            radosobject.sync_write(buffer(zeros(chunksize), 0, padding))
+        if size is not None and cursize + chunksize >= size:
+            chunk = chunk[:chunksize - (cursize - size)]
+            radosobject.sync_write(chunk)
+            cursize += len(chunk)
+            break
+        radosobject.sync_write(chunk)
+        padding = chunksize - len(chunk)
+
+    padding = size - cursize if size is not None else 0
+    if padding <= 0:
+        return
+
+    q, r = divmod(padding, chunksize)
+    for x in xrange(q):
+        radosobject.sync_write(zeros(chunksize))
+    radosobject.sync_write(buffer(zeros(chunksize), 0, r))
+
+def file_sync_read_chunks(radosobject, chunksize, nr, offset=0):
+    """Read and yield groups of chunks from a buffered file object at offset.
+       Reads never span across chunksize boundaries.
+    """
+    radosobject.seek(offset * chunksize)
+    while nr:
+        remains = chunksize
+        chunk = ''
+        while 1:
+            s = radosobject.sync_read(remains)
+            if not s:
+                if chunk:
+                    yield chunk
+                return
+            chunk += s
+            remains -= len(s)
+            if remains <= 0:
+                break
+        yield chunk
+        nr -= 1
+
+class RadosObject(object):
+    __slots__ = ("name", "ioctx", "create", "offset")
+
+    def __init__(self, name, ioctx, create=0):
+        self.name = name
+        self.ioctx = ioctx
+        self.create = create
+        self.offset = 0
+        #self.dirty = 0
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc, arg, trace):
+        return False
+
+    def seek(self, offset, whence=SEEK_SET):
+        if whence == SEEK_CUR:
+            offset += self.offset
+        self.offset = offset
+        return offset
+
+    def tell(self):
+        return self.offset
+
+    def truncate(self, size):
+        self.ioctx.trunc(self.name, size)
+
+    def sync_write(self, data):
+        #self.dirty = 1
+        self.ioctx.write(self.name, data, self.offset)
+        self.offset += len(data)
+
+    def sync_write_chunks(self, chunksize, offset, chunks, size=None):
+        #self.dirty = 1
+        return file_sync_write_chunks(self, chunksize, offset, chunks, size)
+
+    def sync_read(self, size):
+        read = self.ioctx.read
+        data = ''
+        datalen = 0
+        while 1:
+            try:
+                s = read(self.name, size-datalen, self.offset)
+            except ObjectNotFound:
+                s = None
+            if not s:
+                break
+            data += s
+            datalen += len(s)
+            self.offset += len(s)
+            if datalen >= size:
+                break
+        return data
+
+    def sync_read_chunks(self, chunksize, nr, offset=0):
+        return file_sync_read_chunks(self, chunksize, nr, offset)
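
RadosObject gives RADOS objects the same file-like interface (seek/sync_read/sync_write plus the chunked helpers) that ContextFile provides for plain files, which is what lets RadosBlocker and RadosMapper reuse the hashfiler logic unchanged. A small sketch of the interface, assuming a reachable Ceph cluster and an existing pool; the object and pool names are illustrative:

from rados import Rados
from context_object import RadosObject, file_sync_read_chunks

cluster = Rados(conffile='/etc/ceph/ceph.conf')
cluster.connect()
ioctx = cluster.open_ioctx('blocks')               # pool assumed to exist

with RadosObject('example-object', ioctx, create=1) as obj:
    obj.sync_write('hello ')                       # offset advances with each write
    obj.sync_write('world')

with RadosObject('example-object', ioctx) as obj:
    for chunk in file_sync_read_chunks(obj, 4, 3, 0):
        print repr(chunk)                          # 'hell', 'o wo', 'rld'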
b/snf-pithos-backend/pithos/backends/lib/hashfiler/fileblocker.py (new file)
+# Copyright 2011-2012 GRNET S.A. All rights reserved.
+# [license header identical to context_object.py above]
+
+from os import makedirs
+from os.path import isdir, realpath, exists, join
+from hashlib import new as newhasher
+from binascii import hexlify
+
+from context_file import ContextFile, file_sync_read_chunks
+
+
+class FileBlocker(object):
+    """Blocker.
+       Required constructor parameters: blocksize, blockpath, hashtype.
+    """
+
+    blocksize = None
+    blockpath = None
+    hashtype = None
+
+    def __init__(self, **params):
+        blocksize = params['blocksize']
+        blockpath = params['blockpath']
+        blockpath = realpath(blockpath)
+        if not isdir(blockpath):
+            if not exists(blockpath):
+                makedirs(blockpath)
+            else:
+                raise ValueError("Variable blockpath '%s' is not a directory" % (blockpath,))
+
+        hashtype = params['hashtype']
+        try:
+            hasher = newhasher(hashtype)
+        except ValueError:
+            msg = "Variable hashtype '%s' is not available from hashlib"
+            raise ValueError(msg % (hashtype,))
+
+        hasher.update("")
+        emptyhash = hasher.digest()
+
+        self.blocksize = blocksize
+        self.blockpath = blockpath
+        self.hashtype = hashtype
+        self.hashlen = len(emptyhash)
+        self.emptyhash = emptyhash
+
+    def _pad(self, block):
+        return block + ('\x00' * (self.blocksize - len(block)))
+
+    def _get_rear_block(self, blkhash, create=0):
+        filename = hexlify(blkhash)
+        dir = join(self.blockpath, filename[0:2], filename[2:4], filename[4:6])
+        if not exists(dir):
+            makedirs(dir)
+        name = join(dir, filename)
+        return ContextFile(name, create)
+
+    def _check_rear_block(self, blkhash):
+        filename = hexlify(blkhash)
+        dir = join(self.blockpath, filename[0:2], filename[2:4], filename[4:6])
+        name = join(dir, filename)
+        return exists(name)
+
+    def block_hash(self, data):
+        """Hash a block of data"""
+        hasher = newhasher(self.hashtype)
+        hasher.update(data.rstrip('\x00'))
+        return hasher.digest()
+
+    def block_ping(self, hashes):
+        """Check hashes for existence and
+           return those missing from block storage.
+        """
+        notfound = []
+        append = notfound.append
+
+        for h in hashes:
+            if h not in notfound and not self._check_rear_block(h):
+                append(h)
+
+        return notfound
+
+    def block_retr(self, hashes):
+        """Retrieve blocks from storage by their hashes."""
+        blocksize = self.blocksize
+        blocks = []
+        append = blocks.append
+        block = None
+
+        for h in hashes:
+            if h == self.emptyhash:
+                append(self._pad(''))
+                continue
+            with self._get_rear_block(h, 0) as rbl:
+                if not rbl:
+                    break
+                for block in rbl.sync_read_chunks(blocksize, 1, 0):
+                    break # there should be just one block there
+            if not block:
+                break
+            append(self._pad(block))
+
+        return blocks
+
+    def block_stor(self, blocklist):
+        """Store a bunch of blocks and return (hashes, missing).
+           Hashes is a list of the hashes of the blocks,
+           missing is a list of indices in that list indicating
+           which blocks were missing from the store.
+        """
+        block_hash = self.block_hash
+        hashlist = [block_hash(b) for b in blocklist]
+        mf = None
+        missing = [i for i, h in enumerate(hashlist) if not self._check_rear_block(h)]
+        for i in missing:
+            with self._get_rear_block(hashlist[i], 1) as rbl:
+                rbl.sync_write(blocklist[i]) #XXX: verify?
+
+        return hashlist, missing
+
+    def block_delta(self, blkhash, offset, data):
+        """Construct and store a new block from a given block
+           and a data 'patch' applied at offset. Return:
+           (the hash of the new block, if the block already existed)
+        """
+
+        blocksize = self.blocksize
+        if offset >= blocksize or not data:
+            return None, None
+
+        block = self.block_retr((blkhash,))
+        if not block:
+            return None, None
+
+        block = block[0]
+        newblock = block[:offset] + data
+        if len(newblock) > blocksize:
+            newblock = newblock[:blocksize]
+        elif len(newblock) < blocksize:
+            newblock += block[len(newblock):]
+
+        h, a = self.block_stor((newblock,))
+        return h[0], 1 if a else 0
+
+    def block_hash_file(self, openfile):
+        """Return the list of hashes (hashes map)
+           for the blocks in a buffered file.
+           Helper method, does not affect store.
+        """
+        hashes = []
+        append = hashes.append
+        block_hash = self.block_hash
+
+        for block in file_sync_read_chunks(openfile, self.blocksize, 1, 0):
+            append(block_hash(block))
+
+        return hashes
+
+    def block_stor_file(self, openfile):
+        """Read blocks from buffered file object and store them. Return:
+           (bytes read, list of hashes, list of hashes that were missing)
+        """
+        blocksize = self.blocksize
+        block_stor = self.block_stor
+        hashlist = []
+        hextend = hashlist.extend
+        storedlist = []
+        sextend = storedlist.extend
+        lastsize = 0
+
+        for block in file_sync_read_chunks(openfile, blocksize, 1, 0):
+            hl, sl = block_stor((block,))
+            hextend(hl)
+            sextend(sl)
+            lastsize = len(block)
+
+        size = (len(hashlist) -1) * blocksize + lastsize if hashlist else 0
+        return size, hashlist, storedlist
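
FileBlocker keeps the original on-disk layout: each block lives in a file named by the hex digest of its hash, fanned out over three two-character directory levels so no single directory grows too large. A short illustration of the path computed by _get_rear_block (the blockpath is illustrative):

from hashlib import sha256
from binascii import hexlify
from os.path import join

blkhash = sha256('some block').digest()  # block_hash() strips trailing '\x00' first
filename = hexlify(blkhash)
path = join('/srv/pithos/blocks',        # illustrative blockpath
            filename[0:2], filename[2:4], filename[4:6], filename)
print path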
b/snf-pithos-backend/pithos/backends/lib/hashfiler/filemapper.py (new file)
+# Copyright 2011-2012 GRNET S.A. All rights reserved.
+# [license header identical to context_object.py above]
+
+from os import makedirs, unlink
+from os.path import isdir, realpath, exists, join
+from binascii import hexlify
+
+from context_file import ContextFile
+
+
+class FileMapper(object):
+    """Mapper.
+       Required constructor parameters: mappath, namelen.
+    """
+
+    mappath = None
+    namelen = None
+
+    def __init__(self, **params):
+        self.params = params
+        self.namelen = params['namelen']
+        mappath = realpath(params['mappath'])
+        if not isdir(mappath):
+            if not exists(mappath):
+                makedirs(mappath)
+            else:
+                raise ValueError("Variable mappath '%s' is not a directory" % (mappath,))
+        self.mappath = mappath
+
+    def _get_rear_map(self, maphash, create=0):
+        filename = hexlify(maphash)
+        dir = join(self.mappath, filename[0:2], filename[2:4], filename[4:6])
+        if not exists(dir):
+            makedirs(dir)
+        name = join(dir, filename)
+        return ContextFile(name, create)
+
+    def _check_rear_map(self, maphash):
+        filename = hexlify(maphash)
+        dir = join(self.mappath, filename[0:2], filename[2:4], filename[4:6])
+        name = join(dir, filename)
+        return exists(name)
+
+    def map_retr(self, maphash, blkoff=0, nr=100000000000000):
+        """Return as a list, part of the hashes map of an object
+           at the given block offset.
+           By default, return the whole hashes map.
+        """
+        namelen = self.namelen
+        hashes = ()
+
+        with self._get_rear_map(maphash, 0) as rmap:
+            if rmap:
+                hashes = list(rmap.sync_read_chunks(namelen, nr, blkoff))
+        return hashes
+
+    def map_stor(self, maphash, hashes=(), blkoff=0, create=1):
+        """Store hashes in the given hashes map."""
+        namelen = self.namelen
+        if self._check_rear_map(maphash):
+            return
+        with self._get_rear_map(maphash, 1) as rmap:
+            rmap.sync_write_chunks(namelen, blkoff, hashes, None)
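
FileMapper stores a hashes map as one file of fixed-width records, namelen bytes per hash, so map_retr can address it by block offset. A round-trip sketch with illustrative values (namelen=32 matches a sha256 digest):

from hashlib import sha256
from filemapper import FileMapper

mapper = FileMapper(mappath='/srv/pithos/maps', namelen=32)

maphash = sha256('object-version-1').digest()
hashes = [sha256(str(i)).digest() for i in range(3)]

mapper.map_stor(maphash, hashes)           # no-op if the map already exists
print mapper.map_retr(maphash) == hashes   # True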
b/snf-pithos-backend/pithos/backends/lib/hashfiler/mapper.py
 # interpreted as representing official policies, either expressed
 # or implied, of GRNET S.A.
 
-from os import makedirs, unlink
-from os.path import isdir, realpath, exists, join
 from binascii import hexlify
 
-from context_file import ContextFile
-
+from radosmapper import RadosMapper
+from filemapper import FileMapper
 
 class Mapper(object):
     """Mapper.
-       Required constructor parameters: mappath, namelen.
+       Required constructor parameters: mappath, namelen, mappool.
     """
-
-    mappath = None
-    namelen = None
 
     def __init__(self, **params):
-        self.params = params
-        self.namelen = params['namelen']
-        mappath = realpath(params['mappath'])
-        if not isdir(mappath):
-            if not exists(mappath):
-                makedirs(mappath)
-            else:
-                raise ValueError("Variable mappath '%s' is not a directory" % (mappath,))
-        self.mappath = mappath
+        params['mappool'] = 'maps'
+        self.rmap = RadosMapper(**params)
+        self.fmap = FileMapper(**params)
 
-    def _get_rear_map(self, maphash, create=0):
-        filename = hexlify(maphash)
-        dir = join(self.mappath, filename[0:2], filename[2:4], filename[4:6])
-        if not exists(dir):
-            makedirs(dir)
-        name = join(dir, filename)
-        return ContextFile(name, create)
+#    def _get_rear_map(self, maphash, create=0):
+#        return self.fmap._get_rear_map(maphash, create)
 
-    def _check_rear_map(self, maphash):
-        filename = hexlify(maphash)
-        dir = join(self.mappath, filename[0:2], filename[2:4], filename[4:6])
-        name = join(dir, filename)
-        return exists(name)
+#    def _check_rear_map(self, maphash):
+#        return self.rmap._check_rear_map(maphash)
+#        return self.rmap._check_rear_map(maphash) and
+#                self.fmap._check_rear_map(maphash)
 
     def map_retr(self, maphash, blkoff=0, nr=100000000000000):
         """Return as a list, part of the hashes map of an object
            at the given block offset.
            By default, return the whole hashes map.
         """
-        namelen = self.namelen
-        hashes = ()
-
-        with self._get_rear_map(maphash, 0) as rmap:
-            if rmap:
-                hashes = list(rmap.sync_read_chunks(namelen, nr, blkoff))
-        return hashes
+        return self.fmap.map_retr(maphash, blkoff, nr)
 
     def map_stor(self, maphash, hashes=(), blkoff=0, create=1):
         """Store hashes in the given hashes map."""
-        namelen = self.namelen
-        if self._check_rear_map(maphash):
-            return
-        with self._get_rear_map(maphash, 1) as rmap:
-            rmap.sync_write_chunks(namelen, blkoff, hashes, None)
-
+        self.rmap.map_stor(maphash, hashes, blkoff, create)
+        self.fmap.map_stor(maphash, hashes, blkoff, create)
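
Mapper thus mirrors Blocker: map_stor writes the map to both the 'maps' RADOS pool and the filesystem, while map_retr is served from the file copy only. A minimal sketch, assuming a reachable Ceph cluster; values are illustrative:

from hashlib import sha256
from mapper import Mapper

mapper = Mapper(mappath='/srv/pithos/maps', namelen=32)  # __init__ forces mappool='maps'
maphash = sha256('object-version-1').digest()
mapper.map_stor(maphash, [sha256('blk-0').digest()])     # written to RADOS and to disk
print mapper.map_retr(maphash)                           # read back from the file copy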
b/snf-pithos-backend/pithos/backends/lib/hashfiler/radosblocker.py (new file)
+# Copyright 2011-2012 GRNET S.A. All rights reserved.
+# [license header identical to context_object.py above]
+
+from hashlib import new as newhasher
+from binascii import hexlify
+from rados import *
+
+from context_object import RadosObject, file_sync_read_chunks
+
+CEPH_CONF_FILE="/etc/ceph/ceph.conf"
+
+class RadosBlocker(object):
+    """Blocker.
+       Required constructor parameters: blocksize, blockpool, hashtype.
+    """
+
+    blocksize = None
+    blockpool = None
+    hashtype = None
+
+    def __init__(self, **params):
+        blocksize = params['blocksize']
+        blockpool = params['blockpool']
+
+        rados = Rados(conffile=CEPH_CONF_FILE)
+        rados.connect()
+        if not rados.pool_exists(blockpool):
+            rados.pool_create(blockpool)
+
+        ioctx = rados.open_ioctx(blockpool)
+
+        hashtype = params['hashtype']
+        try:
+            hasher = newhasher(hashtype)
+        except ValueError:
+            msg = "Variable hashtype '%s' is not available from hashlib"
+            raise ValueError(msg % (hashtype,))
+
+        hasher.update("")
+        emptyhash = hasher.digest()
+
+        self.blocksize = blocksize
+        self.blockpool = blockpool
+        self.rados = rados
+        self.ioctx = ioctx
+        self.hashtype = hashtype
+        self.hashlen = len(emptyhash)
+        self.emptyhash = emptyhash
+
+    def _pad(self, block):
+        return block + ('\x00' * (self.blocksize - len(block)))
+
+    def _get_rear_block(self, blkhash, create=0):
+        name = hexlify(blkhash)
+        return RadosObject(name, self.ioctx, create)
+
+    def _check_rear_block(self, blkhash):
+        filename = hexlify(blkhash)
+        try:
+            self.ioctx.stat(filename)
+            return True
+        except ObjectNotFound:
+            return False
+
+    def block_hash(self, data):
+        """Hash a block of data"""
+        hasher = newhasher(self.hashtype)
+        hasher.update(data.rstrip('\x00'))
+        return hasher.digest()
+
+    def block_ping(self, hashes):
+        """Check hashes for existence and
+           return those missing from block storage.
+        """
+        notfound = []
+        append = notfound.append
+
+        for h in hashes:
+            if h not in notfound and not self._check_rear_block(h):
+                append(h)
+
+        return notfound
+
+    def block_retr(self, hashes):
+        """Retrieve blocks from storage by their hashes."""
+        blocksize = self.blocksize
+        blocks = []
+        append = blocks.append
+        block = None
+
+        for h in hashes:
+            if h == self.emptyhash:
+                append(self._pad(''))
+                continue
+            with self._get_rear_block(h, 0) as rbl:
+                if not rbl:
+                    break
+                for block in rbl.sync_read_chunks(blocksize, 1, 0):
+                    break # there should be just one block there
+            if not block:
+                break
+            append(self._pad(block))
+
+        return blocks
+
+    def block_stor(self, blocklist):
+        """Store a bunch of blocks and return (hashes, missing).
+           Hashes is a list of the hashes of the blocks,
+           missing is a list of indices in that list indicating
+           which blocks were missing from the store.
+        """
+        block_hash = self.block_hash
+        hashlist = [block_hash(b) for b in blocklist]
+        mf = None
+        missing = [i for i, h in enumerate(hashlist) if not self._check_rear_block(h)]
+        for i in missing:
+            with self._get_rear_block(hashlist[i], 1) as rbl:
+                rbl.sync_write(blocklist[i]) #XXX: verify?
+
+        return hashlist, missing
+
+    def block_delta(self, blkhash, offset, data):
+        """Construct and store a new block from a given block
+           and a data 'patch' applied at offset. Return:
+           (the hash of the new block, if the block already existed)
+        """
+
+        blocksize = self.blocksize
+        if offset >= blocksize or not data:
+            return None, None
+
+        block = self.block_retr((blkhash,))
+        if not block:
+            return None, None
+
+        block = block[0]
+        newblock = block[:offset] + data
+        if len(newblock) > blocksize:
+            newblock = newblock[:blocksize]
+        elif len(newblock) < blocksize:
+            newblock += block[len(newblock):]
+
+        h, a = self.block_stor((newblock,))
+        return h[0], 1 if a else 0
+
+    def block_hash_file(self, radosobject):
+        """Return the list of hashes (hashes map)
+           for the blocks in a buffered file.
+           Helper method, does not affect store.
+        """
+        hashes = []
+        append = hashes.append
+        block_hash = self.block_hash
+
+        for block in file_sync_read_chunks(radosobject, self.blocksize, 1, 0):
+            append(block_hash(block))
+
+        return hashes
+
+    def block_stor_file(self, radosobject):
+        """Read blocks from buffered file object and store them. Return:
+           (bytes read, list of hashes, list of hashes that were missing)
+        """
+        blocksize = self.blocksize
+        block_stor = self.block_stor
+        hashlist = []
+        hextend = hashlist.extend
+        storedlist = []
+        sextend = storedlist.extend
+        lastsize = 0
+
+        for block in file_sync_read_chunks(radosobject, blocksize, 1, 0):
+            hl, sl = block_stor((block,))
+            hextend(hl)
+            sextend(sl)
+            lastsize = len(block)
+
+        size = (len(hashlist) -1) * blocksize + lastsize if hashlist else 0
+        return size, hashlist, storedlist
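
RadosBlocker is the RADOS twin of FileBlocker: one RADOS object per block, named by the hex digest of the block hash, with existence checks done via ioctx.stat(). It can be exercised on its own; a sketch assuming a reachable Ceph cluster (the pool is created on first use; values are illustrative):

from radosblocker import RadosBlocker

rb = RadosBlocker(blocksize=4 * 1024 * 1024,
                  blockpool='blocks',
                  hashtype='sha256')

hashes, missing = rb.block_stor(('first block', 'second block'))
print rb.block_ping(hashes)     # [] once both blocks are stored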
b/snf-pithos-backend/pithos/backends/lib/hashfiler/radosmapper.py (new file)
+# Copyright 2011-2012 GRNET S.A. All rights reserved.
+# [license header identical to context_object.py above]
+
+from binascii import hexlify
+
+from context_object import RadosObject, file_sync_read_chunks
+from rados import *
+
+CEPH_CONF_FILE="/etc/ceph/ceph.conf"
+
+class RadosMapper(object):
+    """Mapper.
+       Required constructor parameters: mappool, namelen.
+    """
+
+    mappool = None
+    namelen = None
+
+    def __init__(self, **params):
+        self.params = params
+        self.namelen = params['namelen']
+        mappool = params['mappool']
+
+        rados = Rados(conffile=CEPH_CONF_FILE)
+        rados.connect()
+        if not rados.pool_exists(mappool):
+            rados.pool_create(mappool)
+
+        ioctx = rados.open_ioctx(mappool)
+
+        self.mappool = mappool
+        self.rados = rados
+        self.ioctx = ioctx
+
+    def _get_rear_map(self, maphash, create=0):
+        name = hexlify(maphash)
+        return RadosObject(name, self.ioctx, create)
+
+    def _check_rear_map(self, maphash):
+        name = hexlify(maphash)
+        try:
+            self.ioctx.stat(name)
+            return True
+        except ObjectNotFound:
+            return False
+
+    def map_retr(self, maphash, blkoff=0, nr=100000000000000):
+        """Return as a list, part of the hashes map of an object
+           at the given block offset.
+           By default, return the whole hashes map.
+        """
+        namelen = self.namelen
+        hashes = ()
+
+        with self._get_rear_map(maphash, 0) as rmap:
+            if rmap:
+                hashes = list(rmap.sync_read_chunks(namelen, nr, blkoff))
+        return hashes
+
+    def map_stor(self, maphash, hashes=(), blkoff=0, create=1):
+        """Store hashes in the given hashes map."""
+        namelen = self.namelen
+        if self._check_rear_map(maphash):
+            return
+        with self._get_rear_map(maphash, 1) as rmap:
+            rmap.sync_write_chunks(namelen, blkoff, hashes, None)
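
RadosMapper follows the same pattern for maps: the whole hashes map is a single RADOS object in the 'maps' pool, named by the hex digest of the map hash. A minimal sketch, assuming a reachable Ceph cluster; names are illustrative:

from hashlib import sha256
from radosmapper import RadosMapper

rm = RadosMapper(mappool='maps', namelen=32)
maphash = sha256('object-version-1').digest()
rm.map_stor(maphash, [sha256('blk-0').digest()])
print rm.map_retr(maphash)      # one 32-byte hash record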
b/snf-pithos-backend/pithos/backends/lib/hashfiler/store.py (whitespace-only change: trailing spaces stripped from blank lines)
     """Store.
        Required constructor parameters: path, block_size, hash_algorithm, umask.
     """
-    
+
     def __init__(self, **params):
         umask = params['umask']
         if umask is not None:
             os.umask(umask)
-        
+
         path = params['path']
         if path and not os.path.exists(path):
             os.makedirs(path)
         if not os.path.isdir(path):
             raise RuntimeError("Cannot open path '%s'" % (path,))
-        
+
         p = {'blocksize': params['block_size'],
              'blockpath': os.path.join(path + '/blocks'),
              'hashtype': params['hash_algorithm']}
@@ ... @@
         p = {'mappath': os.path.join(path + '/maps'),
              'namelen': self.blocker.hashlen}
         self.mapper = Mapper(**p)
-    
+
     def map_get(self, name):
         return self.mapper.map_retr(name)
-    
+
     def map_put(self, name, map):
         self.mapper.map_stor(name, map)
-    
+
     def map_delete(self, name):
         pass
-    
+
     def block_get(self, hash):
         blocks = self.blocker.block_retr((hash,))
         if not blocks:
             return None
         return blocks[0]
-    
+
     def block_put(self, data):
         hashes, absent = self.blocker.block_stor((data,))
         return hashes[0]
-    
+
     def block_update(self, hash, offset, data):
         h, e = self.blocker.block_delta(hash, offset, data)
         return h
-    
+
     def block_search(self, map):
         return self.blocker.block_ping(map)
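
Store is the facade the backend actually talks to, so after this revision every block_put and map_put below fans out to both RADOS and the filesystem through the rewritten Blocker and Mapper. An end-to-end sketch; the params mirror __init__ above and the values are illustrative:

from store import Store

store = Store(path='/srv/pithos',
              block_size=4 * 1024 * 1024,
              hash_algorithm='sha256',
              umask=None)

h = store.block_put('some data')   # stored in RADOS and on disk
block = store.block_get(h)         # returned zero-padded to block_size
store.map_put('example-map', [h])
print store.map_get('example-map')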